Completed
Push — master ( 244416...62a072 )
by Vasily
11:31
created

ComplexJob::more()   D

Complexity

Conditions 9
Paths 7

Size

Total Lines 30
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 30
rs 4.909
cc 9
eloc 22
nc 7
nop 1
1
<?php
2
namespace PHPDaemon\Core;
3
4
use PHPDaemon\Core\CallbackWrapper;
5
6
/**
7
 * ComplexJob class
8
 * @package PHPDaemon\Core
9
 * @author  Vasily Zorin <[email protected]>
10
 */
11
class ComplexJob implements \ArrayAccess {
12
	use \PHPDaemon\Traits\ClassWatchdog;
13
	use \PHPDaemon\Traits\StaticObjectWatchdog;
14
15
	/**
16
	 * State: waiting
17
	 */
18
	const STATE_WAITING = 1;
19
	
20
	/**
21
	 * State: running
22
	 */
23
	const STATE_RUNNING = 2;
24
	
25
	/**
26
	 * State: done
27
	 */
28
	const STATE_DONE = 3;
29
30
	/**
31
	 * @var array Listeners [callable, ...]
32
	 */
33
	public $listeners = [];
34
35
	/**
36
	 * @var array Hash of results [jobname -> result, ...]
37
	 */
38
	public $results = [];
39
40
	/**
41
	 * @var integer Current state
42
	 */
43
	public $state;
44
45
	/**
46
	 * @var array Hash of jobs [jobname -> callback, ...]
47
	 */
48
	public $jobs = [];
49
50
	/**
51
	 * @var integer Number of results
52
	 */
53
	public $resultsNum = 0;
54
55
	/**
56
	 * @var integer Number of jobs
57
	 */
58
	public $jobsNum = 0;
59
60
	protected $keep = false;
61
62
	protected $more = null;
63
64
	protected $moreFirstFlag;
65
66
	protected $maxConcurrency = -1;
67
68
	protected $backlog;
69
70
	public $vars = [];
71
72
	/**
73
	 * Constructor
74
	 * @param callable $cb Listener
0 ignored issues
show
Documentation introduced by
Should the type for parameter $cb not be callable|null?

This check looks for @param annotations where the type inferred by our type inference engine differs from the declared type.

It makes a suggestion as to what type it considers more descriptive.

Most often this is a case of a parameter that can be null in addition to its declared types.

Loading history...
75
	 */
76
	public function __construct($cb = null) {
77
		$this->state = self::STATE_WAITING;
78
		if ($cb !== null) {
79
			$this->addListener($cb);
80
		}
81
	}
82
83
	/**
84
	 * Handler of isset($job[$name])
85
	 * @param  string $j Job name
86
	 * @return boolean
87
	 */
88
	public function offsetExists($j) {
89
		return isset($this->results[$j]);
90
	}
91
92
	/**
93
	 * Handler of $job[$name]
94
	 * @param  string $j Job name
95
	 * @return mixed
96
	 */
97
	public function offsetGet($j) {
98
		return isset($this->results[$j]) ? $this->results[$j] : null;
99
	}
100
101
	/**
102
	 * Handler of $job[$name] = $value
103
	 * @param  string $j Job name
104
	 * @param  mixed  $v Job result
105
	 * @return void
106
	 */
107
	public function offsetSet($j, $v) {
108
		$this->setResult($j, $v);
109
	}
110
111
	/**
112
	 * Handler of unset($job[$name])
113
	 * @param  string $j Job name
114
	 * @return void
115
	 */
116
	public function offsetUnset($j) {
117
		unset($this->results[$j]);
118
	}
119
120
	/**
121
	 * Returns associative array of results
122
	 * @return array
123
	 */
124
	public function getResults() {
125
		return $this->results;
126
	}
127
	/**
128
	 * Keep
129
	 * @param  boolean $keep Keep?
130
	 * @return void
131
	 */
132
	public function keep($keep = true) {
133
		$this->keep = (boolean) $keep;
134
	}
135
136
	/**
137
	 * Has completed?
138
	 * @return boolean
139
	 */
140
	public function hasCompleted() {
141
		return $this->state === self::STATE_DONE;
142
	}
143
144
	/**
145
	 * Sets a limit of simultaneously executing tasks
146
	 * @param  integer $n Natural number or -1 (no limit)
147
	 * @return this
0 ignored issues
show
Documentation introduced by
Should the return type not be ComplexJob?

This check compares the return type specified in the @return annotation of a function or method doc comment with the types returned by the function and raises an issue if they mismatch.

Loading history...
148
	 */
149
	public function maxConcurrency($n = -1) {
150
		$this->maxConcurrency = $n;
151
		return $this;
152
	}
153
154
	/**
155
	 * Set result
156
	 * @param  string $jobname Job name
157
	 * @param  mixed  $result  Result
158
	 * @return boolean
159
	 */
160
	public function setResult($jobname, $result = null) {
161
		if (isset($this->results[$jobname])) {
162
			return false;
163
		}
164
		$this->results[$jobname] = $result;
165
		++$this->resultsNum;
166
		$this->checkIfAllReady();
167
		return true;
168
	}
169
170
	/**
171
	 * Get result
172
	 * @param  string $jobname Job name
173
	 * @return mixed Result or null
174
	 */
175
	public function getResult($jobname) {
176
		return isset($this->results[$jobname]) ? $this->results[$jobname] : null;
177
	}
178
179
	/**
180
	 * Checks if all jobs are ready
181
	 * @return void
182
	 */
183
	protected function checkIfAllReady() {
184
		$this->checkQueue();
185
		if ($this->resultsNum >= $this->jobsNum) {
186
			$this->jobs  = [];
187
			$this->state = self::STATE_DONE;
188
			foreach ($this->listeners as $cb) {
189
				call_user_func($cb, $this);
190
			}
191
			if (!$this->keep && $this->resultsNum >= $this->jobsNum) {
192
				$this->cleanup();
193
			}
194
		}
195
	}
196
197
	/**
198
	 * Called automatically. Checks whether if the queue is full. If not, tries to pull more jobs from backlog and 'more'
199
	 * @return void
200
	 */
201
	public function checkQueue() {
202
		if ($this->backlog !== null) {
203
			while (!$this->backlog->isEmpty()) {
204
				if ($this->maxConcurrency !== -1 && ($this->jobsNum - $this->resultsNum > $this->maxConcurrency)) {
205
					return;
206
				}
207
				list ($name, $cb) = $this->backlog->shift();
208
				$this->addJob($name, $cb);
209
			}
210
		}
211
		if ($this->more !== null) {
212
			$this->more();
213
		}
214
	}
215
216
	/**
217
	 * Sets a callback which is going to be fired always when we have a room for more jobs
218
	 * @param  callable $cb Callback
0 ignored issues
show
Documentation introduced by
Should the type for parameter $cb not be callable|null?

This check looks for @param annotations where the type inferred by our type inference engine differs from the declared type.

It makes a suggestion as to what type it considers more descriptive.

Most often this is a case of a parameter that can be null in addition to its declared types.

Loading history...
219
	 * @return this
0 ignored issues
show
Documentation introduced by
Should the return type not be ComplexJob?

This check compares the return type specified in the @return annotation of a function or method doc comment with the types returned by the function and raises an issue if they mismatch.

Loading history...
220
	 */
221
	public function more($cb = null) {
222
		if ($cb !== null) {
223
			$this->more = $cb;
224
			$this->moreFirstFlag = true;
225
			return $this;
226
		}
227
		if ($this->more !== null) {
228
			if ($this->more instanceof \Iterator) {
229
				iterator:
230
				$it = $this->more;
231
				while (!$this->isQueueFull() && $it->valid()) {
232
					if ($this->moreFirstFlag) {
233
						$this->moreFirstFlag = false;
234
					} else {
235
						$it->next();
236
						if (!$it->valid()) {
237
							break;
238
						}
239
					}
240
					$this->addJob($it->key(), $it->current());
0 ignored issues
show
Documentation introduced by
$it->current() is of type *, but the function expects a callable.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
241
				}
242
			} else {
243
				if (($r = call_user_func($this->more, $this)) instanceof \Iterator) {
244
					$this->more = $r;
245
					goto iterator;
246
				}
247
			}
248
		}
249
		return $this;
250
	}
251
252
	/**
253
	 * Returns whether or not the queue is full (maxConcurrency option exceed)
254
	 * @return boolean
255
	 */
256
	public function isQueueFull() {
257
		return $this->maxConcurrency !== -1 && ($this->jobsNum - $this->resultsNum >= $this->maxConcurrency);
258
	}
259
260
	/**
261
	 * Adds job
262
	 * @param  string   $name Job name
263
	 * @param  callable $cb   Callback
264
	 * @return boolean Success
265
	 */
266
	public function addJob($name, $cb) {
267
		if (isset($this->jobs[$name])) {
268
			return false;
269
		}
270
		$cb = CallbackWrapper::wrap($cb);
271
		if ($this->maxConcurrency !== -1 && ($this->jobsNum - $this->resultsNum > $this->maxConcurrency)) {
272
			if ($this->backlog === null) {
273
				$this->backlog = new \SplStack;
274
			}
275
			$this->backlog->push([$name, $cb]);
276
			return true;
277
		}
278
		$this->jobs[$name] = $cb;
279
		++$this->jobsNum;
280 View Code Duplication
		if (($this->state === self::STATE_RUNNING) || ($this->state === self::STATE_DONE)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
281
			$this->state = self::STATE_RUNNING;
282
			call_user_func($cb, $name, $this);
283
		}
284
		return true;
285
	}
286
287
	/**
288
	 * Clean up
289
	 * @return void
290
	 */
291
	public function cleanup() {
292
		$this->listeners = [];
293
		$this->results   = [];
294
		$this->jobs      = [];
295
		$this->more      = null;
296
	}
297
298
	/**
299
	 * Adds listener
300
	 * @param  callable $cb Callback
301
	 * @return void
302
	 */
303
	public function addListener($cb) {
304
		if ($this->state === self::STATE_DONE) {
305
			call_user_func($cb, $this);
306
			return;
307
		}
308
		$this->listeners[] = CallbackWrapper::wrap($cb);
309
	}
310
311
	/**
312
	 * Runs the job
313
	 * @return void
314
	 */
315
	public function execute() {
316
		if ($this->state === self::STATE_WAITING) {
317
			$this->state = self::STATE_RUNNING;
318
			foreach ($this->jobs as $name => $cb) {
319
				call_user_func($cb, $name, $this);
320
				$this->jobs[$name] = null;
321
			}
322
			$this->checkIfAllReady();
323
		}
324
	}
325
326
	/**
327
	 * Adds new job or calls execute() method
328
	 * @param  mixed    $name
329
	 * @param  callable $cb
0 ignored issues
show
Documentation introduced by
Should the type for parameter $cb not be callable|null?

This check looks for @param annotations where the type inferred by our type inference engine differs from the declared type.

It makes a suggestion as to what type it considers more descriptive.

Most often this is a case of a parameter that can be null in addition to its declared types.

Loading history...
330
	 * @return void
331
	 */
332
	public function __invoke($name = null, $cb = null) {
333
		if (func_num_args() === 0) {
334
			$this->execute();
335
			return;
336
		}
337
		$this->addJob($name, $cb);
0 ignored issues
show
Bug introduced by
It seems like $cb defined by parameter $cb on line 332 can also be of type null; however, PHPDaemon\Core\ComplexJob::addJob() does only seem to accept callable, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
338
	}
339
}
340