Completed
Push — master ( 8eb315...5aaba6 )
by Changwan
04:29
created

Worker::stop()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 4
c 0
b 0
f 0
nc 2
nop 0
dl 0
loc 7
ccs 0
cts 7
cp 0
crap 6
rs 9.4285
1
<?php
2
namespace Wandu\Q;
3
4
use Psr\Container\ContainerInterface;
5
use Psr\Log\LoggerInterface;
6
use Wandu\Q\Exception\WorkerStopException;
7
8
/**
 * Queue worker.
 *
 * Producers call work() to push a class/method/arguments payload onto the
 * queue; a consumer calls listen() to poll the queue and invoke each payload's
 * method on the object the container returns for the payload's class.
 */
class Worker
{
    /** @var \Wandu\Q\Queue queue that payloads are sent to and received from */
    protected $queue;

    /** @var \Psr\Container\ContainerInterface resolves the handler object for a payload's 'class' entry */
    protected $container;

    /** @var \Psr\Log\LoggerInterface|null optional logger; every use is guarded by a null check */
    protected $logger;

    /** @var bool true only while the listen() loop should keep polling */
    protected $running = false;

    /**
     * @param \Wandu\Q\Queue $queue
     * @param \Psr\Container\ContainerInterface $container
     * @param \Psr\Log\LoggerInterface|null $logger omit to disable logging
     */
    public function __construct(Queue $queue, ContainerInterface $container, LoggerInterface $logger = null)
    {
        $this->queue = $queue;
        $this->container = $container;
        $this->logger = $logger;
    }

    /**
     * Delegates to the queue's flush().
     *
     * @return void
     */
    public function flush()
    {
        $this->queue->flush();
    }

    /**
     * Requests that the listen() loop stop.
     *
     * Also installed by listen() as the handler for SIGINT, SIGTERM and SIGHUP
     * when the pcntl functions are available.
     *
     * @return void
     */
    public function stop()
    {
        if ($this->logger) {
            $this->logger->info("stop by stop method.");
        }
        $this->running = false;
    }

    /**
     * Enqueues a job described by a handler class, a method name and arguments.
     *
     * @param string $class identifier later passed to the container's get()
     * @param string $method method to invoke on the resolved handler
     * @param array $arguments arguments passed to that method
     */
    public function work($class, $method, $arguments = [])
    {
        $this->queue->send([
            'class' => $class,
            'method' => $method,
            'arguments' => $arguments,
        ]);
    }

    /**
     * Polls the queue and executes received jobs until stopped.
     *
     * The loop ends when stop() is called (directly or through a signal) or
     * when a WorkerStopException is thrown inside the loop. Any other
     * exception propagates to the caller. In every case the running flag is
     * cleared before this method returns.
     *
     * @param int $tick pause between polls, in microseconds (passed to usleep())
     */
    public function listen($tick = 200000)
    {
        $this->running = true;
        $signalEnabled = false;
        if (function_exists('pcntl_signal') && function_exists('pcntl_signal_dispatch')) {
            pcntl_signal(SIGINT, [$this, 'stop']);
            pcntl_signal(SIGTERM, [$this, 'stop']);
            pcntl_signal(SIGHUP, [$this, 'stop']);
            $signalEnabled = true;
        } elseif ($this->logger) {
            $this->logger->info('if use pcntl_*, work more safety.');
        }
        try {
            while ($this->running) {
                if ($job = $this->queue->receive()) {
                    $result = $job->read();
                    call_user_func_array([
                        $this->container->get($result['class']),
                        $result['method']
                    ], $result['arguments'] ?? []);
                    if ($this->logger) {
                        $this->logger->info(sprintf("execute %s@%s", $result['method'], $result['class']));
                    }
                    // Deleted only after the handler returned without throwing.
                    // NOTE(review): a job whose handler throws (including
                    // WorkerStopException) is therefore never deleted here —
                    // confirm that is the intended redelivery behaviour.
                    $job->delete();
                }
                if ($signalEnabled) {
                    // Pending signals are delivered here, between jobs.
                    pcntl_signal_dispatch();
                }
                if (!$this->running) {
                    // Stop was requested during this iteration; skip the sleep
                    // so shutdown is not delayed by a full tick.
                    break;
                }
                usleep($tick);
            }
        } catch (WorkerStopException $e) {
            if ($this->logger) {
                $this->logger->info("stop by WorkerStopException.");
            }
        } finally {
            // Keep the flag truthful however the loop was left (exception
            // paths previously left it set to true).
            $this->running = false;
        }
    }
}
102