juju_scaleway.OpRunner.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 4
rs 10
1
# -*- coding: utf-8 -*-
2
#
3
# Copyright (c) 2014-2015 Online SAS and Contributors. All Rights Reserved.
4
#                         Edouard Bonlieu <[email protected]>
5
#                         Julien Castets <[email protected]>
6
#                         Manfred Touron <[email protected]>
7
#                         Kevin Deldycke <[email protected]>
8
#
9
# Licensed under the BSD 2-Clause License (the "License"); you may not use this
10
# file except in compliance with the License. You may obtain a copy of the
11
# License at http://opensource.org/licenses/BSD-2-Clause
12
13
"""
14
Thread-based concurrency around bulk ops; the Scaleway API is synchronous.
15
"""
16
17
import logging
18
19
try:
20
    from Queue import Queue, Empty
21
except ImportError:  # Python3
22
    from queue import Queue, Empty
23
24
import threading
25
26
27
logger = logging.getLogger("juju.scaleway")
28
29
30
class Runner(object):
    """Run queued operations on a small pool of ``OpRunner`` threads.

    Operations are registered with ``queue_op`` and their results are
    consumed with ``iter_results``.  ``iter_results`` starts and stops the
    worker threads by itself unless ``start`` was already called.
    """

    # Upper bound on the number of workers started automatically by
    # ``iter_results``.
    DEFAULT_NUM_RUNNER = 4

    def __init__(self):
        # Operations waiting to be picked up by a worker thread.
        self.jobs = Queue()
        # Values returned by the operations, or the exception instance
        # raised by an operation that failed.
        self.results = Queue()
        # Number of queued operations whose result was not gathered yet.
        self.job_count = 0
        # Worker threads created by ``start`` and not yet joined by ``stop``.
        self.runners = []
        self.started = False

    def queue_op(self, operation):
        """Queue ``operation`` (an object with a ``run()`` method)."""
        self.jobs.put(operation)
        self.job_count += 1

    def iter_results(self):
        """Yield the result of every queued operation.

        Results come out in the order they were put on the results queue.
        Results that are exception instances are skipped silently.  When
        the runner is not started yet, workers are started here (at most
        ``DEFAULT_NUM_RUNNER``) and stopped once all results were gathered.
        """
        # NOTE(review): if ``start`` was called manually before any
        # operation was queued, the workers may already have exited (they
        # return as soon as the job queue is empty) and ``gather_result``
        # would then block -- confirm callers never do that.
        auto = not self.started

        if auto:
            self.start(min(self.DEFAULT_NUM_RUNNER, self.job_count))

        # ``range`` is evaluated once, so decrementing ``job_count`` inside
        # the loop does not shorten the iteration.
        for _ in range(self.job_count):
            self.job_count -= 1
            result = self.gather_result()
            if isinstance(result, Exception):
                continue
            yield result

        if auto:
            self.stop()

    def gather_result(self):
        """Block until one result is available and return it."""
        return self.results.get()

    def start(self, count):
        """Start ``count`` daemon worker threads and mark the runner started."""
        for _ in range(count):
            runner = OpRunner(self.jobs, self.results)
            runner.daemon = True
            self.runners.append(runner)
            runner.start()
        self.started = True

    def stop(self):
        """Wait for every worker to finish and mark the runner stopped."""
        for runner in self.runners:
            runner.join()
        # Forget the finished threads: a later ``start`` must begin with an
        # empty pool instead of accumulating (and re-joining) dead workers.
        self.runners = []
        self.started = False
76
77
78
class OpRunner(threading.Thread):
    """Worker thread draining a queue of operations.

    Each operation taken from ``ops`` is run, and its return value -- or
    the exception it raised -- is put on ``results``.
    """

    def __init__(self, ops, results):
        self.ops = ops
        self.results = results
        super(OpRunner, self).__init__()

    def run(self):
        """Process operations until the ``ops`` queue is found empty."""
        while True:
            try:
                # Non-blocking: an empty queue means there is nothing left
                # to do and the thread terminates.
                job = self.ops.get_nowait()
            except Empty:
                break
            self.results.put(self._run_op(job))

    @staticmethod
    def _run_op(job):
        """Run ``job`` and return its result, or the exception it raised."""
        try:
            return job.run()
        except Exception as error:
            # The failure is logged here and handed over as a result so
            # that the consumer still receives one item per operation.
            logger.exception("Error while processing op %s", job)
            return error
99