Completed
Push — master ( ec1ffe...1f9c3e )
by Ryota
02:11
created

Worker/Worker.php (2 issues)

Labels
Severity

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
namespace Tavii\SQSJobQueue\Worker;
3
4
use Tavii\SQSJobQueue\Exception\RuntimeException;
5
use Tavii\SQSJobQueue\Queue\QueueInterface;
6
use Tavii\SQSJobQueue\Storage\EntityInterface;
7
use Tavii\SQSJobQueue\Storage\StorageInterface;
8
9
class Worker implements WorkerInterface
10
{
11
    /**
12
     * @var QueueInterface
13
     */
14
    private $queue;
15
16
    /**
17
     * @var StorageInterface
18
     */
19
    private $storage;
20
21
    /**
22
     * Worker constructor.
23
     * @param QueueInterface $queue
24
     * @param StorageInterface $storage
25
     */
26
    public function __construct(QueueInterface $queue, StorageInterface $storage)
27
    {
28
        $this->queue = $queue;
29
        $this->storage = $storage;
30
    }
31
32
    /**
33
     * {@inheritdoc}
34
     */
35
    public function run($name)
36
    {
37
        $message = $this->queue->receive($name);
38
        if (is_null($message)) {
39
            return false;
40
        }
41
42
        if ($message->getJob()->execute()) {
43
            $this->queue->delete($message);
44
            return true;
45
        }
46
        return false;
47
    }
48
49
    /**
50
     * {@inheritdoc}
51
     */
52
    public function start($name, $sleep = 5)
53
    {
54
        $pid = pcntl_fork();
55
        if ($pid === -1) {
56
            throw new RuntimeException('Could not fork the process');
57
        } elseif ($pid > 0) {
58
            if (function_exists('gethostname')) {
59
                $server = gethostname();
60
            } else {
61
                $server = php_uname('n');
62
            }
63
            $this->storage->set($name, $server, $pid);
0 ignored issues
show
The call to set() misses a required argument $prefix.

This check looks for function calls that miss required arguments.

Loading history...
64
        } else {
65
            while(true) {
66
                $this->run($name);
67
                sleep($sleep);
68
            }
69
        }
70
    }
71
72
    /**
73
     * {@inheritdoc}
74
     */
75
    public function stop($name, $pid = null, $force = false)
76
    {
77
        if (function_exists('gethostname')) {
78
            $server = gethostname();
79
        } else {
80
            $server = php_uname('n');
81
        }
82
        $processes = $this->storage->find($name, $server, $pid);
83
        foreach ($processes as $process) {
84
85
            if (!$process instanceof EntityInterface) {
86
                throw new RuntimeException('no support data type.');
87
            }
88
89
            if (posix_kill($process->getProcId(), 3)) {
90
                $this->storage->remove($process->getQueue(), $process->getServer(), $process->getProcId());
0 ignored issues
show
The call to remove() misses a required argument $prefix.

This check looks for function calls that miss required arguments.

Loading history...
91
            }
92
        }
93
94
        if ($force) {
95
            $this->storage->removeForce($name, $server);
96
        }
97
98
    }
99
}