Worker   A
last analyzed

Complexity

Total Complexity 15

Size/Duplication

Total Lines 91
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 91
rs 10
wmc 15
lcom 1
cbo 6

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A run() 0 13 3
B start() 0 19 5
B stop() 0 24 6
1
<?php
2
namespace Tavii\SQSJobQueue\Worker;
3
4
use Tavii\SQSJobQueue\Exception\RuntimeException;
5
use Tavii\SQSJobQueue\Queue\QueueName;
6
use Tavii\SQSJobQueue\Queue\QueueInterface;
7
use Tavii\SQSJobQueue\Storage\EntityInterface;
8
use Tavii\SQSJobQueue\Storage\StorageInterface;
9
10
/**
 * Queue worker that pulls messages from a queue, executes the job attached to
 * each message, and can run itself as a forked background process whose PID is
 * tracked in a storage backend.
 */
class Worker implements WorkerInterface
{
    /**
     * Queue the worker receives messages from and deletes them on.
     *
     * @var QueueInterface
     */
    private $queue;

    /**
     * Storage used to record and look up forked worker processes.
     *
     * @var StorageInterface
     */
    private $storage;

    /**
     * Worker constructor.
     *
     * @param QueueInterface   $queue   queue to receive messages from
     * @param StorageInterface $storage registry of started worker processes
     */
    public function __construct(QueueInterface $queue, StorageInterface $storage)
    {
        $this->queue = $queue;
        $this->storage = $storage;
    }

    /**
     * Receives a single message from the queue and executes its job.
     *
     * The message is deleted from the queue only when the job's execute()
     * call returns a truthy value; otherwise it is left untouched.
     *
     * @param QueueName $queueName queue to poll
     *
     * @return bool true when a job was executed successfully and its message
     *              deleted; false when no message was received or the job
     *              reported failure
     */
    public function run(QueueName $queueName)
    {
        $message = $this->queue->receive($queueName);
        if ($message === null) {
            return false;
        }

        if (!$message->getJob()->execute()) {
            return false;
        }

        $this->queue->delete($message);

        return true;
    }

    /**
     * Forks a background worker for the given queue.
     *
     * In the parent process the child's PID is registered in storage together
     * with the queue name and this machine's host name, and the method returns.
     * In the child process the method never returns normally: it calls run()
     * and then sleeps, forever.
     *
     * @param QueueName   $queueName queue the forked worker will poll
     * @param int         $sleep     seconds to sleep between two run() calls
     * @param string|null $prefix    accepted for interface compatibility; not
     *                               used by this implementation
     *
     * @throws RuntimeException when the process could not be forked
     */
    public function start(QueueName $queueName, $sleep = 5, $prefix = null)
    {
        $pid = pcntl_fork();

        if ($pid === -1) {
            throw new RuntimeException('Could not fork the process');
        }

        if ($pid > 0) {
            // Parent: remember which process serves this queue on this host.
            $this->storage->set($queueName, $this->resolveServerName(), $pid);

            return;
        }

        // Child: poll the queue until the process is killed from outside.
        // NOTE(review): nothing catches exceptions here, so an exception thrown
        // by run() leaves the loop and propagates to the caller inside the
        // child process.
        while (true) {
            $this->run($queueName);
            sleep($sleep);
        }
    }

    /**
     * Signals the registered worker processes of a queue on this host to quit.
     *
     * Every process returned by the storage lookup is sent signal 3; when the
     * signal was delivered, the process entry is removed from storage.
     *
     * @param QueueName $queueName queue whose workers should be stopped
     * @param int|null  $pid       passed through to the storage lookup
     *                             (presumably restricts it to one process;
     *                             confirm against StorageInterface::find())
     * @param bool      $force     when truthy, additionally calls
     *                             removeForce() on the storage for this queue
     *                             and host after signalling
     *
     * @throws RuntimeException when the storage lookup yields an item that is
     *                          not an EntityInterface
     */
    public function stop(QueueName $queueName, $pid = null, $force = false)
    {
        $server = $this->resolveServerName();
        $processes = $this->storage->find($queueName, $server, $pid);

        // Static analysis reports find() as returning a single EntityInterface,
        // which is not traversable. Normalise the result so that a lone entity
        // is still processed and a null result simply means "nothing to stop".
        if ($processes instanceof EntityInterface) {
            $processes = [$processes];
        } elseif ($processes === null) {
            $processes = [];
        }

        foreach ($processes as $process) {
            if (!$process instanceof EntityInterface) {
                throw new RuntimeException('no support data type.');
            }

            // 3 is the POSIX signal number of SIGQUIT.
            if (posix_kill($process->getProcId(), 3)) {
                $this->storage->remove(
                    $process->getQueueName(),
                    $process->getServer(),
                    $process->getProcId()
                );
            }
        }

        if ($force) {
            $this->storage->removeForce($queueName, $server);
        }
    }

    /**
     * Returns the host name used as the server key for stored processes.
     *
     * Uses gethostname() when that function exists and falls back to
     * php_uname('n') otherwise.
     *
     * @return string|false host name as reported by PHP
     */
    private function resolveServerName()
    {
        if (function_exists('gethostname')) {
            return gethostname();
        }

        return php_uname('n');
    }
}