Completed
Push — master ( 34cb7f...ca7300 )
by
unknown
16s queued 11s
created

Worker::sendPackage()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 9
rs 9.9666
c 0
b 0
f 0
cc 2
nc 1
nop 2
1
<?php
2
3
/**
4
 * High-performance PHP process supervisor and load balancer written in Go
5
 *
6
 * @author Wolfy-J
7
 */
8
declare(strict_types=1);
9
10
namespace Spiral\RoadRunner;
11
12
use Spiral\Goridge\Exceptions\GoridgeException;
13
use Spiral\Goridge\RelayInterface as Relay;
14
use Spiral\RoadRunner\Exception\RoadRunnerException;
15
16
/**
17
 * Accepts connection from RoadRunner server over given Goridge relay.
18
 *
19
 * Example:
20
 *
21
 * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT));
22
 * while ($task = $worker->receive($context)) {
23
 *      $worker->send("DONE", json_encode($context));
24
 * }
25
 */
26
class Worker
27
{
28
    // Send as response context to request worker termination
29
    public const STOP = '{"stop":true}';
30
31
    /** @var Relay */
32
    private $relay;
33
34
    /**
35
     * @param Relay $relay
36
     */
37
    public function __construct(Relay $relay)
38
    {
39
        $this->relay = $relay;
40
    }
41
42
    /**
43
     * Receive packet of information to process, returns null when process must be stopped. Might
44
     * return Error to wrap error message from server.
45
     *
46
     * @param mixed $header
47
     * @return \Error|null|string
48
     *
49
     * @throws GoridgeException
50
     */
51
    public function receive(&$header)
52
    {
53
        $body = $this->relay->receiveSync($flags);
54
55
        if ($flags & Relay::PAYLOAD_CONTROL) {
56
            if ($this->handleControl($body, $header, $flags)) {
57
                // wait for the next command
58
                return $this->receive($header);
59
            }
60
61
            // no context for the termination.
62
            $header = null;
63
64
            // Expect process termination
65
            return null;
66
        }
67
68
        if ($flags & Relay::PAYLOAD_ERROR) {
69
            return new \Error((string)$body);
0 ignored issues
show
Unused Code introduced by
The call to Error::__construct() has too many arguments starting with (string) $body.

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress.

In this case you can add the @ignore PhpDoc annotation to the duplicate definition and it will be ignored.

Loading history...
70
        }
71
72
        return $body;
73
    }
74
75
    /**
76
     * Respond to the server with result of task execution and execution context.
77
     *
78
     * Example:
79
     * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders()));
80
     *
81
     * @param string|null $payload
82
     * @param string|null $header
83
     */
84
    public function send(string $payload = null, string $header = null): void
85
    {
86
        $this->relay->sendPackage(
0 ignored issues
show
Bug introduced by
The method sendPackage() does not seem to exist on object<Spiral\Goridge\RelayInterface>.

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
87
            (string)$header,
88
            Relay::PAYLOAD_CONTROL | ($header === null ? Relay::PAYLOAD_NONE : Relay::PAYLOAD_RAW),
89
            (string)$payload,
90
            Relay::PAYLOAD_RAW
91
        );
92
    }
93
94
    /**
95
     * Respond to the server with an error. Error must be treated as TaskError and might not cause
96
     * worker destruction.
97
     *
98
     * Example:
99
     *
100
     * $worker->error("invalid payload");
101
     *
102
     * @param string $message
103
     */
104
    public function error(string $message): void
105
    {
106
        $this->relay->send(
107
            $message,
108
            Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR
109
        );
110
    }
111
112
    /**
113
     * Terminate the process. Server must automatically pass task to the next available process.
114
     * Worker will receive StopCommand context after calling this method.
115
     *
116
     * Attention, you MUST use continue; after invoking this method to let rr to properly
117
     * stop worker.
118
     *
119
     * @throws GoridgeException
120
     */
121
    public function stop(): void
122
    {
123
        $this->send(null, self::STOP);
124
    }
125
126
    /**
127
     * Handles incoming control command payload and executes it if required.
128
     *
129
     * @param string $body
130
     * @param mixed  $header Exported context (if any).
131
     * @param int    $flags
132
     * @return bool True when continue processing.
133
     *
134
     * @throws RoadRunnerException
135
     */
136
    private function handleControl(string $body = null, &$header = null, int $flags = 0): bool
137
    {
138
        $header = $body;
139
        if ($body === null || $flags & Relay::PAYLOAD_RAW) {
140
            // empty or raw prefix
141
            return true;
142
        }
143
144
        $p = json_decode($body, true);
145
        if ($p === false) {
146
            throw new RoadRunnerException('invalid task context, JSON payload is expected');
147
        }
148
149
        // PID negotiation (socket connections only)
150
        if (!empty($p['pid'])) {
151
            $this->relay->send(
152
                sprintf('{"pid":%s}', getmypid()),
153
                Relay::PAYLOAD_CONTROL
154
            );
155
        }
156
157
        // termination request
158
        if (!empty($p['stop'])) {
159
            return false;
160
        }
161
162
        // parsed header
163
        $header = $p;
164
165
        return true;
166
    }
167
}
168