Completed
Pull Request — master (#19)
by Akihito
01:39
created

Worker::setResultQueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
namespace Ackintosh\Snidel;
3
4
use Ackintosh\Snidel\Result\QueueInterface as ResultQueueInterface;
5
use Ackintosh\Snidel\Result\Result;
6
use Ackintosh\Snidel\Result\Formatter as ResultFormatter;
7
use Ackintosh\Snidel\Task\Formatter as TaskFormatter;
8
use Ackintosh\Snidel\Task\QueueInterface as TaskQueueInterface;
9
use Bernard\Consumer;
10
use Bernard\Message\PlainMessage;
11
use Bernard\Producer;
12
use Bernard\QueueFactory\PersistentFactory;
13
use Bernard\Router\SimpleRouter;
14
use Bernard\Serializer;
15
use Symfony\Component\EventDispatcher\EventDispatcher;
16
17
class Worker
18
{
19
    /** @var \Ackintosh\Snidel\Task\TaskInterface|null Task most recently unserialized by task(); null until a task has been received. */
20
    private $latestTask;
21
22
    /** @var \Ackintosh\Snidel\Fork\Process */
23
    private $process;
24
25
    /** @var \Ackintosh\Snidel\Task\QueueInterface */
26
    private $taskQueue;
27
28
    /** @var \Ackintosh\Snidel\Result\QueueInterface */
29
    private $resultQueue;
30
31
    /** @var \Ackintosh\Snidel\Pcntl */
32
    private $pcntl;
33
34
    /** @var bool True while task() is executing a task. */
35
    private $isInProgress = false;
36
37
    /** @var \Bernard\QueueFactory\PersistentFactory Created in the constructor from the given driver. */
    private $factory;
38
    /** @var \Bernard\Consumer Created in the constructor; routes 'Task' messages to this worker. */
    private $consumer;
39
    /** @var \Bernard\Producer Created in the constructor; used to publish 'Result' messages. */
    private $producer;
40
41
    /**
     * Binds the worker to its process and builds the Bernard queue plumbing.
     *
     * The worker registers itself as the receiver for 'Task' messages, and
     * creates a consumer for incoming messages and a producer for outgoing ones.
     *
     * @param \Ackintosh\Snidel\Fork\Process $process
     * @param \Bernard\Driver $driver
     */
    public function __construct($process, $driver)
    {
        $this->pcntl = new Pcntl();
        $this->process = $process;

        $queueFactory = new PersistentFactory($driver, new Serializer());
        $this->factory = $queueFactory;

        // Messages named 'Task' are dispatched to this object.
        $taskRouter = new SimpleRouter();
        $taskRouter->add('Task', $this);

        $this->consumer = new Consumer($taskRouter, new EventDispatcher());
        $this->producer = new Producer($queueFactory, new EventDispatcher());
    }
56
57
    /**
     * Stores the given queue as this worker's task queue.
     *
     * @param   \Ackintosh\Snidel\Task\QueueInterface $queue
     * @return  void
     */
    public function setTaskQueue(TaskQueueInterface $queue)
    {
        $this->taskQueue = $queue;
    }
65
66
    /**
     * Stores the given queue as this worker's result queue.
     *
     * @param   \Ackintosh\Snidel\Result\QueueInterface $queue
     * @return  void
     */
    public function setResultQueue(ResultQueueInterface $queue)
    {
        $this->resultQueue = $queue;
    }
74
75
    /**
     * Returns the pid reported by the worker's process object.
     *
     * @return  int
     */
    public function getPid()
    {
        $pid = $this->process->getPid();

        return $pid;
    }
82
83
    /**
     * Creates the queue named 'task' and hands it to the consumer.
     *
     * @return  void
     * @throws  \RuntimeException
     */
    public function run()
    {
        $queue = $this->factory->create('task');
        $this->consumer->consume($queue);
    }
91
92
    public function task($message)
93
    {
94
        $this->isInProgress = true;
95
        $this->latestTask = $task = TaskFormatter::unserialize($message['task']);
0 ignored issues
show
Documentation Bug introduced by
$task = \Ackintosh\Snide...alize($message['task']) is of type object<Ackintosh\Snidel\Task\TaskInterface>, but the property $latestTask was declared to be of type object<Ackintosh\Snidel\Task\Task>. Are you sure that you always receive this specific sub-class here, or does it make sense to add an instanceof check?

Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a given class or a super-class is assigned to a property that is type hinted more strictly.

Either this assignment is in error or an instanceof check should be added for that assignment.

class Alien {}

class Dalek extends Alien {}

class Plot
{
    /** @var  Dalek */
    public $villain;
}

$alien = new Alien();
$plot = new Plot();
if ($alien instanceof Dalek) {
    $plot->villain = $alien;
}
Loading history...
96
        $result = $task->execute();
97
        $result->setProcess($this->process);
98
99
        $this->producer->produce(
100
            new PlainMessage(
101
                'Result',
102
                [
103
                    'result' => ResultFormatter::serialize($result),
104
                ]
105
            )
106
        );
107
        $this->isInProgress = false;
108
    }
109
110
    /**
     * Publishes a 'Result' message describing the last PHP error.
     *
     * The result carries the value of error_get_last(), the most recently
     * received task (null when no task has been received) and this worker's
     * process.
     *
     * @return  void
     * @throws  \RuntimeException propagated unchanged if raised while producing the message
     */
    public function error()
    {
        $result = new Result();
        $result->setError(error_get_last());
        $result->setTask($this->latestTask);
        $result->setProcess($this->process);

        // Deliberately not wrapped in try/catch: catching \RuntimeException only
        // to re-throw the same instance is a no-op, so the exception is simply
        // allowed to propagate to the caller.
        $this->producer->produce(
            new PlainMessage(
                'Result',
                [
                    'result' => ResultFormatter::serialize($result),
                ]
            )
        );
    }
134
135
    /**
     * Sends the given signal to the worker's process, then waits on its pid.
     *
     * @param   int     $sig    signal number passed to posix_kill()
     * @return  void
     */
    public function terminate($sig)
    {
        // Receives the status written by waitpid(); the value is not used afterwards.
        $exitStatus = null;

        posix_kill($this->process->getPid(), $sig);
        $this->pcntl->waitpid($this->process->getPid(), $exitStatus);
    }
145
146
    /**
     * Tells whether this worker has received a task so far.
     *
     * @return bool true once a latest task is recorded, false while it is still null
     */
    public function hasTask()
    {
        return !is_null($this->latestTask);
    }
153
154
    /**
     * Reports the in-progress flag that task() raises while it executes a task.
     *
     * @return bool
     */
    public function isInProgress()
    {
        $inProgress = $this->isInProgress;

        return $inProgress;
    }
161
}
162