PyRowMan /
pheanstalk
| 1 | <?php |
||||
| 2 | |||||
| 3 | namespace Pheanstalk; |
||||
| 4 | |||||
| 5 | use Doctrine\Common\Collections\ArrayCollection; |
||||
| 6 | use Pheanstalk\Command\CreateCommand; |
||||
| 7 | use Pheanstalk\Command\CreateScheduleCommand; |
||||
| 8 | use Pheanstalk\Command\CreateTubeCommand; |
||||
| 9 | use Pheanstalk\Command\GetWorkflowCommand; |
||||
| 10 | use Pheanstalk\Command\GetWorkflowInstancesCommand; |
||||
| 11 | use Pheanstalk\Command\GetWorkflowInstancesDetailCommand; |
||||
| 12 | use Pheanstalk\Command\ListWorkflowsCommand; |
||||
| 13 | use Pheanstalk\Command\ReleaseCommand; |
||||
|
0 ignored issues
–
show
|
|||||
| 14 | use Pheanstalk\Command\UpdateTubeCommand; |
||||
| 15 | use Pheanstalk\Command\WorkflowExistsCommand; |
||||
| 16 | use Pheanstalk\Exception\ServerDuplicateEntryException; |
||||
| 17 | use Pheanstalk\Structure\Job; |
||||
| 18 | use Pheanstalk\Structure\Schedule; |
||||
| 19 | use Pheanstalk\Structure\Task; |
||||
| 20 | use Pheanstalk\Structure\TaskInstance; |
||||
| 21 | use Pheanstalk\Structure\TimeSchedule; |
||||
| 22 | use Pheanstalk\Structure\Tube; |
||||
| 23 | use Pheanstalk\Structure\Workflow; |
||||
| 24 | use Pheanstalk\Structure\WorkflowInstance; |
||||
| 25 | |||||
/**
 * Pheanstalk is a PHP client for the beanstalkd workqueue.
 *
 * The Pheanstalk class is a simple facade for the various underlying components:
 * every public operation builds a Command object and hands it to the Connection.
 *
 * @see http://github.com/kr/beanstalkd
 * @see http://xph.us/software/beanstalkd/
 *
 * @author  Paul Annesley
 * @package Pheanstalk
 * @license http://www.opensource.org/licenses/mit-license.php
 */
class Pheanstalk implements PheanstalkInterface
{
    /** @var Connection */
    private $connection;

    /**
     * Optional delegate used for internal calls to public API methods.
     *
     * @var PheanstalkInterface|null
     */
    private $currentClass;

    /**
     * @param string      $host
     * @param string|null $user
     * @param string|null $password
     * @param int         $port
     * @param int|null    $connectTimeout
     * @param bool        $connectPersistent
     */
    public function __construct(
        $host,
        $user = null,
        $password = null,
        $port = PheanstalkInterface::DEFAULT_PORT,
        $connectTimeout = null,
        $connectPersistent = false
    ) {
        $this->setConnection(
            new Connection($host, $user, $password, $port, $connectTimeout, $connectPersistent)
        );
    }

    /**
     * {@inheritdoc}
     */
    public function setConnection(Connection $connection)
    {
        $this->connection = $connection;

        return $this;
    }

    /**
     * {@inheritdoc}
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Returns the object that internal calls to public API methods are routed through.
     *
     * This is the delegate registered with setCurrentClass(), or this instance
     * when no delegate has been registered.
     *
     * @return PheanstalkInterface
     */
    public function getCurrentClass(): PheanstalkInterface
    {
        return $this->currentClass ?? $this;
    }

    /**
     * Registers the object that internal calls to public API methods are routed through.
     *
     * @param PheanstalkInterface $currentClass
     *
     * @return $this
     */
    public function setCurrentClass(PheanstalkInterface $currentClass): PheanstalkInterface
    {
        $this->currentClass = $currentClass;

        return $this;
    }

    // ----------------------------------------

    /**
     * {@inheritdoc}
     */
    public function deleteSchedule(Schedule $schedule)
    {
        return $this->_dispatch(new Command\DeleteScheduleCommand($schedule));
    }

    /**
     * {@inheritdoc}
     */
    public function delete(Workflow $workflow)
    {
        return $this->_dispatch(new Command\DeleteCommand($workflow));
    }

    /**
     * {@inheritdoc}
     */
    public function deleteTube(Tube $tube)
    {
        return $this->_dispatch(new Command\DeleteTubeCommand($tube));
    }

    /**
     * {@inheritdoc}
     *
     * When the existence check yields a Workflow, that workflow is passed to
     * getWorkflow() on the current class and its result is returned; any other
     * result is reported as false.
     */
    public function workflowExists($name)
    {
        $workflow = $this->_dispatch(new Command\WorkflowExistsCommand($name));
        if (!$workflow instanceof Workflow) {
            return false;
        }

        return $this->getCurrentClass()->getWorkflow($workflow);
    }

