Issues (48)

src/Pheanstalk.php (4 issues)

1
<?php
2
3
namespace Pheanstalk;
4
5
use Doctrine\Common\Collections\ArrayCollection;
6
use Pheanstalk\Command\CreateCommand;
7
use Pheanstalk\Command\CreateScheduleCommand;
8
use Pheanstalk\Command\CreateTubeCommand;
9
use Pheanstalk\Command\GetWorkflowCommand;
10
use Pheanstalk\Command\GetWorkflowInstancesCommand;
11
use Pheanstalk\Command\GetWorkflowInstancesDetailCommand;
12
use Pheanstalk\Command\ListWorkflowsCommand;
13
use Pheanstalk\Command\ReleaseCommand;
0 ignored issues
show
The type Pheanstalk\Command\ReleaseCommand was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
14
use Pheanstalk\Command\UpdateTubeCommand;
15
use Pheanstalk\Command\WorkflowExistsCommand;
16
use Pheanstalk\Exception\ServerDuplicateEntryException;
17
use Pheanstalk\Structure\Job;
18
use Pheanstalk\Structure\Schedule;
19
use Pheanstalk\Structure\Task;
20
use Pheanstalk\Structure\TaskInstance;
21
use Pheanstalk\Structure\TimeSchedule;
22
use Pheanstalk\Structure\Tube;
23
use Pheanstalk\Structure\Workflow;
24
use Pheanstalk\Structure\WorkflowInstance;
25
26
/**
27
 * Pheanstalk is a PHP client for the beanstalkd workqueue.
28
 *
29
 * The Pheanstalk class is a simple facade for the various underlying components.
30
 *
31
 * @see http://github.com/kr/beanstalkd
32
 * @see http://xph.us/software/beanstalkd/
33
 *
34
 * @author  Paul Annesley
35
 * @package Pheanstalk
36
 * @license http://www.opensource.org/licenses/mit-license.php
37
 */
