Test Failed
Pull Request — master (#39)
by Aleksandr
05:36
created

RpcClient::createAnonQueueDeclaration()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 7
nc 1
nop 0
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RPC;
4
5
use OldSound\RabbitMqBundle\Declarations\BindingDeclaration;
6
use OldSound\RabbitMqBundle\Declarations\Declarator;
7
use OldSound\RabbitMqBundle\Declarations\ConsumeOptions;
8
use OldSound\RabbitMqBundle\Declarations\QueueDeclaration;
9
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\BatchExecuteCallbackStrategy;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...ExecuteCallbackStrategy was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
10
use OldSound\RabbitMqBundle\Producer\ProducerInterface;
11
use OldSound\RabbitMqBundle\RabbitMq\Exception\RpcResponseException;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...on\RpcResponseException was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
12
use OldSound\RabbitMqBundle\Serializer\JsonMessageBodySerializer;
13
use OldSound\RabbitMqBundle\Serializer\MessageBodySerializerInterface;
14
use PhpAmqpLib\Channel\AMQPChannel;
15
use PhpAmqpLib\Message\AMQPMessage;
16
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
0 ignored issues
show
Bug introduced by
The type Symfony\Component\Messen...ion\SerializerInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
17
use Symfony\Component\Serializer\Serializer;
18
19
class RpcClient implements BatchReceiverInterface
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\RPC\BatchReceiverInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
20
{
21
    const PROPERTY_REPLAY_TO = 'reply_to';
22
    const PROPERTY_CORRELATION_ID = 'correlation_id';
23
24
    /** @var AMQPChannel */
25
    private $channel;
26
    /** @var int */
27
    private $expiration;
28
29
    /** @var QueueDeclaration */
30
    private $anonRepliesQueue;
31
    /** @var MessageBodySerializerInterface[] */
32
    private $serializers;
33
34
    /** @var int */
35
    private $requests = 0;
36
    /** @var AMQPMessage[] */
37
    private $messages;
38
    private $replies = [];
0 ignored issues
show
introduced by
The private property $replies is not used, and could be removed.
Loading history...
39
40
    public function __construct(
41
        AMQPChannel $channel,
42
        int $expiration = 10000
43
    ) {
44
        $this->channel = $channel;
45
        $this->serializer = $serializer ?? new JsonMessageBodySerializer();
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $serializer seems to never exist and therefore isset should always be false.
Loading history...
Bug Best Practice introduced by
The property serializer does not exist. Although not strictly required by PHP, it is generally a best practice to declare properties explicitly.
Loading history...
46
        $this->expiration = $expiration;
47
    }
48
49
    public function declareRepliesQueue($repliesQueueName = null): RpcClient
50
    {
51
        $this->anonRepliesQueue = $this->createAnonQueueDeclaration();
52
        $this->anonRepliesQueue->name = $repliesQueueName;
53
        $declarator = new Declarator($this->channel);
54
        [$queueName] = $declarator->declareQueues([$this->anonRepliesQueue]);
55
        $this->anonRepliesQueue->name = $queueName;
56
57
        return $this;
58
    }
59
60
    // TODO public move
61
    private function createAnonQueueDeclaration(): QueueDeclaration
62
    {
63
        $anonQueueDeclaration = new QueueDeclaration();
64
        $anonQueueDeclaration->passive = false;
65
        $anonQueueDeclaration->durable = false;
66
        $anonQueueDeclaration->exclusive = true;
67
        $anonQueueDeclaration->autoDelete = true;
68
        $anonQueueDeclaration->nowait = false;
69
70
        return $anonQueueDeclaration;
71
    }
72
73
    public function addRequest($msgBody, $rpcQueue, MessageBodySerializerInterface $serializer = null)
74
    {
75
        if (!$this->anonRepliesQueue) {
76
            throw new \LogicException('no init anonRepliesQueue');
77
        }
78
79
        $correlationId = $this->requests;
80
        $this->serializers[$correlationId] = $serializer;
81
82
        $serializer = $serializer ?? new JsonMessageBodySerializer();
83
84
        $replyToQueue = $this->anonRepliesQueue->name; // 'amq.rabbitmq.reply-to';
85
        $msg = new AMQPMessage($serializer->serialize($msgBody, 'json'), [
0 ignored issues
show
Unused Code introduced by
The call to OldSound\RabbitMqBundle\...Serializer::serialize() has too many arguments starting with 'json'. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

85
        $msg = new AMQPMessage($serializer->/** @scrutinizer ignore-call */ serialize($msgBody, 'json'), [

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
Unused Code introduced by
The call to OldSound\RabbitMqBundle\...rInterface::serialize() has too many arguments starting with 'json'. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

85
        $msg = new AMQPMessage($serializer->/** @scrutinizer ignore-call */ serialize($msgBody, 'json'), [

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
86
            self::PROPERTY_REPLAY_TO => $replyToQueue,
87
            self::PROPERTY_CORRELATION_ID => $correlationId,
88
            'content_type' => 'text/plain',
89
            'delivery_mode' => ProducerInterface::DELIVERY_MODE_NON_PERSISTENT,
90
            'expiration' => $this->expiration,
91
        ]);
92
93
        $this->channel->basic_publish($msg, '', $rpcQueue);
94
        $this->requests++;
95
    }
96
97
    public function batchExecute(array $messages)
98
    {
99
        if ($this->messages !== null) {
100
            throw new \LogicException('Rpc client consming should be called once by batch count limit');
101
        }
102
        $this->messages = $messages;
103
    }
104
105
    /**
106
     * @param $name
107
     * @param MessageBodySerializerInterface $serializer
108
     * @return array|AMQPMessage[]
109
     */
110
    public function getReplies($name): array
111
    {
112
        if (0 === $this->requests) {
113
            throw new \LogicException('request empty');
114
        }
115
116
        $consumer = new Consumer($this->channel);
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\RPC\Consumer was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
117
        $consuming = new ConsumeOptions();
118
        $consuming->exclusive = true;
119
        $consuming->qosPrefetchCount = $this->requests;
120
        $consuming->queue = $this->anonRepliesQueue->name;
121
        $consuming->receiver = $this;
122
        $consumer->consumeQueue($consuming, new BatchExecuteCallbackStrategy($this->requests));
123
124
        try {
125
            $consumer->consume($this->requests);
126
        } finally {
127
            // TODO $this->getChannel()->basic_cancel($consumer_tag);
128
        }
129
130
        $replices = [];
131
        foreach($this->messages as $message) {
132
            /** @var AMQPMessage $message */
133
            if (!$message->has('correlation_id')) {
134
                $this->logger->error('unexpected message. rpc replies have no correlation_id ');
135
                continue;
136
            }
137
138
            $correlationId = $message->get('correlation_id');
139
            $serializer = $this->serializers[$correlationId];
140
            $reply = $serializer ? $serializer->deserialize($message->body) : $message;
141
            $replices[$correlationId] = $reply;
142
        }
143
        ksort($replices);
144
        return $replices;
145
    }
146
}
147