Worker   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 152
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Importance

Changes 0
Metric Value
wmc 17
lcom 1
cbo 2
dl 0
loc 152
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A receive() 0 23 4
A send() 0 19 4
A error() 0 7 1
A stop() 0 4 1
B handleControl() 0 31 6
1
<?php
2
3
/**
4
 * High-performance PHP process supervisor and load balancer written in Go
5
 *
6
 * @author Wolfy-J
7
 */
8
declare(strict_types=1);
9
10
namespace Spiral\RoadRunner;
11
12
use Spiral\Goridge\Exceptions\GoridgeException;
13
use Spiral\Goridge\RelayInterface as Relay;
14
use Spiral\Goridge\SendPackageRelayInterface;
15
use Spiral\RoadRunner\Exception\RoadRunnerException;
16
17
/**
18
 * Accepts connection from RoadRunner server over given Goridge relay.
19
 *
20
 * Example:
21
 *
22
 * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT));
23
 * while ($task = $worker->receive($context)) {
24
 *      $worker->send("DONE", json_encode($context));
25
 * }
26
 */
27
class Worker
28
{
29
    // Send as response context to request worker termination
30
    public const STOP = '{"stop":true}';
31
32
    /** @var Relay */
33
    private $relay;
34
35
    /**
36
     * @param Relay $relay
37
     */
38
    public function __construct(Relay $relay)
39
    {
40
        $this->relay = $relay;
41
    }
42
43
    /**
44
     * Receive packet of information to process, returns null when process must be stopped. Might
45
     * return Error to wrap error message from server.
46
     *
47
     * @param mixed $header
48
     * @return \Error|null|string
49
     *
50
     * @throws GoridgeException
51
     */
52
    public function receive(&$header)
53
    {
54
        $body = $this->relay->receiveSync($flags);
55
56
        if ($flags & Relay::PAYLOAD_CONTROL) {
57
            if ($this->handleControl($body, $header, $flags)) {
58
                // wait for the next command
59
                return $this->receive($header);
60
            }
61
62
            // no context for the termination.
63
            $header = null;
64
65
            // Expect process termination
66
            return null;
67
        }
68
69
        if ($flags & Relay::PAYLOAD_ERROR) {
70
            return new \Error((string)$body);
0 ignored issues
show
Unused Code introduced by
The call to Error::__construct() has too many arguments starting with (string) $body.

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress.

In this case you can add the @ignore PhpDoc annotation to the duplicate definition and it will be ignored.

Loading history...
71
        }
72
73
        return $body;
74
    }
75
76
    /**
77
     * Respond to the server with result of task execution and execution context.
78
     *
79
     * Example:
80
     * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders()));
81
     *
82
     * @param string|null $payload
83
     * @param string|null $header
84
     */
85
    public function send(string $payload = null, string $header = null): void
86
    {
87
        if (!$this->relay instanceof SendPackageRelayInterface) {
88
            if ($header === null) {
89
                $this->relay->send('', Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE);
90
            } else {
91
                $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW);
92
            }
93
94
            $this->relay->send((string)$payload, Relay::PAYLOAD_RAW);
95
        } else {
96
            $this->relay->sendPackage(
97
                (string)$header,
98
                Relay::PAYLOAD_CONTROL | ($header === null ? Relay::PAYLOAD_NONE : Relay::PAYLOAD_RAW),
99
                (string)$payload,
100
                Relay::PAYLOAD_RAW
101
            );
102
        }
103
    }
104
105
    /**
106
     * Respond to the server with an error. Error must be treated as TaskError and might not cause
107
     * worker destruction.
108
     *
109
     * Example:
110
     *
111
     * $worker->error("invalid payload");
112
     *
113
     * @param string $message
114
     */
115
    public function error(string $message): void
116
    {
117
        $this->relay->send(
118
            $message,
119
            Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR
120
        );
121
    }
122
123
    /**
124
     * Terminate the process. Server must automatically pass task to the next available process.
125
     * Worker will receive StopCommand context after calling this method.
126
     *
127
     * Attention, you MUST use continue; after invoking this method to let rr to properly
128
     * stop worker.
129
     *
130
     * @throws GoridgeException
131
     */
132
    public function stop(): void
133
    {
134
        $this->send(null, self::STOP);
135
    }
136
137
    /**
138
     * Handles incoming control command payload and executes it if required.
139
     *
140
     * @param string $body
141
     * @param mixed  $header Exported context (if any).
142
     * @param int    $flags
143
     * @return bool True when continue processing.
144
     *
145
     * @throws RoadRunnerException
146
     */
147
    private function handleControl(string $body = null, &$header = null, int $flags = 0): bool
148
    {
149
        $header = $body;
150
        if ($body === null || $flags & Relay::PAYLOAD_RAW) {
151
            // empty or raw prefix
152
            return true;
153
        }
154
155
        $p = json_decode($body, true);
156
        if ($p === false) {
157
            throw new RoadRunnerException('invalid task context, JSON payload is expected');
158
        }
159
160
        // PID negotiation (socket connections only)
161
        if (!empty($p['pid'])) {
162
            $this->relay->send(
163
                sprintf('{"pid":%s}', getmypid()),
164
                Relay::PAYLOAD_CONTROL
165
            );
166
        }
167
168
        // termination request
169
        if (!empty($p['stop'])) {
170
            return false;
171
        }
172
173
        // parsed header
174
        $header = $p;
175
176
        return true;
177
    }
178
}
179