Completed
Pull Request — master (#19)
by Akihito
04:19
created

Worker::task()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 10
rs 9.4285
cc 1
eloc 7
nc 1
nop 1
1
<?php
2
namespace Ackintosh\Snidel;
3
4
use Ackintosh\Snidel\Result\Result;
5
use Ackintosh\Snidel\Task\Task;
6
use Ackintosh\Snidel\Traits\Queueing;
7
use Bernard\Router\SimpleRouter;
8
9
class Worker
10
{
11
    use Queueing;
12
13
    /** @var \Ackintosh\Snidel\Task\Task */
14
    private $latestTask;
15
16
    /** @var \Ackintosh\Snidel\Fork\Process */
17
    private $process;
18
19
    /** @var \Ackintosh\Snidel\Pcntl */
20
    private $pcntl;
21
22
    /** @var bool */
23
    private $isInProgress = false;
24
25
    /** @var \Bernard\QueueFactory\PersistentFactory */
26
    private $factory;
27
28
    /** @var \Bernard\Consumer */
29
    private $consumer;
30
31
    /** @var \Bernard\Producer */
32
    private $producer;
33
34
    /**
35
     * @param \Ackintosh\Snidel\Fork\Process $process
36
     * @param \Bernard\Driver $driver
37
     */
38
    public function __construct($process, $driver)
39
    {
40
        $this->pcntl = new Pcntl();
41
        $this->process = $process;
42
43
        $this->factory = $this->createFactory($driver);
44
        $router = new SimpleRouter();
45
        $router->add('Task', $this);
46
        $this->consumer = $this->createConsumer($router);
47
        $this->producer = $this->createProducer($this->factory);
48
    }
49
50
    /**
51
     * @return  int
52
     */
53
    public function getPid()
54
    {
55
        return $this->process->getPid();
56
    }
57
58
    /**
59
     * @return  void
60
     * @throws  \RuntimeException
61
     * @codeCoverageIgnore covered by SnidelTest via worker process
62
     */
63
    public function run()
64
    {
65
        $this->consumer->consume($this->factory->create('task'));
66
    }
67
68
    /**
69
     * @param Task $task
70
     * @return void
71
     * @codeCoverageIgnore covered by SnidelTest via worker process
72
     */
73
    public function task(Task $task)
74
    {
75
        $this->isInProgress = true;
76
        $this->latestTask = $task;
77
        $result = $task->execute();
78
        $result->setProcess($this->process);
79
80
        $this->producer->produce($result);
81
        $this->isInProgress = false;
82
    }
83
84
    /**
85
     * @return  void
86
     * @throws  \RuntimeException
87
     * @codeCoverageIgnore covered by SnidelTest via worker process
88
     */
89
    public function error()
90
    {
91
        $result = new Result();
92
        $result->setError(error_get_last());
93
        $result->setTask($this->latestTask);
94
        $result->setProcess($this->process);
95
96
        try {
97
            $this->producer->produce($result);
98
        } catch (\RuntimeException $e) {
99
            throw $e;
100
        }
101
    }
102
103
    /**
104
     * @param   int     $sig
105
     * @return  void
106
     * @codeCoverageIgnore covered by SnidelTest via worker process
107
     */
108
    public function terminate($sig)
109
    {
110
        posix_kill($this->process->getPid(), $sig);
111
        $status = null;
112
        $this->pcntl->waitpid($this->process->getPid(), $status);
113
    }
114
115
    /**
116
     * @return bool
117
     */
118
    public function hasTask()
119
    {
120
        return $this->latestTask !== null;
121
    }
122
123
    /**
124
     * @return bool
125
     */
126
    public function isInProgress()
127
    {
128
        return $this->isInProgress;
129
    }
130
}
131