1 | <?php |
||
2 | |||
3 | namespace Pheanstalk; |
||
4 | |||
5 | use Doctrine\Common\Collections\ArrayCollection; |
||
6 | use Pheanstalk\Command\CreateCommand; |
||
7 | use Pheanstalk\Command\CreateScheduleCommand; |
||
8 | use Pheanstalk\Command\CreateTubeCommand; |
||
9 | use Pheanstalk\Command\GetWorkflowCommand; |
||
10 | use Pheanstalk\Command\GetWorkflowInstancesCommand; |
||
11 | use Pheanstalk\Command\GetWorkflowInstancesDetailCommand; |
||
12 | use Pheanstalk\Command\ListWorkflowsCommand; |
||
13 | use Pheanstalk\Command\ReleaseCommand; |
||
14 | use Pheanstalk\Command\UpdateTubeCommand; |
||
15 | use Pheanstalk\Command\WorkflowExistsCommand; |
||
16 | use Pheanstalk\Exception\ServerDuplicateEntryException; |
||
17 | use Pheanstalk\Structure\Job; |
||
18 | use Pheanstalk\Structure\Schedule; |
||
19 | use Pheanstalk\Structure\Task; |
||
20 | use Pheanstalk\Structure\TaskInstance; |
||
21 | use Pheanstalk\Structure\TimeSchedule; |
||
22 | use Pheanstalk\Structure\Tube; |
||
23 | use Pheanstalk\Structure\Workflow; |
||
24 | use Pheanstalk\Structure\WorkflowInstance; |
||
25 | |||
26 | /** |
||
27 | * Pheanstalk is a PHP client for the beanstalkd workqueue. |
||
28 | * |
||
29 | * The Pheanstalk class is a simple facade for the various underlying components. |
||
30 | * |
||
31 | * @see http://github.com/kr/beanstalkd |
||
32 | * @see http://xph.us/software/beanstalkd/ |
||
33 | * |
||
34 | * @author Paul Annesley |
||
35 | * @package Pheanstalk |
||
36 | * @license http://www.opensource.org/licenses/mit-license.php |
||
37 | */ |
||
38 | class Pheanstalk implements PheanstalkInterface |
||
39 | { |
||
40 | |||
41 | /** @var Connection $connection */ |
||
42 | private $connection; |
||
43 | |||
44 | /** @var PheanstalkInterface $currentClass */ |
||
45 | private $currentClass; |
||
46 | |||
47 | /** |
||
48 | * @param string $host |
||
49 | * @param string $user |
||
50 | * @param string $password |
||
51 | * @param int $port |
||
52 | * @param int $connectTimeout |
||
53 | * @param bool $connectPersistent |
||
54 | */ |
||
55 | 21 | public function __construct($host, $user = null, $password = null, $port = PheanstalkInterface::DEFAULT_PORT, $connectTimeout = null, $connectPersistent = false) |
|
56 | { |
||
57 | 21 | $this->setConnection(new Connection($host, $user, $password, $port, $connectTimeout, $connectPersistent)); |
|
58 | } |
||
59 | |||
60 | /** |
||
61 | * {@inheritdoc} |
||
62 | */ |
||
63 | 21 | public function setConnection(Connection $connection) |
|
64 | { |
||
65 | 21 | $this->connection = $connection; |
|
66 | |||
67 | 21 | return $this; |
|
68 | } |
||
69 | |||
70 | /** |
||
71 | * {@inheritdoc} |
||
72 | */ |
||
73 | 2 | public function getConnection() |
|
74 | { |
||
75 | 2 | return $this->connection; |
|
76 | } |
||
77 | |||
78 | /** |
||
79 | * @return PheanstalkInterface |
||
80 | */ |
||
81 | 14 | public function getCurrentClass(): PheanstalkInterface |
|
82 | { |
||
83 | 14 | return $this->currentClass ?? $this; |
|
84 | } |
||
85 | |||
86 | /** |
||
87 | * @param PheanstalkInterface $currentClass |
||
88 | * |
||
89 | * @return Pheanstalk |
||
90 | */ |
||
91 | 1 | public function setCurrentClass(PheanstalkInterface $currentClass): PheanstalkInterface |
|
92 | { |
||
93 | 1 | $this->currentClass = $currentClass; |
|
94 | 1 | return $this; |
|
95 | } |
||
96 | |||
97 | // ---------------------------------------- |
||
98 | |||
99 | /** |
||
100 | * {@inheritdoc} |
||
101 | */ |
||
102 | 1 | public function deleteSchedule(Schedule $schedule) |
|
103 | { |
||
104 | 1 | return $this->_dispatch(new Command\DeleteScheduleCommand($schedule)); |
|
105 | } |
||
106 | |||
107 | /** |
||
108 | * {@inheritdoc} |
||
109 | */ |
||
110 | 6 | public function delete(Workflow $workflow) |
|
111 | { |
||
112 | 6 | return $this->_dispatch(new Command\DeleteCommand($workflow)); |
|
113 | } |
||
114 | |||
115 | /** |
||
116 | * {@inheritdoc} |
||
117 | */ |
||
118 | 1 | public function deleteTube(Tube $tube) |
|
119 | { |
||
120 | 1 | return $this->_dispatch(new Command\DeleteTubeCommand($tube)); |
|
121 | } |
||
122 | |||
123 | /** |
||
124 | * {@inheritdoc} |
||
125 | */ |
||
126 | 8 | public function workflowExists($name) |
|
127 | { |
||
128 | 8 | $workflow = $this->_dispatch(new Command\WorkflowExistsCommand($name)); |
|
129 | 7 | if ($workflow instanceof Workflow) { |
|
130 | 7 | return $this->getCurrentClass()->getWorkflow($workflow); |
|
131 | } |
||
132 | 1 | return false; |
|
133 | } |
||
134 | |||
135 | /** |
||
136 | * {@inheritdoc} |
||
137 | */ |
||
138 | 1 | public function getSchedule(int $scheduleId) |
|
139 | { |
||
140 | 1 | return $this->_dispatch(new Command\GetScheduleCommand($scheduleId)); |
|
141 | } |
||
142 | |||
143 | /** |
||
144 | * {@inheritdoc} |
||
145 | */ |
||
146 | 1 | public function listSchedules() |
|
147 | { |
||
148 | 1 | return $this->_dispatch(new Command\ListSchedulesCommand()); |
|
149 | } |
||
150 | |||
151 | /** |
||
152 | * {@inheritdoc} |
||
153 | */ |
||
154 | 7 | public function getWorkflow(Workflow $workflow) |
|
155 | { |
||
156 | 7 | return $this->_dispatch(new Command\GetWorkflowCommand($workflow)); |
|
157 | } |
||
158 | |||
159 | /** |
||
160 | * {@inheritdoc} |
||
161 | */ |
||
162 | 5 | public function getWorkflowInstances(?Workflow $workflow, string $status = null) |
|
163 | { |
||
164 | 5 | $paramsStatus = empty($status) ? GetWorkflowInstancesDetailCommand::FILTERS : [$status]; |
|
165 | 5 | $instances = new ArrayCollection([]); |
|
166 | 5 | foreach ($paramsStatus as $stat) { |
|
167 | 5 | $instances[strtolower($stat)] = $this->getStatusInstance($stat, $workflow); |
|
168 | } |
||
169 | 5 | if (!is_null($status)) { |
|
170 | 3 | return $instances->get(strtolower($status))->get('workflow_instances'); |
|
171 | } |
||
172 | |||
173 | 2 | return $instances; |
|
174 | } |
||
175 | |||
176 | /** |
||
177 | * @param $stat |
||
178 | * @param Workflow|null $workflow |
||
179 | * |
||
180 | * @return ArrayCollection |
||
181 | * @throws Exception\ClientException |
||
182 | */ |
||
183 | 5 | protected function getStatusInstance($stat, ?Workflow $workflow) |
|
184 | { |
||
185 | 5 | $workflowInstances = $this->_dispatch(new Command\GetWorkflowInstancesCommand($workflow, $stat)); |
|
186 | /** @var ArrayCollection $workflowCollection */ |
||
187 | 5 | $workflowCollection = $workflowInstances->get('workflow_instances'); |
|
188 | 5 | if (!empty($workflowCollection)) { |
|
189 | 4 | foreach ($workflowCollection as $instance) { |
|
190 | 4 | $this->getCurrentClass()->getWorkflowInstancesDetails($instance); |
|
191 | } |
||
192 | } |
||
193 | 5 | return $workflowInstances; |
|
194 | } |
||
195 | |||
196 | /** |
||
197 | * {@inheritdoc} |
||
198 | */ |
||
199 | 4 | public function getWorkflowInstancesDetails(WorkflowInstance $workflowInstance) |
|
200 | { |
||
201 | 4 | return $this->_dispatch(new Command\GetWorkflowInstancesDetailCommand($workflowInstance)); |
|
202 | } |
||
203 | |||
204 | /** |
||
205 | * {@inheritdoc} |
||
206 | */ |
||
207 | 9 | public function tubeExists($name) |
|
208 | { |
||
209 | 9 | return $this->_dispatch(new Command\TubeExistsCommand($name)); |
|
210 | } |
||
211 | |||
212 | /** |
||
213 | * {@inheritdoc} |
||
214 | */ |
||
215 | 1 | public function listTubes() |
|
216 | { |
||
217 | 1 | return $this->_dispatch(new Command\ListTubesCommand()); |
|
218 | } |
||
219 | |||
220 | /** |
||
221 | * {@inheritdoc} |
||
222 | */ |
||
223 | 1 | public function peek() |
|
224 | { |
||
225 | 1 | $response = $this->_dispatch(new Command\PeekCommand()); |
|
226 | |||
227 | 1 | return $response; |
|
228 | } |
||
229 | |||
230 | /** |
||
231 | * {@inheritdoc} |
||
232 | */ |
||
233 | 6 | public function put(Workflow $workflow) |
|
234 | { |
||
235 | 6 | $response = $this->_dispatch(new Command\PutCommand($workflow)); |
|
236 | |||
237 | 6 | return $response['workflow-instance-id']; |
|
238 | } |
||
239 | |||
240 | /** |
||
241 | * {@inheritdoc} |
||
242 | */ |
||
243 | 1 | public function statsTube(Tube $tube) |
|
244 | { |
||
245 | 1 | return $this->_dispatch(new Command\StatsTubeCommand($tube)); |
|
246 | } |
||
247 | |||
248 | /** |
||
249 | * {@inheritdoc} |
||
250 | */ |
||
251 | 1 | public function stats() |
|
252 | { |
||
253 | 1 | return $this->_dispatch(new Command\StatsCommand()); |
|
254 | } |
||
255 | |||
256 | // ---------------------------------------- |
||
257 | |||
258 | /** |
||
259 | * Dispatches the specified command to the connection object. |
||
260 | * |
||
261 | * If a SocketException occurs, the connection is reset, and the command is |
||
262 | * re-attempted once. |
||
263 | * |
||
264 | * @throws Exception\ClientException |
||
265 | * @param Command $command |
||
266 | * |
||
267 | * @return mixed |
||
268 | */ |
||
269 | 18 | private function _dispatch($command) |
|
270 | { |
||
271 | 18 | return $this->connection->dispatchCommand($command); |
|
272 | } |
||
273 | |||
274 | /** |
||
275 | * {@inheritdoc} |
||
276 | */ |
||
277 | 6 | public function create(Workflow $workflow, $force = false): Workflow |
|
278 | { |
||
279 | try { |
||
280 | 6 | $this->checkAndCreateTubes($workflow); |
|
281 | 6 | $workflow = $this->_dispatch(new Command\CreateCommand($workflow)); |
|
282 | 3 | } catch (ServerDuplicateEntryException $e) { |
|
283 | 3 | if ($force) { |
|
284 | 3 | $workflowToDelete = $this->findWorkflow($workflow); |
|
285 | 3 | $this->getCurrentClass()->delete($workflowToDelete); |
|
0 ignored issues
–
show
Bug
introduced
by
![]() |
|||
286 | |||
287 | 3 | return $this->getCurrentClass()->create($workflow); |
|
288 | } |
||
289 | 1 | throw $e; |
|
290 | } |
||
291 | |||
292 | 6 | return $workflow; |
|
293 | } |
||
294 | |||
295 | /** |
||
296 | * @param Workflow $workflow |
||
297 | * |
||
298 | * @return Workflow|bool |
||
299 | * @throws Exception\ClientException |
||
300 | */ |
||
301 | 3 | public function findWorkflow(Workflow $workflow) |
|
302 | { |
||
303 | 3 | $workflows = $this->_dispatch(new Command\ListWorkflowsCommand()); |
|
304 | return $workflows->filter(function (Workflow $listedWorkflow) use ($workflow) { |
||
305 | 3 | return $listedWorkflow->getName() === $workflow->getName() |
|
306 | 3 | && $listedWorkflow->getGroup() === $workflow->getGroup(); |
|
307 | 3 | })->first(); |
|
308 | } |
||
309 | |||
310 | /** |
||
311 | * @param Workflow $workflow |
||
312 | * |
||
313 | * @throws Exception\ClientException |
||
314 | */ |
||
315 | 6 | public function checkAndCreateTubes(Workflow $workflow) |
|
316 | { |
||
317 | 6 | $tubes = []; |
|
318 | /** @var Job $job */ |
||
319 | 6 | foreach ($workflow->getJobs() as $job) { |
|
320 | /** @var Task $task */ |
||
321 | 6 | foreach ($job->getTasks() as $task) { |
|
322 | 6 | $tubes = array_merge($tubes, [$task->getQueue()]); |
|
323 | } |
||
324 | } |
||
325 | 6 | foreach ($tubes as $tube) { |
|
326 | 6 | if (!$this->getCurrentClass()->tubeExists($tube)) { |
|
327 | 1 | $this->getCurrentClass()->createTube(new Tube($tube, 1)); |
|
328 | }; |
||
329 | } |
||
330 | } |
||
331 | |||
332 | /** |
||
333 | * {@inheritdoc} |
||
334 | */ |
||
335 | 1 | public function update(Workflow $workflow): Workflow |
|
336 | { |
||
337 | 1 | $workflow = $this->_dispatch(new Command\UpdateCommand($workflow)); |
|
338 | 1 | return $workflow; |
|
339 | } |
||
340 | |||
341 | /** |
||
342 | * {@inheritdoc} |
||
343 | */ |
||
344 | 1 | public function updateSchedule(Schedule $schedule): Schedule |
|
345 | { |
||
346 | 1 | $schedule = $this->_dispatch(new Command\UpdateScheduleCommand($schedule)); |
|
347 | 1 | return $schedule; |
|
348 | } |
||
349 | |||
350 | /** |
||
351 | * {@inheritdoc} |
||
352 | */ |
||
353 | 1 | public function createSchedule(Schedule $schedule) |
|
354 | { |
||
355 | 1 | $workflowSchedule = $this->_dispatch( |
|
356 | 1 | new Command\CreateScheduleCommand($schedule) |
|
357 | ); |
||
358 | 1 | return $workflowSchedule; |
|
359 | } |
||
360 | |||
361 | /** |
||
362 | * {@inheritdoc} |
||
363 | */ |
||
364 | 5 | public function createTask(string $name, string $group, string $path, $queue = 'default', $useAgent = false, $user = null, $host = null, $comment = null): Workflow |
|
365 | { |
||
366 | 5 | $task = new Task($path, $queue, $useAgent, $user, $host); |
|
367 | 5 | $job = new Job(new ArrayCollection([$task])); |
|
368 | 5 | $workflow = new Workflow($name, $group, new ArrayCollection([$job]), $comment); |
|
369 | |||
370 | 5 | return $this->getCurrentClass()->create($workflow, true); |
|
371 | } |
||
372 | |||
373 | /** |
||
374 | * {@inheritDoc} |
||
375 | */ |
||
376 | 1 | public function createTube(Tube $tube): Tube |
|
377 | { |
||
378 | 1 | return $this->_dispatch(new Command\CreateTubeCommand($tube)); |
|
379 | } |
||
380 | |||
381 | /** |
||
382 | * {@inheritdoc} |
||
383 | */ |
||
384 | 1 | public function updateTube(Tube $tube): Tube |
|
385 | { |
||
386 | 1 | return $this->_dispatch(new Command\UpdateTubeCommand($tube)); |
|
387 | } |
||
388 | |||
389 | /** |
||
390 | * {@inheritdoc} |
||
391 | */ |
||
392 | 1 | public function cancel(WorkflowInstance $workflowInstance) |
|
393 | { |
||
394 | 1 | return $this->_dispatch(new Command\CancelCommand($workflowInstance)); |
|
395 | } |
||
396 | |||
397 | /** |
||
398 | * {@inheritdoc} |
||
399 | */ |
||
400 | 2 | public function kill(WorkflowInstance $workflowInstance, TaskInstance $taskInstance) |
|
401 | { |
||
402 | 2 | return $this->_dispatch(new Command\KillCommand($workflowInstance, $taskInstance)); |
|
403 | } |
||
404 | } |
||
405 |