Completed
Push — master ( 664743...75fb6c )
by Anton
02:24 queued 01:06
created

Worker::terminate()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
c 0
b 0
f 0
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
/**
3
 * High-performance PHP process supervisor and load balancer written in Go
4
 *
5
 * @author Wolfy-J
6
 */
7
8
namespace Spiral\RoadRunner;
9
10
use Spiral\Goridge\Exceptions\GoridgeException;
11
use Spiral\Goridge\RelayInterface as Relay;
12
use Spiral\RoadRunner\Exceptions\RoadRunnerException;
13
14
/**
 * Accepts connection from RoadRunner server over given Goridge relay.
 *
 * Every incoming frame is read from the relay. Control frames carry the task
 * context (header) or a service command (PID negotiation, stop request) and
 * are consumed internally; the frame that follows is returned to the caller
 * as the task payload.
 *
 * Example:
 *
 * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT));
 * while ($task = $worker->receive($context)) {
 *      $worker->send("DONE", json_encode($context));
 * }
 */
class Worker
{
    // Send as response context to request worker termination
    const TERMINATE = '{"stop": true}';

    /** @var Relay Transport used to talk to the RoadRunner server. */
    private $relay;

    /**
     * @param Relay $relay Relay every frame is read from and written to.
     */
    public function __construct(Relay $relay)
    {
        $this->relay = $relay;
    }

    /**
     * Receive packet of information to process, returns null when process must be stopped. Might
     * return Error to wrap error message from server.
     *
     * Control frames are handled in place (see handleControl()) and the method keeps reading
     * until a non-control frame arrives or a stop command is received.
     *
     * @param mixed $header Filled by reference with the context of the received task: the decoded
     *                      JSON value, the raw string for raw headers, or null when empty.
     *
     * @return \Error|null|string
     * @throws GoridgeException
     * @throws RoadRunnerException When a control frame carries an invalid JSON context.
     */
    public function receive(&$header)
    {
        // Iterative on purpose: a long run of control frames must not grow the call stack.
        while (true) {
            // Start every read with clean flags, exactly as a fresh call would.
            $flags = null;
            $body = $this->relay->receiveSync($flags);

            if ($flags & Relay::PAYLOAD_CONTROL) {
                if (!$this->handleControl($body, $header, $flags)) {
                    // Expect process termination
                    return null;
                }

                // wait for the next command
                continue;
            }

            if ($flags & Relay::PAYLOAD_ERROR) {
                // \Error takes the message as its first constructor argument.
                return new \Error($body);
            }

            return $body;
        }
    }

    /**
     * Respond to the server with result of task execution and execution context.
     *
     * The context is always sent first as a control frame, followed by the payload frame.
     *
     * Example:
     * $worker->send((string)$response->getBody(), json_encode($response->getHeaders()));
     *
     * @param string|null $payload
     * @param string|null $header
     *
     * @throws GoridgeException
     */
    public function send(string $payload = null, string $header = null)
    {
        // An absent context is flagged as "none" so the server does not expect a body for it.
        $headerType = is_null($header) ? Relay::PAYLOAD_NONE : Relay::PAYLOAD_RAW;

        $this->relay->send($header, Relay::PAYLOAD_CONTROL | $headerType);
        $this->relay->send($payload, Relay::PAYLOAD_RAW);
    }

    /**
     * Respond to the server with an error. Error must be treated as TaskError and might not cause
     * worker destruction.
     *
     * Example:
     *
     * $worker->error("invalid payload");
     *
     * @param string $message
     *
     * @throws GoridgeException
     */
    public function error(string $message)
    {
        $this->relay->send(
            $message,
            Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR
        );
    }

    /**
     * Terminate the process. Server must automatically pass task to the next available process.
     * Worker will receive TerminateCommand context after calling this method.
     *
     * @throws GoridgeException
     */
    public function stop()
    {
        $this->send(null, self::TERMINATE);
    }

    /**
     * Handles incoming control command payload and executes it if required.
     *
     * Empty and raw bodies are exported to $header untouched. Any other body must be valid JSON:
     * a non-empty "pid" key is answered with the PID of the current process, a non-empty "stop"
     * key requests termination, and the decoded value is exported to $header otherwise.
     *
     * @param string|null $body
     * @param mixed       $header Exported context (if any).
     * @param int         $flags
     *
     * @return bool True when continue processing.
     *
     * @throws RoadRunnerException When the body is not a valid JSON context.
     */
    private function handleControl(string $body = null, &$header, int $flags): bool
    {
        $header = $body;
        if (is_null($body) || ($flags & Relay::PAYLOAD_RAW)) {
            // empty or raw prefix
            return true;
        }

        $p = json_decode($body, true);

        // json_decode() signals malformed input by returning null, so the decoder state has to be
        // inspected as well; a literal `false` context stays rejected as it always was.
        if ($p === false || json_last_error() !== JSON_ERROR_NONE) {
            throw new RoadRunnerException("invalid task context, JSON payload is expected");
        }

        // PID negotiation (socket connections only)
        if (!empty($p['pid'])) {
            $this->relay->send(
                sprintf('{"pid":%s}', getmypid()),
                Relay::PAYLOAD_CONTROL
            );
        }

        // termination request
        if (!empty($p['stop'])) {
            return false;
        }

        // parsed header
        $header = $p;

        return true;
    }
}