    /**
     * {@inheritdoc}
     */
    public function getSchedule(int $scheduleId)
    {
        return $this->_dispatch(new Command\GetScheduleCommand($scheduleId));
    }

    /**
     * {@inheritdoc}
     */
    public function listSchedules()
    {
        return $this->_dispatch(new Command\ListSchedulesCommand());
    }

    /**
     * {@inheritdoc}
     */
    public function getWorkflow(Workflow $workflow)
    {
        return $this->_dispatch(new Command\GetWorkflowCommand($workflow));
    }

    /**
     * {@inheritdoc}
     *
     * With a non-empty $status only that status is queried and the
     * 'workflow_instances' entry of its result is returned. Otherwise every
     * status in GetWorkflowInstancesDetailCommand::FILTERS is queried and the
     * results are returned in a collection keyed by the lower-cased status.
     */
    public function getWorkflowInstances(?Workflow $workflow, ?string $status = null)
    {
        // A single flag drives both the status list and the return shape, so an
        // empty-string status behaves like "no status" instead of dereferencing
        // a missing collection entry.
        $singleStatus = !empty($status);
        $statuses = $singleStatus ? [$status] : GetWorkflowInstancesDetailCommand::FILTERS;

        $instances = new ArrayCollection([]);
        foreach ($statuses as $stat) {
            $instances[strtolower($stat)] = $this->getStatusInstance($stat, $workflow);
        }

        if ($singleStatus) {
            return $instances->get(strtolower($status))->get('workflow_instances');
        }

        return $instances;
    }

    /**
     * Fetches the workflow instances for one status and requests the details of each.
     *
     * The return value of getWorkflowInstancesDetails() is discarded here.
     * NOTE(review): presumably the command fills in the given instance in place; confirm.
     *
     * @param string        $stat
     * @param Workflow|null $workflow
     *
     * @return ArrayCollection
     * @throws Exception\ClientException
     */
    protected function getStatusInstance($stat, ?Workflow $workflow)
    {
        $workflowInstances = $this->_dispatch(new Command\GetWorkflowInstancesCommand($workflow, $stat));

        /** @var ArrayCollection $workflowCollection */
        $workflowCollection = $workflowInstances->get('workflow_instances');
        if (!empty($workflowCollection)) {
            foreach ($workflowCollection as $instance) {
                $this->getCurrentClass()->getWorkflowInstancesDetails($instance);
            }
        }

        return $workflowInstances;
    }

    /**
     * {@inheritdoc}
     */
    public function getWorkflowInstancesDetails(WorkflowInstance $workflowInstance)
    {
        return $this->_dispatch(new Command\GetWorkflowInstancesDetailCommand($workflowInstance));
    }

    /**
     * {@inheritdoc}
     */
    public function tubeExists($name)
    {
        return $this->_dispatch(new Command\TubeExistsCommand($name));
    }

    /**
     * {@inheritdoc}
     */
    public function listTubes()
    {
        return $this->_dispatch(new Command\ListTubesCommand());
    }

    /**
     * {@inheritdoc}
     */
    public function peek()
    {
        return $this->_dispatch(new Command\PeekCommand());
    }

    /**
     * {@inheritdoc}
     *
     * Returns the 'workflow-instance-id' entry of the server response.
     */
    public function put(Workflow $workflow)
    {
        $response = $this->_dispatch(new Command\PutCommand($workflow));

        return $response['workflow-instance-id'];
    }

    /**
     * {@inheritdoc}
     *
     * NOTE(review): static analysis reported that this dispatch yields an array
     * while PheanstalkInterface::statsTube() documents an object; confirm which
     * of the two is intended and align the interface documentation.
     */
    public function statsTube(Tube $tube)
    {
        return $this->_dispatch(new Command\StatsTubeCommand($tube));
    }

    /**
     * {@inheritdoc}
     *
     * NOTE(review): static analysis reported that this dispatch yields an array
     * while PheanstalkInterface::stats() documents an object; confirm which of
     * the two is intended and align the interface documentation.
     */
    public function stats()
    {
        return $this->_dispatch(new Command\StatsCommand());
    }

    // ----------------------------------------

    /**
     * Dispatches the specified command to the connection object.
     *
     * This method performs no retry of its own; it forwards the command and
     * returns whatever the connection returns.
     * NOTE(review): earlier documentation described a reset-and-retry on
     * SocketException; confirm whether Connection::dispatchCommand() does that.
     *
     * @param Command $command
     *
     * @return mixed
     * @throws Exception\ClientException
     */
    private function _dispatch($command)
    {
        return $this->connection->dispatchCommand($command);
    }

