QueueHandler   A
last analyzed

Complexity

Total Complexity 14

Size/Duplication

Total Lines 112
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Importance

Changes 0
Metric Value
wmc 14
lcom 1
cbo 9
dl 0
loc 112
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 9 2
A factory() 0 5 1
C __invoke() 0 39 8
A wrap() 0 11 3
1
<?php
2
3
namespace BenTools\GuzzleQueueHandler;
4
5
use GuzzleHttp\Exception\RequestException;
6
use GuzzleHttp\Promise\Promise;
7
use function GuzzleHttp\Psr7\parse_response;
8
use function GuzzleHttp\Psr7\str;
9
use Interop\Queue\PsrConnectionFactory;
10
use Interop\Queue\PsrContext;
11
use Interop\Queue\PsrQueue;
12
use Psr\Http\Message\RequestInterface;
13
use Psr\Http\Message\UriInterface;
14
15
class QueueHandler
16
{
17
    /**
18
     * @var PsrContext
19
     */
20
    private $context;
21
22
    /**
23
     * @var PsrQueue
24
     */
25
    private $queue;
26
    /**
27
     * @var int
28
     */
29
    private $timeout;
30
31
32
    /**
33
     * EnqueuePromisor constructor.
34
     * @param PsrContext $context
35
     * @param string     $queueName
36
     * @param float      $timeout
37
     * @throws \InvalidArgumentException
38
     */
39
    public function __construct(PsrContext $context, $queueName = '', $timeout = 0.0)
40
    {
41
        $this->context = $context;
42
        if ('' === $queueName) {
43
            throw new \InvalidArgumentException("Queue name must not be blank.");
44
        }
45
        $this->queue = $context->createQueue($queueName);
46
        $this->timeout = (float) $timeout;
0 ignored issues
show
Documentation Bug introduced by
The property $timeout was declared of type integer, but (double) $timeout is of type double. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
47
    }
48
49
    /**
50
     * @param PsrConnectionFactory $factory
51
     * @param string               $queueName
52
     * @param float                $timeout
53
     * @return QueueHandler
54
     */
55
    public static function factory(PsrConnectionFactory $factory, $queueName = '', $timeout = 0.0)
56
    {
57
        $context = $factory->createContext();
58
        return new static($context, $queueName, $timeout);
59
    }
60
61
    /**
62
     * @param RequestInterface $request
63
     * @return Promise
64
     * @throws \Interop\Queue\Exception
65
     * @throws \Interop\Queue\InvalidDestinationException
66
     * @throws \Interop\Queue\InvalidMessageException
67
     * @throws \InvalidArgumentException
68
     */
69
    public function __invoke(RequestInterface $request, array $options)
70
    {
71
        $message = $this->wrap($request, $options);
72
        $replyTo = $this->queue->getQueueName() . strtr(uniqid(null, true), '.', 0);
73
        $message->setReplyTo($replyTo);
74
        $this->context->createProducer()->send($this->queue, $message);
75
        $promise = new Promise(function () use (&$promise, $replyTo, $request) {
76
            try {
77
                $queue = $this->context->createQueue($replyTo);
78
                $consumer = $this->context->createConsumer($queue);
79
80
81
                if (0.0 === $this->timeout) {
82
                    while (null === ($message = $consumer->receive())) {
83
                        usleep(100);
84
                    }
85
                } else {
86
                    $max = microtime(true) + $this->timeout;
87
                    while (null === ($message = $consumer->receive($this->timeout)) && microtime(true) <= $max) {
88
                        usleep(100);
89
                    }
90
                }
91
92
                if (null === $message) {
93
                    throw new RequestException('Timeout reached.', $request);
94
                }
95
96
                if (null !== $message->getProperty('error')) {
97
                    throw RequestException::create($request, null, new \RuntimeException($message->getProperty('error')));
98
                }
99
                $consumer->acknowledge($message);
100
                $response = parse_response($message->getBody());
101
                $promise->resolve($response);
102
            } catch (\Exception $e) {
103
                $promise->reject($e);
104
            }
105
        });
106
        return $promise;
107
    }
108
109
    /**
110
     * @param RequestInterface $request
111
     * @param array            $options
112
     * @return \Interop\Queue\PsrMessage
113
     * @throws \InvalidArgumentException
114
     */
115
    private function wrap(RequestInterface $request, array $options)
116
    {
117
        if (isset($options['base_uri']) && $options['base_uri'] instanceof UriInterface) {
118
            $options['base_uri'] = (string) $options['base_uri'];
119
        }
120
        $message = $this->context->createMessage(json_encode([
121
            str($request->withRequestTarget((string) $request->getUri())),
122
            $options,
123
        ]));
124
        return $message;
125
    }
126
}
127