Completed: Push to master (540e0e...34cb7f) by unknown

Worker::sendPackage()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0

Metric Value
dl 0
loc 9
rs 9.9666
c 0
b 0
f 0
cc 2
nc 1
nop 2
<?php

/**
 * High-performance PHP process supervisor and load balancer written in Go
 *
 * @author Wolfy-J
 */
declare(strict_types=1);

namespace Spiral\RoadRunner;

use Spiral\Goridge\Exceptions\GoridgeException;
use Spiral\Goridge\RelayInterface as Relay;
use Spiral\RoadRunner\Exception\RoadRunnerException;

/**
 * Accepts connection from RoadRunner server over given Goridge relay.
 *
 * Example:
 *
 * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT));
 * while ($task = $worker->receive($context)) {
 *      $worker->send("DONE", json_encode($context));
 * }
 */
class Worker
{
    // Send as response context to request worker termination
    public const STOP = '{"stop":true}';

    /** @var Relay */
    private $relay;

    /**
     * @param Relay $relay
     */
    public function __construct(Relay $relay)
    {
        $this->relay = $relay;
    }

    /**
     * Receive packet of information to process, returns null when process must be stopped. Might
     * return Error to wrap error message from server.
     *
     * @param mixed $header
     * @return \Error|null|string
     *
     * @throws GoridgeException
     */
    public function receive(&$header)
    {
        $body = $this->relay->receiveSync($flags);

        if ($flags & Relay::PAYLOAD_CONTROL) {
            if ($this->handleControl($body, $header, $flags)) {
                // wait for the next command
                return $this->receive($header);
            }

            // no context for the termination.
            $header = null;

            // Expect process termination
            return null;
        }

        if ($flags & Relay::PAYLOAD_ERROR) {
            return new \Error((string)$body);
Issue (Unused Code): The call to Error::__construct() has too many arguments starting with (string) $body.

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this is known to happen is WordPress.

In this case you can add the @ignore PhpDoc annotation to the duplicate definition and it will be ignored.

Note that PHP's built-in \Error constructor accepts a message as its first argument, so the report above is most likely such a false positive.
        }

        return $body;
    }

    /**
     * Respond to the server with result of task execution and execution context.
     *
     * Example:
     * $worker->send((string)$response->getBody(), json_encode($response->getHeaders()));
     *
     * @param string|null $payload
     * @param string|null $header
     */
    public function send(?string $payload = null, ?string $header = null): void
    {
        if ($header === null) {
            $this->relay->send('', Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE);
        } else {
            $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW);
        }

        $this->relay->send((string)$payload, Relay::PAYLOAD_RAW);
    }

    /**
     * Respond to the server with result of task execution and execution context. Uses fewer
     * syscalls than send().
     *
     * @param string|null $payload
     * @param string|null $header
     */
    public function sendPackage(?string $payload = null, ?string $header = null): void
    {
        $this->relay->sendPackage(
Issue (Bug): The method sendPackage() does not seem to exist on object<Spiral\Goridge\RelayInterface>.

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.
            (string)$header,
            // Same flag selection as send(): no header means PAYLOAD_NONE, otherwise PAYLOAD_RAW
            Relay::PAYLOAD_CONTROL | ($header === null ? Relay::PAYLOAD_NONE : Relay::PAYLOAD_RAW),
            (string)$payload,
            Relay::PAYLOAD_RAW
        );
    }

    /**
     * Respond to the server with an error. Error must be treated as TaskError and might not cause
     * worker destruction.
     *
     * Example:
     *
     * $worker->error("invalid payload");
     *
     * @param string $message
     */
    public function error(string $message): void
    {
        $this->relay->send(
            $message,
            Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR
        );
    }

    /**
     * Terminate the process. Server must automatically pass task to the next available process.
     * Worker will receive StopCommand context after calling this method.
     *
     * Attention: you MUST use continue; after invoking this method so that rr can stop the
     * worker properly.
     *
     * @throws GoridgeException
     */
    public function stop(): void
    {
        $this->send(null, self::STOP);
    }

    /**
     * Handles incoming control command payload and executes it if required.
     *
     * @param string|null $body
     * @param mixed       $header Exported context (if any).
     * @param int         $flags
     * @return bool True when processing should continue.
     *
     * @throws RoadRunnerException
     */
    private function handleControl(?string $body = null, &$header = null, int $flags = 0): bool
    {
        $header = $body;
        if ($body === null || ($flags & Relay::PAYLOAD_RAW)) {
            // empty or raw prefix
            return true;
        }

        // json_decode() returns null (not false) on malformed input,
        // so the failure is detected through json_last_error()
        $p = json_decode($body, true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new RoadRunnerException('invalid task context, JSON payload is expected');
        }

        // PID negotiation (socket connections only)
        if (!empty($p['pid'])) {
            $this->relay->send(
                sprintf('{"pid":%s}', getmypid()),
                Relay::PAYLOAD_CONTROL
            );
        }

        // termination request
        if (!empty($p['stop'])) {
            return false;
        }

        // parsed header
        $header = $p;

        return true;
    }
}
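
Two details of the Worker class above can be checked in isolation: send() pairs a missing header with PAYLOAD_NONE and a present header with PAYLOAD_RAW, and json_decode() reports malformed input by returning null rather than false. The following is a minimal, self-contained sketch of both, not the library's code. The helper names controlFlags() and decodeControl() are hypothetical, and the numeric flag values are assumptions standing in for the constants on Spiral\Goridge\RelayInterface.

```php
<?php

declare(strict_types=1);

// Assumed flag values for illustration only; the real constants are
// defined on Spiral\Goridge\RelayInterface.
const PAYLOAD_NONE    = 2;
const PAYLOAD_RAW     = 4;
const PAYLOAD_CONTROL = 16;

/**
 * Hypothetical helper: flags for the control (header) frame,
 * following the branch in Worker::send().
 */
function controlFlags(?string $header): int
{
    // No header: empty control frame. Header present: raw control frame.
    return PAYLOAD_CONTROL | ($header === null ? PAYLOAD_NONE : PAYLOAD_RAW);
}

/**
 * Hypothetical helper: decode a JSON control body into an array.
 *
 * @throws InvalidArgumentException when the body is not valid JSON
 *                                  or does not decode to an array
 */
function decodeControl(string $body): array
{
    $decoded = json_decode($body, true);

    // json_decode() yields null on failure, so comparing against false
    // never triggers; json_last_error() carries the actual error state.
    if (json_last_error() !== JSON_ERROR_NONE || !is_array($decoded)) {
        throw new InvalidArgumentException('invalid task context, JSON payload is expected');
    }

    return $decoded;
}
```

A caller would typically wrap decodeControl() in a try/catch and treat the exception like the RoadRunnerException thrown by handleControl().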