Completed
Pull Request — master (#19)
by Akihito
01:28
created

Worker::run()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 6
Bugs 0 Features 0
Metric Value
c 6
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
namespace Ackintosh\Snidel;
3
4
use Ackintosh\Snidel\Result\QueueInterface as ResultQueueInterface;
5
use Ackintosh\Snidel\Result\Result;
6
use Ackintosh\Snidel\Result\Formatter as ResultFormatter;
7
use Ackintosh\Snidel\Task\Formatter as TaskFormatter;
8
use Ackintosh\Snidel\Task\QueueInterface as TaskQueueInterface;
9
use Bernard\Consumer;
10
use Bernard\Message\PlainMessage;
11
use Bernard\Producer;
12
use Bernard\QueueFactory\PersistentFactory;
13
use Bernard\Router\SimpleRouter;
14
use Bernard\Serializer;
15
use Symfony\Component\EventDispatcher\EventDispatcher;
16
17
/**
 * A worker bound to one forked process: it consumes "Task" messages from the
 * "task" queue, executes them, and publishes the outcome as "Result" messages.
 */
class Worker
{
    /**
     * Most recent task handed to task(); null until the first message arrives.
     *
     * Declared as the interface because TaskFormatter::unserialize() is
     * reported (by static analysis) to return a TaskInterface, not
     * necessarily the concrete Task class.
     *
     * @var \Ackintosh\Snidel\Task\TaskInterface|null
     */
    private $latestTask;

    /** @var \Ackintosh\Snidel\Fork\Process */
    private $process;

    /** @var \Ackintosh\Snidel\Task\QueueInterface */
    private $taskQueue;

    /** @var \Ackintosh\Snidel\Result\QueueInterface */
    private $resultQueue;

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /**
     * True from the moment task() starts until its result has been produced.
     *
     * NOTE(review): the flag is intentionally left untouched when a task
     * aborts part-way; presumably callers inspect it to detect a task that
     * died mid-run - confirm before "fixing" this with a reset.
     *
     * @var bool
     */
    private $isInProgress = false;

    /** @var \Bernard\QueueFactory\PersistentFactory */
    private $factory;

    /** @var \Bernard\Consumer */
    private $consumer;

    /** @var \Bernard\Producer */
    private $producer;

    /**
     * Wires up the queue factory, consumer and producer on top of the driver.
     *
     * @param \Ackintosh\Snidel\Fork\Process $process
     * @param \Bernard\Driver $driver
     */
    public function __construct($process, $driver)
    {
        $this->pcntl = new Pcntl();
        $this->process = $process;

        $this->factory = new PersistentFactory($driver, new Serializer());

        // Register this object as the receiver for messages named "Task".
        // NOTE(review): presumably Bernard maps that name onto task() - confirm.
        $router = new SimpleRouter();
        $router->add('Task', $this);

        $this->consumer = new Consumer($router, new EventDispatcher());
        $this->producer = new Producer($this->factory, new EventDispatcher());
    }

    /**
     * @param   \Ackintosh\Snidel\Task\QueueInterface $queue
     * @return  void
     */
    public function setTaskQueue(TaskQueueInterface $queue)
    {
        $this->taskQueue = $queue;
    }

    /**
     * @param   \Ackintosh\Snidel\Result\QueueInterface $queue
     * @return  void
     */
    public function setResultQueue(ResultQueueInterface $queue)
    {
        $this->resultQueue = $queue;
    }

    /**
     * Returns the pid reported by the process this worker is bound to.
     *
     * @return  int
     */
    public function getPid()
    {
        return $this->process->getPid();
    }

    /**
     * Starts consuming the "task" queue.
     *
     * @return  void
     * @throws  \RuntimeException
     * @codeCoverageIgnore covered by SnidelTest via worker process
     */
    public function run()
    {
        $this->consumer->consume($this->factory->create('task'));
    }

    /**
     * Executes the task carried by the message and publishes its result.
     *
     * @param PlainMessage $message message whose 'task' entry holds the serialized task
     * @return void
     * @codeCoverageIgnore covered by SnidelTest via worker process
     */
    public function task($message)
    {
        $this->isInProgress = true;

        $task = TaskFormatter::unserialize($message['task']);
        $this->latestTask = $task;

        $result = $task->execute();
        $result->setProcess($this->process);
        $this->publishResult($result);

        $this->isInProgress = false;
    }

    /**
     * Publishes a failure result for the latest task, built from error_get_last().
     *
     * Exceptions raised while producing the message propagate to the caller
     * unchanged.
     *
     * @return  void
     * @throws  \RuntimeException
     * @codeCoverageIgnore covered by SnidelTest via worker process
     */
    public function error()
    {
        $result = new Result();
        $result->setError(error_get_last());
        $result->setTask($this->latestTask);
        $result->setProcess($this->process);

        $this->publishResult($result);
    }

    /**
     * Sends the signal to the worker process and then waits on it.
     *
     * @param   int     $sig
     * @return  void
     * @codeCoverageIgnore covered by SnidelTest via worker process
     */
    public function terminate($sig)
    {
        $pid = $this->process->getPid();

        posix_kill($pid, $sig);

        $status = null;
        $this->pcntl->waitpid($pid, $status);
    }

    /**
     * Tells whether task() has received at least one task.
     *
     * @return bool
     */
    public function hasTask()
    {
        return $this->latestTask !== null;
    }

    /**
     * @return bool
     */
    public function isInProgress()
    {
        return $this->isInProgress;
    }

    /**
     * Serializes the result and produces it as a "Result" message.
     *
     * @param   \Ackintosh\Snidel\Result\Result $result
     * @return  void
     * @codeCoverageIgnore covered by SnidelTest via worker process
     */
    private function publishResult($result)
    {
        $this->producer->produce(
            new PlainMessage(
                'Result',
                [
                    'result' => ResultFormatter::serialize($result),
                ]
            )
        );
    }
}
170