Completed
Push — master ( 7ef906...de99d2 )
by BENOIT
02:11
created

QueueWorker::getLastResponse()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
namespace BenTools\GuzzleQueueHandler;
4
5
use GuzzleHttp\ClientInterface;
6
use function GuzzleHttp\Psr7\parse_request;
7
use function GuzzleHttp\Psr7\str;
8
use function GuzzleHttp\json_decode;
9
use Interop\Queue\PsrConnectionFactory;
10
use Interop\Queue\PsrConsumer;
11
use Interop\Queue\PsrContext;
12
use Interop\Queue\PsrMessage;
13
use Interop\Queue\PsrQueue;
14
use Psr\Http\Message\RequestInterface;
15
use Psr\Http\Message\ResponseInterface;
16
use SplObserver;
17
18
/**
 * Consumes serialized HTTP requests from a queue, sends them through Guzzle
 * and publishes the serialized response on the queue named by the message's
 * replyTo.
 *
 * Observers attached to the worker are notified twice per processed message:
 * once right before the request is sent (STATE_REQUEST) and once right after
 * the response has been received (STATE_RESPONSE).
 */
class QueueWorker implements \SplSubject
{
    /** State reported to observers just before the request is sent. */
    const STATE_REQUEST = 1;

    /** State reported to observers just after the response has been received. */
    const STATE_RESPONSE = 2;

    /**
     * @var PsrContext
     */
    private $context;

    /**
     * @var PsrQueue
     */
    private $queue;

    /**
     * @var ClientInterface
     */
    private $guzzle;

    /**
     * Polling window used by loop(); 0.0 means "wait until a message arrives".
     *
     * @var float
     */
    private $timeout;

    /**
     * @var \SplObserver[]
     */
    private $observers = [];

    /**
     * @var RequestInterface|null
     */
    private $currentRequest;

    /**
     * @var ResponseInterface|null
     */
    private $lastResponse;

    /**
     * One of the STATE_* constants, or null while nothing has been processed yet.
     *
     * @var int|null
     */
    private $state;

    /**
     * QueueWorker constructor.
     *
     * @param PsrContext      $context
     * @param ClientInterface $guzzle
     * @param string          $queueName Name of the queue to consume; must not be blank.
     * @param float           $timeout   Polling window; 0.0 waits until a message arrives.
     * @throws \InvalidArgumentException When the queue name is an empty string.
     */
    public function __construct(PsrContext $context, ClientInterface $guzzle, $queueName = '', $timeout = 0.0)
    {
        if ('' === $queueName) {
            throw new \InvalidArgumentException("Queue name must not be blank.");
        }
        $this->context = $context;
        $this->guzzle = $guzzle;
        $this->queue = $context->createQueue($queueName);
        $this->timeout = (float) $timeout;
    }

    /**
     * Builds a worker from a connection factory instead of a ready-made context.
     *
     * @param PsrConnectionFactory $factory
     * @param ClientInterface      $guzzle
     * @param string               $queueName
     * @param float                $timeout
     * @return QueueWorker
     */
    public static function factory(PsrConnectionFactory $factory, ClientInterface $guzzle, $queueName = '', $timeout = 0.0)
    {
        return new static($factory->createContext(), $guzzle, $queueName, $timeout);
    }

    /**
     * Consumes and processes messages until one of the optional limits is hit.
     *
     * The limits are evaluated after each polling round. With a timeout of 0.0
     * a round only ends when a message arrives, so $endAt is only checked
     * after a message has been processed in that mode.
     *
     * @param int|null                $maxRequests Stop after this many processed messages (null = unlimited).
     * @param \DateTimeImmutable|null $endAt       Stop once this point in time has been reached (null = never).
     */
    public function loop($maxRequests = null, \DateTimeImmutable $endAt = null)
    {
        $nbRequests = 0;
        $consumer = $this->context->createConsumer($this->queue);

        while (true) {
            $message = $this->waitForMessage($consumer);

            // The polling window may elapse without any message; in that case
            // there is nothing to process and nothing to count, but the time
            // limit below must still be honoured.
            if (null !== $message) {
                $this->process($message, $consumer);

                if (null !== $maxRequests && ++$nbRequests >= $maxRequests) {
                    break;
                }
            }

            if (null !== $endAt && time() >= $endAt->getTimestamp()) {
                break;
            }
        }
    }

    /**
     * @return RequestInterface|null The request being (or last) sent, null before the first message.
     */
    public function getCurrentRequest()
    {
        return $this->currentRequest;
    }

    /**
     * @return ResponseInterface|null The response of the last request, null while a request is in flight.
     */
    public function getLastResponse()
    {
        return $this->lastResponse;
    }

