Passed
Pull Request — master (#42)
by Nicolas
09:50 queued 06:39
created

Pecl::getExchange()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 16
ccs 0
cts 13
cp 0
rs 9.4285
cc 2
eloc 9
nc 2
nop 2
crap 6
1
<?php
2
3
namespace Puzzle\AMQP\Clients;
4
5
use Puzzle\Configuration;
6
use Puzzle\PrefixedConfiguration;
7
8
use Puzzle\AMQP\Client;
9
use Psr\Log\LoggerAwareTrait;
10
use Psr\Log\NullLogger;
11
use Puzzle\AMQP\WritableMessage;
12
use Puzzle\AMQP\Clients\Processors\MessageProcessorAware;
13
use Puzzle\AMQP\Clients\MemoryManagementStrategies\NullMemoryManagementStrategy;
14
15
class Pecl implements Client
16
{
17
    use
18
        LoggerAwareTrait,
19
        MessageProcessorAware;
20
21
    const
22
        DEFAULT_PORT = 5672;
23
24
    private
25
        $applicationId,
26
        $configuration,
27
        $channel,
28
        $memoryManagementStrategy;
29
30
    public function __construct(Configuration $configuration)
31
    {
32
        $this->applicationId = $configuration->read('app/id', 'Unknown application');
33
        $this->configuration = $configuration;
34
        $this->channel = null;
35
        $this->memoryManagementStrategy = new NullMemoryManagementStrategy();
36
        $this->logger = new NullLogger();
37
    }
38
39
    public function setMemoryManagementStrategy(MemoryManagementStrategy $strategy)
40
    {
41
        $this->memoryManagementStrategy = $strategy;
42
43
        return $this;
44
    }
45
46
    private function ensureIsConnected()
47
    {
48
        if(! $this->channel instanceof \AMQPChannel)
49
        {
50
            $configuration = new PrefixedConfiguration($this->configuration, 'amqp/broker');
51
52
            // Create a connection
53
            $connection = new \AMQPConnection();
54
            $connection->setHost($configuration->readRequired('host'));
55
            $connection->setLogin($configuration->readRequired('login'));
56
            $connection->setPassword($configuration->readRequired('password'));
57
            $connection->setPort($configuration->read('port', self::DEFAULT_PORT));
58
59
            $vhost = $configuration->read('vhost', null);
60
            if($vhost !== null)
61
            {
62
                $connection->setVhost($vhost);
63
            }
64
65
            $connection->connect();
66
67
            // Create a channel
68
            $this->channel = new \AMQPChannel($connection);
69
        }
70
    }
71
72
    public function publish($exchangeName, WritableMessage $message)
73
    {
74
        if($message->isChunked())
75
        {
76
            $client = new ChunkedMessageClient($client, $this->memoryManagementStrategy);
0 ignored issues
show
Bug introduced by
The variable $client seems only to be defined at a later point. Did you maybe move this code here without moving the variable definition?

This error can happen if you refactor code and forget to move the variable initialization.

Let’s take a look at a simple example:

function someFunction() {
    $x = 5;
    echo $x;
}

The above code is perfectly fine. Now imagine that we re-order the statements:

function someFunction() {
    echo $x;
    $x = 5;
}

In that case, $x would be read before it is initialized. This was a very basic example, however the principle is the same for the found issue.

Loading history...
77
78
            return $client->publish($exchangeName, $message);
79
        }
80
81
        try
82
        {
83
            $ex = $this->getExchange($exchangeName);
84
        }
85
        catch(\Exception $e)
86
        {
87
            $this->logMessage($exchangeName, $message);
88
89
            return false;
90
        }
91
92
        return $this->sendMessage($ex, $message);
93
    }
94
95
    private function logMessage($exchangeName, WritableMessage $message)
96
    {
97
        $log = json_encode(array(
98
            'exchange' => $exchangeName,
99
            'message' => (string) $message,
100
        ));
101
102
        $this->logger->error($log, ['This message was involved by an error (it was sent ... or not. Please check other logs)']);
103
    }
104
105
    private function sendMessage(\AMQPExchange $ex, WritableMessage $message)
106
    {
107
        try
108
        {
109
            $this->updateMessageAttributes($message);
110
111
            $ex->publish(
112
                $message->getBodyInTransportFormat(),
113
                $message->getRoutingKey(),
114
                $this->computeMessageFlag($message),
115
                $message->packAttributes()
116
            );
117
        }
118
        catch(\Exception $e)
119
        {
120
            $this->logMessage($ex->getName(), $message);
121
122
            return false;
123
        }
124
125
        return true;
126
    }
127
128
    private function computeMessageFlag(WritableMessage $message)
129
    {
130
        $flag = AMQP_NOPARAM;
131
        $disallowSilentDropping = $this->configuration->read('amqp/global/disallowSilentDropping', false);
132
133
        if($disallowSilentDropping === true || $message->canBeDroppedSilently() === false)
134
        {
135
            $flag = AMQP_MANDATORY;
136
        }
137
138
        return $flag;
139
    }
140
141
    public function getExchange($exchangeName = null, $type = AMQP_EX_TYPE_TOPIC)
142
    {
143
        $this->ensureIsConnected();
144
145
        $ex = new \AMQPExchange($this->channel);
146
147
        if(!empty($exchangeName))
148
        {
149
            $ex->setName($exchangeName);
150
            $ex->setType($type);
151
            $ex->setFlags(AMQP_PASSIVE);
152
            $ex->declareExchange();
153
        }
154
155
        return $ex;
156
    }
157
158
    private function updateMessageAttributes(WritableMessage $message)
159
    {
160
        $message->setAttribute('app_id', $this->applicationId);
161
        $message->addHeader('routing_key', $message->getRoutingKey());
162
163
        $this->onPublish($message);
164
    }
165
166
    public function getQueue($queueName)
167
    {
168
        $this->ensureIsConnected();
169
170
        $queue = new \AMQPQueue($this->channel);
171
        $queue->setName($queueName);
172
173
        return $queue;
174
    }
175
}
176