Completed
Pull Request — master (#19)
by Akihito
04:09
created

Worker::task()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 15
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 15
rs 9.4285
cc 1
eloc 8
nc 1
nop 1
1
<?php
2
namespace Ackintosh\Snidel;
3
4
use Ackintosh\Snidel\Result\QueueInterface as ResultQueueInterface;
5
use Ackintosh\Snidel\Result\Result;
6
use Ackintosh\Snidel\Result\Formatter as ResultFormatter;
7
use Ackintosh\Snidel\Task\Formatter as TaskFormatter;
8
use Ackintosh\Snidel\Task\QueueInterface as TaskQueueInterface;
9
use Bernard\Consumer;
10
use Bernard\Driver\FlatFileDriver;
11
use Bernard\Message\PlainMessage;
12
use Bernard\Producer;
13
use Bernard\QueueFactory\PersistentFactory;
14
use Bernard\Router\SimpleRouter;
15
use Bernard\Serializer;
16
use Symfony\Component\EventDispatcher\EventDispatcher;
17
18
class Worker
19
{
20
    /** @var \Ackintosh\Snidel\Task\Task */
21
    private $latestTask;
22
23
    /** @var \Ackintosh\Snidel\Fork\Process */
24
    private $process;
25
26
    /** @var \Ackintosh\Snidel\Task\QueueInterface */
27
    private $taskQueue;
28
29
    /** @var \Ackintosh\Snidel\Result\QueueInterface */
30
    private $resultQueue;
31
32
    /** @var \Ackintosh\Snidel\Pcntl */
33
    private $pcntl;
34
35
    /** @var bool */
36
    private $done = false;
37
38
    private $factory;
39
    private $consumer;
40
    private $producer;
41
42
    /**
43
     * @param   \Ackintosh\Snidel\Fork\Process $process
44
     */
45
    public function __construct($process)
46
    {
47
        $this->pcntl = new Pcntl();
48
        $this->process = $process;
49
50
        $driver = new FlatFileDriver('/tmp/hoge');
51
        $this->factory = new PersistentFactory($driver, new Serializer());
52
        $router = new SimpleRouter();
53
        $router->add('Task', $this);
54
        $this->consumer = new Consumer($router, new EventDispatcher());
55
        $this->producer = new Producer($this->factory, new EventDispatcher());
56
    }
57
58
    /**
59
     * @param   \Ackintosh\Snidel\Task\QueueInterface
60
     * @return  void
61
     */
62
    public function setTaskQueue(TaskQueueInterface $queue)
63
    {
64
        $this->taskQueue = $queue;
65
    }
66
67
    /**
68
     * @param   \Ackintosh\Snidel\Result\QueueInterface
69
     * @return  void
70
     */
71
    public function setResultQueue(ResultQueueInterface $queue)
72
    {
73
        $this->resultQueue = $queue;
74
    }
75
76
    /**
77
     * @return  int
78
     */
79
    public function getPid()
80
    {
81
        return $this->process->getPid();
82
    }
83
84
    /**
85
     * @return  void
86
     * @throws  \RuntimeException
87
     */
88
    public function run()
89
    {
90
        $this->consumer->consume($this->factory->create('task'));
91
        $this->done = true;
92
    }
93
94
    public function task($message)
95
    {
96
        $this->latestTask = $task = TaskFormatter::unserialize($message['task']);
0 ignored issues
show
Documentation Bug introduced by
$task = \Ackintosh\Snide...alize($message['task']) is of type object<Ackintosh\Snidel\Task\TaskInterface>, but the property $latestTask was declared to be of type object<Ackintosh\Snidel\Task\Task>. Are you sure that you always receive this specific sub-class here, or does it make sense to add an instanceof check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a given class or a super-class is assigned to a property that is type hinted more strictly.

Either this assignment is in error or an instanceof check should be added for that assignment.

class Alien {}

class Dalek extends Alien {}

class Plot
{
    /** @var  Dalek */
    public $villain;
}

$alien = new Alien();
$plot = new Plot();
if ($alien instanceof Dalek) {
    $plot->villain = $alien;
}
Loading history...
97
        $result = $task->execute();
98
        $result->setProcess($this->process);
99
100
        $this->producer->produce(
101
            new PlainMessage(
102
                'Result',
103
                [
104
                    'result' => ResultFormatter::serialize($result),
105
                ]
106
            )
107
        );
108
    }
109
110
    /**
111
     * @return  void
112
     * @throws  \RuntimeException
113
     */
114
    public function error()
115
    {
116
        $result = new Result();
117
        $result->setError(error_get_last());
118
        $result->setTask($this->latestTask);
119
        $result->setProcess($this->process);
120
121
        try {
122
            $this->producer->produce(
123
                new PlainMessage(
124
                    'Result',
125
                    [
126
                        'result' => ResultFormatter::serialize($result),
127
                    ]
128
                )
129
            );
130
        } catch (\RuntimeException $e) {
131
            throw $e;
132
        }
133
    }
134
135
    /**
136
     * @param   int     $sig
137
     * @return  void
138
     */
139
    public function terminate($sig)
140
    {
141
        posix_kill($this->process->getPid(), $sig);
142
        $status = null;
143
        $this->pcntl->waitpid($this->process->getPid(), $status);
144
    }
145
146
    /**
147
     * @return bool
148
     */
149
    public function hasTask()
150
    {
151
        return $this->latestTask !== null;
152
    }
153
154
    /**
155
     * @return bool
156
     */
157
    public function done()
158
    {
159
        return $this->done;
160
    }
161
}
162