    /**
     * @return int|null One of the STATE_* constants, null before the first message.
     */
    public function getState()
    {
        return $this->state;
    }

    /**
     * Polls the consumer for the next message.
     *
     * @param PsrConsumer $consumer
     * @return PsrMessage|null Null only when a non-zero timeout elapsed without any message.
     */
    private function waitForMessage(PsrConsumer $consumer)
    {
        if (0.0 === $this->timeout) {
            while (null === ($message = $consumer->receive())) {
                usleep(10000);
            }

            return $message;
        }

        // NOTE(review): $this->timeout is added to microtime(true), i.e. treated
        // as seconds here, and is also handed to receive() as-is. Confirm the
        // unit receive() expects against the queue-interop contract.
        $deadline = microtime(true) + $this->timeout;
        while (null === ($message = $consumer->receive($this->timeout)) && microtime(true) <= $deadline) {
            usleep(10000);
        }

        return $message;
    }

    /**
     * Sends the request carried by the message and publishes the response on
     * the replyTo queue.
     *
     * Malformed messages (undecodable body, missing replyTo) are rejected and
     * the method returns normally. Failures while sending the request or
     * publishing the response cause the message to be rejected, then the
     * original exception is rethrown to the caller.
     *
     * @param PsrMessage  $message
     * @param PsrConsumer $consumer
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    private function process(PsrMessage $message, PsrConsumer $consumer)
    {
        try {
            list($request, $options) = $this->unwrap($message);
            /** @var RequestInterface $request */
            $request = parse_request($request);
        } catch (\InvalidArgumentException $e) {
            $this->rejectWithError($message, $consumer, $e->getMessage());
            return;
        }

        // Explicit checks rather than a truthiness test, so that a reply queue
        // literally named "0" is not mistaken for a missing one.
        $replyTo = $message->getReplyTo();
        if (null === $replyTo || '' === $replyTo) {
            $this->rejectWithError($message, $consumer, 'replyTo was not filled.');
            return;
        }

        $options['handler'] = $this->guzzle->getConfig('handler');
        $options['synchronous'] = true;
        $options['http_errors'] = false;

        $this->lastResponse = null;

        try {
            $this->state = self::STATE_REQUEST;
            $this->currentRequest = $request;
            $this->notify();

            $response = $this->guzzle->send($request, $options);

            $this->state = self::STATE_RESPONSE;
            $this->lastResponse = $response;
            $this->notify();

            $queue = $this->context->createQueue($replyTo);
            $producer = $this->context->createProducer();
            $producer->send($queue, $this->context->createMessage(str($response)));
            $consumer->acknowledge($message);
        } catch (\Throwable $e) {
            // Reject first, then rethrow: rethrowing first (as before) made the
            // reject unreachable and left the message neither acked nor rejected.
            $this->rejectWithError($message, $consumer, $e->getMessage());
            throw $e;
        } catch (\Exception $e) {
            // Only reachable on runtimes where \Throwable does not exist (PHP 5).
            $this->rejectWithError($message, $consumer, $e->getMessage());
            throw $e;
        }
    }

    /**
     * Records the failure reason on the message and rejects it.
     *
     * @param PsrMessage  $message
     * @param PsrConsumer $consumer
     * @param string      $error
     */
    private function rejectWithError(PsrMessage $message, PsrConsumer $consumer, $error)
    {
        $message->setProperty('error', $error);
        $consumer->reject($message);
    }

    /**
     * Decodes the message body into the serialized request and its options.
     *
     * @param PsrMessage $message
     * @return array Two items: the serialized request (string) and the request options (array).
     * @throws \InvalidArgumentException When the decoded body does not have the expected shape.
     */
    private function unwrap(PsrMessage $message)
    {
        $payload = json_decode($message->getBody(), true);

        if (!is_array($payload) || !isset($payload[0]) || !is_string($payload[0])) {
            throw new \InvalidArgumentException('Message body must be a JSON array holding the serialized request and its options.');
        }

        // Options are optional in the payload; a missing or null entry means "no options".
        $options = isset($payload[1]) ? $payload[1] : [];
        if (!is_array($options)) {
            throw new \InvalidArgumentException('Request options must be a JSON object.');
        }

        return [$payload[0], $options];
    }

    /**
     * @inheritDoc
     */
    public function attach(SplObserver $observer)
    {
        $this->observers[] = $observer;
    }

    /**
     * @inheritDoc
     */
    public function detach(SplObserver $observer)
    {
        // Remove every registration of this observer, not just the first one.
        foreach ($this->observers as $key => $registered) {
            if ($registered === $observer) {
                unset($this->observers[$key]);
            }
        }
    }

    /**
     * @inheritDoc
     */
    public function notify()
    {
        foreach ($this->observers as $observer) {
            $observer->update($this);
        }
    }
}
255