38
class Pheanstalk implements PheanstalkInterface
39
{
40
41
    /** @var Connection $connection */
42
    private $connection;
43
44
    /** @var PheanstalkInterface $currentClass */
45
    private $currentClass;
46
47
    /**
48
     * @param string $host
49
     * @param string $user
50
     * @param string $password
51
     * @param int    $port
52
     * @param int    $connectTimeout
53
     * @param bool   $connectPersistent
54
     */
55 21
    public function __construct($host, $user = null, $password = null, $port = PheanstalkInterface::DEFAULT_PORT, $connectTimeout = null, $connectPersistent = false)
    {
        // Build the connection from the supplied settings, then register it on this client.
        $connection = new Connection($host, $user, $password, $port, $connectTimeout, $connectPersistent);

        $this->setConnection($connection);
    }
59
60
    /**
61
     * {@inheritdoc}
62
     */
63 21
    public function setConnection(Connection $connection)
    {
        // Every command dispatched by this client goes through this connection.
        $this->connection = $connection;

        // Returns the client itself so calls can be chained.
        return $this;
    }
69
70
    /**
71
     * {@inheritdoc}
72
     */
73 2
    public function getConnection()
    {
        // Expose the connection previously stored by setConnection().
        $connection = $this->connection;

        return $connection;
    }
77
78
    /**
79
     * @return PheanstalkInterface
80
     */
81 14
    public function getCurrentClass(): PheanstalkInterface
    {
        // A delegate registered through setCurrentClass() takes precedence.
        if (isset($this->currentClass)) {
            return $this->currentClass;
        }

        // No delegate registered: this instance handles the calls itself.
        return $this;
    }
85
86
    /**
87
     * @param PheanstalkInterface $currentClass
88
     *
89
     * @return Pheanstalk
90
     */
91 1
    public function setCurrentClass(PheanstalkInterface $currentClass): PheanstalkInterface
    {
        // From now on getCurrentClass() returns $currentClass instead of $this.
        $this->currentClass = $currentClass;

        // Returns this client (not the delegate) so calls can be chained.
        return $this;
    }
96
97
    // ----------------------------------------
98
99
    /**
100
     * {@inheritdoc}
101
     */
102 1
    public function deleteSchedule(Schedule $schedule)
    {
        // Wrap the schedule in its delete command and hand it to the connection.
        $command = new Command\DeleteScheduleCommand($schedule);

        return $this->_dispatch($command);
    }
106
107
    /**
108
     * {@inheritdoc}
109
     */
110 6
    public function delete(Workflow $workflow)
    {
        // Wrap the workflow in its delete command and hand it to the connection.
        $command = new Command\DeleteCommand($workflow);

        return $this->_dispatch($command);
    }
114
115
    /**
116
     * {@inheritdoc}
117
     */
118 1
    public function deleteTube(Tube $tube)
    {
        // Wrap the tube in its delete command and hand it to the connection.
        $command = new Command\DeleteTubeCommand($tube);

        return $this->_dispatch($command);
    }
122
123
    /**
124
     * {@inheritdoc}
125
     */
126 8
    public function workflowExists($name)
    {
        $found = $this->_dispatch(new Command\WorkflowExistsCommand($name));

        // Any response that is not a Workflow structure is reported as "does not exist".
        if (!($found instanceof Workflow)) {
            return false;
        }

        // Return the result of getWorkflow() on the current class for the matched workflow.
        return $this->getCurrentClass()->getWorkflow($found);
    }
134
135
    /**
136
     * {@inheritdoc}
137
     */
138 1
    public function getSchedule(int $scheduleId)
    {
        // Look the schedule up by its identifier through the connection.
        $command = new Command\GetScheduleCommand($scheduleId);

        return $this->_dispatch($command);
    }
142
143
    /**
144
     * {@inheritdoc}
145
     */
146 1
    public function listSchedules()
    {
        // The list command takes no arguments; return the connection's response as-is.
        $command = new Command\ListSchedulesCommand();

        return $this->_dispatch($command);
    }
150
151
    /**
152
     * {@inheritdoc}
153
     */
154 7
    public function getWorkflow(Workflow $workflow)
    {
        // Wrap the workflow in its "get" command and return the connection's response.
        $command = new Command\GetWorkflowCommand($workflow);

        return $this->_dispatch($command);
    }
158
159
    /**
160
     * {@inheritdoc}
161
     */
162 5
    public function getWorkflowInstances(?Workflow $workflow, ?string $status = null)
    {
        // Decide once whether a single status was requested. Previously the
        // filter list was chosen with empty() but the return branch with
        // is_null(), so '' or '0' queried every filter and then called get()
        // on the null returned for the missing key.
        $singleStatus = !empty($status);
        $statuses = $singleStatus ? [$status] : GetWorkflowInstancesDetailCommand::FILTERS;

        // One entry per queried status, keyed by the lower-cased status name.
        $instances = new ArrayCollection([]);
        foreach ($statuses as $stat) {
            $instances[strtolower($stat)] = $this->getStatusInstance($stat, $workflow);
        }

        if ($singleStatus) {
            // A single status returns only that status' 'workflow_instances' entry.
            return $instances->get(strtolower($status))->get('workflow_instances');
        }

        // No status given: return the full collection, one entry per filter.
        return $instances;
    }
175
176
    /**
177
     * @param               $stat
178
     * @param Workflow|null $workflow
179
     *
180
     * @return ArrayCollection
181
     * @throws Exception\ClientException
182
     */
183 5
    protected function getStatusInstance($stat, ?Workflow $workflow)
    {
        $command = new Command\GetWorkflowInstancesCommand($workflow, $stat);
        $result = $this->_dispatch($command);

        /** @var ArrayCollection $listed */
        $listed = $result->get('workflow_instances');
        if (empty($listed)) {
            // Nothing stored under 'workflow_instances': return the response untouched.
            return $result;
        }

        // Ask the current class for the details of every listed instance.
        // The return value of each call is discarded here.
        foreach ($listed as $instance) {
            $this->getCurrentClass()->getWorkflowInstancesDetails($instance);
        }

        return $result;
    }
195
196
    /**
197
     * {@inheritdoc}
198
     */
199 4
    public function getWorkflowInstancesDetails(WorkflowInstance $workflowInstance)
    {
        // Wrap the instance in its detail command and return the connection's response.
        $command = new Command\GetWorkflowInstancesDetailCommand($workflowInstance);

        return $this->_dispatch($command);
    }
203
204
    /**
205
     * {@inheritdoc}
206
     */
207 9
    public function tubeExists($name)
    {
        // Return the connection's answer to the existence check for this tube name.
        $command = new Command\TubeExistsCommand($name);

        return $this->_dispatch($command);
    }
211
212
    /**
213
     * {@inheritdoc}
214
     */
215 1
    public function listTubes()
    {
        // The list command takes no arguments; return the connection's response as-is.
        $command = new Command\ListTubesCommand();

        return $this->_dispatch($command);
    }
219
220
    /**
221
     * {@inheritdoc}
222
     */
223 1
    public function peek()
    {
        // Return the connection's response to the peek command unchanged.
        return $this->_dispatch(new Command\PeekCommand());
    }
229
230
    /**
231
     * {@inheritdoc}
232
     */
233 6
    public function put(Workflow $workflow)
    {
        $command = new Command\PutCommand($workflow);
        $result = $this->_dispatch($command);

        // Callers only receive the 'workflow-instance-id' entry of the response.
        return $result['workflow-instance-id'];
    }
239
240
    /**
241
     * {@inheritdoc}
242
     */
243 1
    public function statsTube(Tube $tube)
    {
        // NOTE(review): static analysis reports that this returns an array while
        // PheanstalkInterface::statsTube() documents an object; confirm which
        // side of the contract is out of date.
        $command = new Command\StatsTubeCommand($tube);

        return $this->_dispatch($command);
    }
247
248
    /**
249
     * {@inheritdoc}
250
     */
251 1
    public function stats()
    {
        // NOTE(review): static analysis reports that this returns an array while
        // PheanstalkInterface::stats() documents an object; confirm which side
        // of the contract is out of date.
        $command = new Command\StatsCommand();

        return $this->_dispatch($command);
    }
255
256
    // ----------------------------------------
257
258
    /**
     * Forwards the given command to the connection and returns its result.
     *
     * This method performs no error handling of its own. Any reset or retry
     * on socket failures would have to be implemented by
     * Connection::dispatchCommand() (NOTE(review): not visible from this
     * class; confirm before relying on it).
     *
     * @param Command $command
     *
     * @return mixed
     * @throws Exception\ClientException
     */
    private function _dispatch($command)
    {
        $connection = $this->connection;

        return $connection->dispatchCommand($command);
    }
273
274
    /**
275
     * {@inheritdoc}
276
     */
277 6
    public function create(Workflow $workflow, $force = false): Workflow
    {
        try {
            // Make sure every queue referenced by the workflow's tasks exists first.
            $this->checkAndCreateTubes($workflow);

            return $this->_dispatch(new Command\CreateCommand($workflow));
        } catch (ServerDuplicateEntryException $e) {
            // Without $force a duplicate entry is the caller's problem.
            if (!$force) {
                throw $e;
            }

            // findWorkflow() is documented as Workflow|bool: when no listed
            // workflow matches there is nothing to delete, so surface the
            // original duplicate-entry error instead of passing a boolean to
            // delete(), which only accepts a Workflow.
            $workflowToDelete = $this->findWorkflow($workflow);
            if (!($workflowToDelete instanceof Workflow)) {
                throw $e;
            }

            $this->getCurrentClass()->delete($workflowToDelete);

            // Retry once without $force, so a second duplicate error propagates.
            return $this->getCurrentClass()->create($workflow);
        }
    }
294
295
    /**
296
     * @param Workflow $workflow
297
     *
298
     * @return Workflow|bool
299
     * @throws Exception\ClientException
300
     */
301 3
    public function findWorkflow(Workflow $workflow)
    {
        $listed = $this->_dispatch(new Command\ListWorkflowsCommand());

        // A listed workflow matches when both its name and its group are identical.
        $sameIdentity = function (Workflow $candidate) use ($workflow) {
            if ($candidate->getName() !== $workflow->getName()) {
                return false;
            }

            return $candidate->getGroup() === $workflow->getGroup();
        };

        // Only the first match is returned; the documented "bool" covers the no-match case.
        return $listed->filter($sameIdentity)->first();
    }
309
310
    /**
311
     * @param Workflow $workflow
312
     *
313
     * @throws Exception\ClientException
314
     */
315 6
    public function checkAndCreateTubes(Workflow $workflow)
    {
        // Collect each distinct queue name once, in first-seen order, so a
        // queue shared by several tasks is only checked against the server once.
        $tubes = [];
        /** @var Job $job */
        foreach ($workflow->getJobs() as $job) {
            /** @var Task $task */
            foreach ($job->getTasks() as $task) {
                $queue = $task->getQueue();
                if (!in_array($queue, $tubes, true)) {
                    $tubes[] = $queue;
                }
            }
        }

        // Create every tube the current class reports as missing.
        foreach ($tubes as $tube) {
            if (!$this->getCurrentClass()->tubeExists($tube)) {
                // The second Tube argument is the literal 1 the original passed.
                // NOTE(review): its meaning is not visible here; confirm.
                $this->getCurrentClass()->createTube(new Tube($tube, 1));
            }
        }
    }
331
332
    /**
333
     * {@inheritdoc}
334
     */
335 1
    public function update(Workflow $workflow): Workflow
    {
        // The connection's response replaces the workflow that was passed in.
        return $this->_dispatch(new Command\UpdateCommand($workflow));
    }
340
341
    /**
342
     * {@inheritdoc}
343
     */
344 1
    public function updateSchedule(Schedule $schedule): Schedule
    {
        // The connection's response replaces the schedule that was passed in.
        return $this->_dispatch(new Command\UpdateScheduleCommand($schedule));
    }
349
350
    /**
351
     * {@inheritdoc}
352
     */
353 1
    public function createSchedule(Schedule $schedule)
    {
        // Wrap the schedule in its create command and return the connection's response.
        $command = new Command\CreateScheduleCommand($schedule);

        return $this->_dispatch($command);
    }
360
361
    /**
362
     * {@inheritdoc}
363
     */
364 5
    public function createTask(string $name, string $group, string $path, $queue = 'default', $useAgent = false, $user = null, $host = null, $comment = null): Workflow
    {
        // Wrap the single task in a one-job workflow.
        $tasks = new ArrayCollection([new Task($path, $queue, $useAgent, $user, $host)]);
        $jobs = new ArrayCollection([new Job($tasks)]);
        $workflow = new Workflow($name, $group, $jobs, $comment);

        // The second argument is create()'s $force flag.
        return $this->getCurrentClass()->create($workflow, true);
    }
372
373
    /**
374
     * {@inheritDoc}
375
     */
376 1
    public function createTube(Tube $tube): Tube
    {
        // Wrap the tube in its create command and return the connection's response.
        $command = new Command\CreateTubeCommand($tube);

        return $this->_dispatch($command);
    }
380
381
    /**
382
     * {@inheritdoc}
383
     */
384 1
    public function updateTube(Tube $tube): Tube
    {
        // Wrap the tube in its update command and return the connection's response.
        $command = new Command\UpdateTubeCommand($tube);

        return $this->_dispatch($command);
    }
388
389
    /**
390
     * {@inheritdoc}
391
     */
392 1
    public function cancel(WorkflowInstance $workflowInstance)
    {
        // Wrap the instance in its cancel command and return the connection's response.
        $command = new Command\CancelCommand($workflowInstance);

        return $this->_dispatch($command);
    }
396
397
    /**
398
     * {@inheritdoc}
399
     */
400 2
    public function kill(WorkflowInstance $workflowInstance, TaskInstance $taskInstance)
    {
        // The kill command needs both the workflow instance and the task instance.
        $command = new Command\KillCommand($workflowInstance, $taskInstance);

        return $this->_dispatch($command);
    }
404
}
405