Test Failed
Pull Request — master (#39)
by Aleksandr
02:42
created

RpcClient::declareRepliesQueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nc 1
nop 1
dl 0
loc 9
rs 10
c 0
b 0
f 0
<?php

namespace OldSound\RabbitMqBundle\RPC;

use OldSound\RabbitMqBundle\Declarations\BindingDeclaration;
use OldSound\RabbitMqBundle\Declarations\Declarator;
use OldSound\RabbitMqBundle\Declarations\QueueConsuming;
use OldSound\RabbitMqBundle\Declarations\QueueDeclaration;
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\BatchExecuteCallbackStrategy;
Bug introduced by
The type OldSound\RabbitMqBundle\...ExecuteCallbackStrategy was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

use OldSound\RabbitMqBundle\RabbitMq\Exception\RpcResponseException;
Bug introduced by
The type OldSound\RabbitMqBundle\...on\RpcResponseException was not found. Maybe you did not declare it correctly or list all dependencies?

use OldSound\RabbitMqBundle\Serializer\JsonMessageBodySerializer;
use OldSound\RabbitMqBundle\Serializer\MessageBodySerializerInterface;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Bug introduced by
The type Symfony\Component\Messen...ion\SerializerInterface was not found. Maybe you did not declare it correctly or list all dependencies?

use Symfony\Component\Serializer\Serializer;

class RpcClient implements BatchReceiverInterface
Bug introduced by
The type OldSound\RabbitMqBundle\RPC\BatchReceiverInterface was not found. Maybe you did not declare it correctly or list all dependencies?

{
    /** @var AMQPChannel */
    private $channel;
    /** @var int */
    private $expiration;

    /** @var QueueDeclaration */
    private $anonRepliesQueue;
    /** @var MessageBodySerializerInterface[] */
    private $serializers;

    /** @var int */
    private $requests = 0;
    /** @var AMQPMessage[] */
    private $messages;
    private $replies = [];
introduced by
The private property $replies is not used, and could be removed.
    public function __construct(
        AMQPChannel $channel,
        int $expiration = 10000
    ) {
        $this->channel = $channel;
        $this->serializer = $serializer ?? new JsonMessageBodySerializer();
Bug Best Practice introduced by
The property serializer does not exist. Although not strictly required by PHP, it is generally a best practice to declare properties explicitly.
Comprehensibility Best Practice introduced by
The variable $serializer seems to never exist and therefore isset should always be false.
        $this->expiration = $expiration;
    }

    public function declareRepliesQueue($repliesQueueName = null): RpcClient
    {
        $this->anonRepliesQueue = $this->createAnonQueueDeclaration();
        $this->anonRepliesQueue->name = $repliesQueueName;
        $declarator = new Declarator($this->channel);
        [$queueName] = $declarator->declareQueues([$this->anonRepliesQueue]);
        $this->anonRepliesQueue->name = $queueName;

        return $this;
    }

    // TODO public move
    private function createAnonQueueDeclaration(): QueueDeclaration
    {
        $anonQueueDeclaration = new QueueDeclaration();
        $anonQueueDeclaration->passive = false;
        $anonQueueDeclaration->durable = false;
        $anonQueueDeclaration->exclusive = true;
        $anonQueueDeclaration->autoDelete = true;
        $anonQueueDeclaration->nowait = false;

        return $anonQueueDeclaration;
    }

    public function addRequest($msgBody, $rpcQueue, ?MessageBodySerializerInterface $serializer = null)
    {
        if (!$this->anonRepliesQueue) {
            throw new \LogicException('no init anonRepliesQueue');
        }

        $correlationId = $this->requests;
        $this->serializers[$correlationId] = $serializer;

        $serializer = $serializer ?? new JsonMessageBodySerializer();

        $replyToQueue = $this->anonRepliesQueue->name; // 'amq.rabbitmq.reply-to';
        $msg = new AMQPMessage($serializer->serialize($msgBody, 'json'), [
Unused Code introduced by
The call to OldSound\RabbitMqBundle\...rInterface::serialize() has too many arguments starting with 'json'. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call annotation:

        $msg = new AMQPMessage($serializer->/** @scrutinizer ignore-call */ serialize($msgBody, 'json'), [

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this is known to happen is WordPress. Please note the ignore-call annotation hint above.

Unused Code introduced by
The call to OldSound\RabbitMqBundle\...Serializer::serialize() has too many arguments starting with 'json'. ( Ignorable by Annotation )

            'content_type' => 'text/plain',
            'reply_to' => $replyToQueue,
            'delivery_mode' => 1, // non-persistent
            'expiration' => $this->expiration,
            'correlation_id' => $correlationId
        ]);

        $this->channel->basic_publish($msg, '', $rpcQueue);
        $this->requests++;
    }

    public function batchExecute(array $messages)
    {
        if ($this->messages !== null) {
            throw new \LogicException('Rpc client consuming should be called once by batch count limit');
        }
        $this->messages = $messages;
    }

    /**
     * @param $name
     * @return array|AMQPMessage[] replies keyed by correlation id
     */
    public function getReplies($name): array
    {
        if (0 === $this->requests) {
            throw new \LogicException('request empty');
        }

        $consumer = new Consumer($this->channel);
Bug introduced by
The type OldSound\RabbitMqBundle\RPC\Consumer was not found. Maybe you did not declare it correctly or list all dependencies?

        $consuming = new QueueConsuming();
        $consuming->exclusive = true;
        $consuming->qosPrefetchCount = $this->requests;
        $consuming->queueName = $this->anonRepliesQueue->name;
        $consuming->receiver = $this;
        $consumer->consumeQueue($consuming, new BatchExecuteCallbackStrategy($this->requests));

        try {
            $consumer->consume($this->requests);
        } finally {
            // TODO $this->getChannel()->basic_cancel($consumer_tag);
        }

        $replies = [];
        foreach ($this->messages as $message) {
            /** @var AMQPMessage $message */
            if (!$message->has('correlation_id')) {
                $this->logger->error('Unexpected message: RPC reply has no correlation_id');
                continue;
            }

            $correlationId = $message->get('correlation_id');
            $serializer = $this->serializers[$correlationId];
            $reply = $serializer ? $serializer->deserialize($message->body) : $message;
            $replies[$correlationId] = $reply;
        }
        ksort($replies);
        return $replies;
    }
}