Worker   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 97
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Importance

Changes 0
Metric Value
wmc 13
lcom 1
cbo 4
dl 0
loc 97
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A setTopics() 0 4 1
B run() 0 30 6
B perform() 0 27 4
A registerSignalHandlers() 0 5 1
A shutdown() 0 5 1
1
<?php
2
3
declare(ticks = 1);
4
5
namespace DelayQueue\Process;
6
7
use DelayQueue\Container\ContainerAwareTrait;
8
use Exception;
9
use DelayQueue\Handler\AbstractHandler;
10
11
/**
 * Long-running worker: polls the delay queue for ready jobs and runs each
 * job's handler in a forked child process, one job at a time.
 *
 * Reads $this->delayQueue, $this->logger and $this->container, none of which
 * are declared in this class (presumably supplied through
 * ContainerAwareTrait -- confirm against the trait).
 *
 * The file must keep declare(ticks = 1) (or otherwise dispatch signals) for
 * the handlers installed by registerSignalHandlers() to be invoked.
 */
class Worker
{
    use ContainerAwareTrait;

    /**
     * @var array Topics to poll; handed unchanged to the delay queue's pop()
     */
    protected $topics;

    /**
     * @var bool Whether the polling loop should stop at its next iteration
     */
    protected $shutdown = false;

    /**
     * Set the topics this worker polls.
     *
     * @param array $topics
     */
    public function setTopics(array $topics)
    {
        $this->topics = $topics;
    }

    /**
     * Main loop: pop a job, validate its handler class name, then perform it.
     *
     * Runs until the shutdown flag is set (see shutdown()). A failed poll is
     * logged as a warning and retried; a job whose class name fails
     * validation is logged as an emergency and skipped.
     */
    public function run()
    {
        $this->registerSignalHandlers();

        // The flag is only checked between jobs, so a job that is already
        // being performed is never interrupted by a shutdown request.
        while (!$this->shutdown) {
            try {
                $data = $this->delayQueue->pop($this->topics);
            } catch (Exception $exception) {
                $this->logger->warning(sprintf('polling queue exception: %s', $exception->getMessage()));
                continue;
            }

            if (!$data) {
                // Empty poll: nothing was ready.
                continue;
            }

            try {
                $this->delayQueue->validateClassName($data['className']);
            } catch (Exception $exception) {
                $this->logger->emergency($exception->getMessage());
                continue;
            }

            $this->perform($data);
        }
    }

    /**
     * Run a single job in a forked child and wait for that child to finish.
     *
     * The child instantiates $data['className'] with the container, passes it
     * the job id and body, runs it and then always terminates: exit code 0 on
     * success, 1 when the handler threw. The parent blocks until that child
     * is reaped and logs a warning for any abnormal outcome.
     *
     * @param array $data Job payload; the keys 'className', 'id' and 'body' are read
     */
    protected function perform(array $data)
    {
        $pid = pcntl_fork();
        if ($pid < 0) {
            $this->logger->emergency('Unable to fork child worker', ['job' => $data]);
            return;
        }

        if ($pid === 0) {
            // Child process.
            $failure = null;
            try {
                /** @var AbstractHandler $handler */
                $handler = new $data['className']($this->container);
                $handler->setId($data['id']);
                $handler->setBody($data['body']);
                $this->logger->info('Start processing Job', ['data' => $data]);
                $handler->run();
                $this->logger->info('Job finished', ['data' => $data]);
            } catch (Exception $exception) {
                $failure = $exception;
            } catch (\Throwable $error) {
                // PHP 7+ engine errors (TypeError, Error, ...). On PHP 5 this
                // clause simply never matches.
                $failure = $error;
            }

            if ($failure !== null) {
                $this->logger->warning(
                    sprintf('Job failed with exception: %s', $failure->getMessage()),
                    ['data' => $data]
                );
            }

            // The exit is deliberate: the child must never return into the
            // parent's polling loop (or let an exception unwind into the
            // caller of run()), otherwise it would keep running as a second
            // worker.
            exit($failure === null ? 0 : 1);
        }

        // Parent process: wait for exactly the child that was just forked.
        $status = 0;
        $reaped = pcntl_waitpid($pid, $status);
        if ($reaped !== $pid) {
            $this->logger->warning('Unable to wait for child worker ' . $pid);
            return;
        }

        // The exit status is only meaningful when the child exited normally;
        // a child killed by a signal must not be mistaken for exit code 0.
        if (!pcntl_wifexited($status)) {
            if (pcntl_wifsignaled($status)) {
                $this->logger->warning('Job was terminated by signal ' . pcntl_wtermsig($status));
            } else {
                $this->logger->warning('Job terminated abnormally');
            }
            return;
        }

        $exitStatus = pcntl_wexitstatus($status);
        if ($exitStatus !== 0) {
            // Job failed.
            $this->logger->warning('Job exited with exit code ' . $exitStatus);
        }
    }

    /**
     * Register the signal handlers: SIGTERM and SIGINT both call shutdown().
     */
    protected function registerSignalHandlers()
    {
        pcntl_signal(SIGTERM, [$this, 'shutdown']);
        pcntl_signal(SIGINT, [$this, 'shutdown']);
    }

    /**
     * Request a graceful stop.
     *
     * Only logs a notice and sets the shutdown flag; run() leaves its loop
     * the next time it checks the flag, i.e. once no job is being performed.
     * Installed as the SIGTERM/SIGINT handler, so any arguments passed by the
     * signal dispatcher are ignored.
     */
    public function shutdown()
    {
        $this->logger->notice('Shutting down');
        $this->shutdown = true;
    }
}