    /**
     * {@inheritdoc}
     *
     * Missing tubes are created first. If the server reports a duplicate entry
     * and $force is truthy, the matching existing workflow is deleted and the
     * creation is attempted once more (without force, so a second duplicate
     * error propagates to the caller).
     *
     * @throws ServerDuplicateEntryException when the workflow already exists and
     *                                       $force is falsy, or when no matching
     *                                       workflow can be found to delete
     */
    public function create(Workflow $workflow, $force = false): Workflow
    {
        try {
            $this->checkAndCreateTubes($workflow);

            return $this->_dispatch(new Command\CreateCommand($workflow));
        } catch (ServerDuplicateEntryException $e) {
            if (!$force) {
                throw $e;
            }

            $workflowToDelete = $this->findWorkflow($workflow);
            if (!$workflowToDelete instanceof Workflow) {
                // findWorkflow() yields false when nothing matches; there is
                // nothing to delete, so surface the original duplicate error
                // rather than failing on delete()'s type declaration.
                throw $e;
            }

            $this->getCurrentClass()->delete($workflowToDelete);

            return $this->getCurrentClass()->create($workflow);
        }
    }

    /**
     * Looks up the listed workflow that has the same name and group as the given one.
     *
     * @param Workflow $workflow
     *
     * @return Workflow|false the first match, or false when no listed workflow matches
     * @throws Exception\ClientException
     */
    public function findWorkflow(Workflow $workflow)
    {
        $workflows = $this->_dispatch(new Command\ListWorkflowsCommand());

        $matches = $workflows->filter(static function (Workflow $listedWorkflow) use ($workflow) {
            return $listedWorkflow->getName() === $workflow->getName()
                && $listedWorkflow->getGroup() === $workflow->getGroup();
        });

        return $matches->first();
    }

    /**
     * Ensures that every queue referenced by the workflow's tasks exists as a tube.
     *
     * Each distinct queue is checked once via tubeExists(); a missing one is
     * created with `new Tube($queue, 1)`.
     *
     * @param Workflow $workflow
     *
     * @throws Exception\ClientException
     */
    public function checkAndCreateTubes(Workflow $workflow)
    {
        $queues = [];
        /** @var Job $job */
        foreach ($workflow->getJobs() as $job) {
            /** @var Task $task */
            foreach ($job->getTasks() as $task) {
                $queue = $task->getQueue();
                // Several tasks commonly share a queue; check each queue only once.
                if (!in_array($queue, $queues, true)) {
                    $queues[] = $queue;
                }
            }
        }

        foreach ($queues as $queue) {
            if (!$this->getCurrentClass()->tubeExists($queue)) {
                $this->getCurrentClass()->createTube(new Tube($queue, 1));
            }
        }
    }

    /**
     * {@inheritdoc}
     */
    public function update(Workflow $workflow): Workflow
    {
        return $this->_dispatch(new Command\UpdateCommand($workflow));
    }

    /**
     * {@inheritdoc}
     */
    public function updateSchedule(Schedule $schedule): Schedule
    {
        return $this->_dispatch(new Command\UpdateScheduleCommand($schedule));
    }

    /**
     * {@inheritdoc}
     */
    public function createSchedule(Schedule $schedule)
    {
        return $this->_dispatch(new Command\CreateScheduleCommand($schedule));
    }

    /**
     * {@inheritdoc}
     *
     * Builds a workflow holding one job with one task and creates it with
     * force enabled, so an existing workflow with the same name and group is
     * replaced.
     */
    public function createTask(string $name, string $group, string $path, $queue = 'default', $useAgent = false, $user = null, $host = null, $comment = null): Workflow
    {
        $task = new Task($path, $queue, $useAgent, $user, $host);
        $job = new Job(new ArrayCollection([$task]));
        $workflow = new Workflow($name, $group, new ArrayCollection([$job]), $comment);

        return $this->getCurrentClass()->create($workflow, true);
    }

    /**
     * {@inheritdoc}
     */
    public function createTube(Tube $tube): Tube
    {
        return $this->_dispatch(new Command\CreateTubeCommand($tube));
    }

    /**
     * {@inheritdoc}
     */
    public function updateTube(Tube $tube): Tube
    {
        return $this->_dispatch(new Command\UpdateTubeCommand($tube));
    }

    /**
     * {@inheritdoc}
     */
    public function cancel(WorkflowInstance $workflowInstance)
    {
        return $this->_dispatch(new Command\CancelCommand($workflowInstance));
    }

    /**
     * {@inheritdoc}
     */
    public function kill(WorkflowInstance $workflowInstance, TaskInstance $taskInstance)
    {
        return $this->_dispatch(new Command\KillCommand($workflowInstance, $taskInstance));
    }
}
||||
| 405 |
The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g.
excluded_paths: ["lib/*"], you can move it to the dependency path list instead. For further information, see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths