Pool::get()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 7
ccs 5
cts 5
cp 1
rs 9.4285
cc 2
eloc 4
nc 2
nop 2
crap 2
1
<?php
2
3
namespace Gielfeldt\SimpleWorker;
4
5
/**
6
 * Class Pool
7
 *
8
 * @package Gielfeldt\SimpleWorker
9
 */
10
class Pool
11
{
12
    /**
13
     * Workers in the pool.
14
     * @var array
15
     */
16
    protected $workers = [];
17
18
    /**
19
     * Options for the pool.
20
     * @var array
21
     */
22
    protected $options;
23
24
    /**
25
     * Pool singletons.
26
     * @var Pool[]
27
     */
28
    static protected $pools = [];
29
30
    /**
31
     * Pool constructor.
32
     *
33
     * @param array $options
34
     *   Options for the pool.
35
     *    - concurrency
36
     *    - polling_interval
37
     */
38 6
    public function __construct(array $options = array())
39
    {
40 6
        $this->options = $options + [
41 6
                'concurrency' => 5,
42 6
                'polling_interval' => 0.1,
43 6
                'progress_callback' => null,
44 6
                'finish_callback' => null,
45 6
            ];
46 6
    }
47
48
    /**
49
     * Get a singleton pool.
50
     *
51
     * @param string $name
52
     *   Name of pool.
53
     * @param array $options
54
     *   Options for the pool.
55
     *
56
     * @return \Gielfeldt\SimpleWorker\Pool
57
     */
58 1
    public static function get($name, array $options = array())
59
    {
60 1
        if (!isset(static::$pools[$name])) {
61 1
            static::$pools[$name] = new static($options);
62 1
        }
63 1
        return static::$pools[$name];
64
    }
65
66
    /**
67
     * Add a worker to the pool.
68
     *
69
     * @param \Gielfeldt\SimpleWorker\SimpleWorkerInterface $worker
70
     *   The worker to add.
71
     * @param $callback
72
     *   The callback to fire when ready.
73
     */
74 4
    public function addWorker(SimpleWorkerInterface $worker, $callback)
75
    {
76 4
        $this->workers[] = [$worker, $callback];
77 4
    }
78
79
    /**
80
     * Add multiple workers to the pool.
81
     *
82
     * @param \Gielfeldt\SimpleWorker\SimpleWorkerInterface[] $workers
83
     *   The worker to add.
84
     * @param $callback
85
     *   The callback to fire when ready.
86
     */
87 1
    public function addWorkers(array $workers, $callback)
88
    {
89 1
        foreach ($workers as $worker) {
90 1
            $this->addWorker($worker, $callback);
91 1
        }
92 1
    }
93
94
    /**
95
     * Process workers.
96
     */
97 4
    public function process(array $options = array())
98
    {
99 4
        $options += $this->options;
100 4
        while (!empty($this->workers)) {
101 4
            $skipSleep = false;
102 4
            foreach ($this->workers as $idx => $worker) {
103 4
                if ($idx >= $options['concurrency']) {
104 1
                    break;
105
                }
106
                try {
107 4
                    if ($worker[0]->isReady()) {
108 4
                        call_user_func($worker[1], $worker[0]);
109 3
                        unset($this->workers[$idx]);
110 3
                        $this->workers = array_values($this->workers);
111 3
                        $skipSleep = true;
112 3
                        break;
113
                    }
114 4
                } catch (\Exception $e) {
115
                    // Remove worker from pool if exception occurred.
116 1
                    unset($this->workers[$idx]);
117 1
                    throw $e;
118
                }
119 4
            }
120 4
            if ($options['progress_callback']) {
121 1
                call_user_func($options['progress_callback'], $this);
122 1
            }
123 4
            if (!$skipSleep && $options['polling_interval'] > 0) {
124 4
                usleep($options['polling_interval'] * 1000000);
125 4
            }
126 4
        }
127 3
        if ($options['finish_callback']) {
128 1
            call_user_func($options['finish_callback'], $this);
129 1
        }
130 3
    }
131
}
132