Passed
Push — master ( ddf6f5...2e933f )
by Nicolas
03:42 queued 28s
created

Pecl::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 0
cts 8
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 6
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Puzzle\AMQP\Clients;
4
5
use Puzzle\Configuration;
6
use Puzzle\PrefixedConfiguration;
7
8
use Puzzle\AMQP\Client;
9
use Psr\Log\LoggerAwareTrait;
10
use Psr\Log\NullLogger;
11
use Puzzle\AMQP\WritableMessage;
12
use Puzzle\AMQP\Clients\Processors\MessageProcessorAware;
13
use Puzzle\AMQP\Clients\MemoryManagementStrategies\NullMemoryManagementStrategy;
14
15
/**
 * AMQP client built on the PECL amqp extension classes (\AMQPConnection,
 * \AMQPChannel, \AMQPExchange and \AMQPQueue).
 *
 * The broker connection is opened lazily, the first time an exchange or a
 * queue is requested, using the keys found under "amqp/broker" in the
 * configuration given to the constructor.
 */
class Pecl implements Client
{
    use
        LoggerAwareTrait,
        MessageProcessorAware;

    const
        DEFAULT_PORT = 5672;

    private
        $applicationId,
        $configuration,
        $channel,
        $memoryManagementStrategy;

    /**
     * @param Configuration $configuration Source of the "app/id" value and of
     *                                     the "amqp/..." settings read later on.
     */
    public function __construct(Configuration $configuration)
    {
        $this->applicationId = $configuration->read('app/id', 'Unknown application');
        $this->configuration = $configuration;
        // No channel yet: it is created on first use by ensureIsConnected().
        $this->channel = null;
        $this->memoryManagementStrategy = new NullMemoryManagementStrategy();
        $this->logger = new NullLogger();
    }

    /**
     * Replaces the strategy handed over to the chunked message client.
     *
     * @param MemoryManagementStrategy $strategy
     *
     * @return $this
     */
    public function setMemoryManagementStrategy(MemoryManagementStrategy $strategy)
    {
        $this->memoryManagementStrategy = $strategy;

        return $this;
    }

    /**
     * Opens the connection and the channel unless a channel already exists.
     *
     * Host, login and password are required configuration keys; port falls
     * back to self::DEFAULT_PORT and vhost is only set when configured.
     */
    private function ensureIsConnected()
    {
        if($this->channel instanceof \AMQPChannel)
        {
            return;
        }

        $configuration = new PrefixedConfiguration($this->configuration, 'amqp/broker');

        // Create a connection
        $connection = new \AMQPConnection();
        $connection->setHost($configuration->readRequired('host'));
        $connection->setLogin($configuration->readRequired('login'));
        $connection->setPassword($configuration->readRequired('password'));
        $connection->setPort($configuration->read('port', self::DEFAULT_PORT));

        $vhost = $configuration->read('vhost', null);
        if($vhost !== null)
        {
            $connection->setVhost($vhost);
        }

        $connection->connect();

        // Create a channel
        $this->channel = new \AMQPChannel($connection);
    }

    /**
     * Publishes a message to the given exchange.
     *
     * Chunked messages are delegated to a ChunkedMessageClient. For any other
     * message, failures are logged (with their cause) instead of being thrown.
     *
     * @param string          $exchangeName
     * @param WritableMessage $message
     *
     * @return mixed The chunked client result for chunked messages; otherwise
     *               true when the message was handed to the exchange, false
     *               when resolving the exchange or publishing raised an exception.
     */
    public function publish($exchangeName, WritableMessage $message)
    {
        if($message->isChunked())
        {
            $client = new ChunkedMessageClient($this, $this->memoryManagementStrategy);

            return $client->publish($exchangeName, $message);
        }

        try
        {
            $ex = $this->getExchange($exchangeName);
        }
        catch(\Exception $e)
        {
            $this->logMessage($exchangeName, $message, $e);

            return false;
        }

        return $this->sendMessage($ex, $message);
    }

    /**
     * Writes an error log entry describing a message whose publication failed.
     *
     * The log message is a JSON document holding the exchange name and the
     * string form of the message. The exception that triggered the failure is
     * attached under the "exception" context key so its cause is not lost.
     *
     * @param string          $exchangeName
     * @param WritableMessage $message
     * @param \Exception      $exception    Failure that prevented the publication.
     */
    private function logMessage($exchangeName, WritableMessage $message, \Exception $exception)
    {
        // JSON_PARTIAL_OUTPUT_ON_ERROR keeps the exchange name in the log even
        // when the message body cannot be encoded (e.g. invalid UTF-8).
        $log = json_encode(array(
            'exchange' => $exchangeName,
            'message' => (string) $message,
        ), JSON_PARTIAL_OUTPUT_ON_ERROR);

        if($log === false)
        {
            $log = sprintf(
                'Unable to encode the message bound to exchange "%s": %s',
                $exchangeName,
                json_last_error_msg()
            );
        }

        $this->logger->error($log, [
            'This message was involved by an error (it was sent ... or not. Please check other logs)',
            'exception' => $exception,
        ]);
    }

    /**
     * Updates the message attributes then publishes it on the exchange.
     *
     * @param \AMQPExchange   $ex
     * @param WritableMessage $message
     *
     * @return bool False when an exception was raised (it is logged), true otherwise.
     */
    private function sendMessage(\AMQPExchange $ex, WritableMessage $message)
    {
        try
        {
            $this->updateMessageAttributes($message);

            $ex->publish(
                $message->getBodyInTransportFormat(),
                $message->getRoutingKey(),
                $this->computeMessageFlag($message),
                $message->packAttributes()
            );
        }
        catch(\Exception $e)
        {
            $this->logMessage($ex->getName(), $message, $e);

            return false;
        }

        return true;
    }

    /**
     * Chooses the publish flag for a message.
     *
     * @param WritableMessage $message
     *
     * @return int AMQP_MANDATORY when "amqp/global/disallowSilentDropping" is
     *             strictly true or the message reports it cannot be dropped
     *             silently; AMQP_NOPARAM otherwise.
     */
    private function computeMessageFlag(WritableMessage $message)
    {
        $disallowSilentDropping = $this->configuration->read('amqp/global/disallowSilentDropping', false);

        if($disallowSilentDropping === true || $message->canBeDroppedSilently() === false)
        {
            return AMQP_MANDATORY;
        }

        return AMQP_NOPARAM;
    }

    /**
     * Builds an exchange object bound to the current channel.
     *
     * When a name is given, the exchange is declared passively (AMQP_PASSIVE)
     * with the requested type. Without a name (null or empty string) the
     * exchange object is returned untouched.
     *
     * @param string|null $exchangeName
     * @param string      $type
     *
     * @return \AMQPExchange
     */
    public function getExchange($exchangeName = null, $type = AMQP_EX_TYPE_TOPIC)
    {
        $this->ensureIsConnected();

        $ex = new \AMQPExchange($this->channel);

        // Explicit check rather than empty(): an exchange named "0" is a real
        // name and must not be mistaken for "no name".
        if($exchangeName !== null && (string) $exchangeName !== '')
        {
            $ex->setName($exchangeName);
            $ex->setType($type);
            $ex->setFlags(AMQP_PASSIVE);
            $ex->declareExchange();
        }

        return $ex;
    }

    /**
     * Stamps the message with the application id and its routing key, then
     * calls onPublish() (presumably supplied by MessageProcessorAware — not
     * visible in this file).
     *
     * @param WritableMessage $message
     */
    private function updateMessageAttributes(WritableMessage $message)
    {
        $message->setAttribute('app_id', $this->applicationId);
        $message->addHeader('routing_key', $message->getRoutingKey());

        $this->onPublish($message);
    }

    /**
     * Builds a queue object, bound to the current channel, with the given name.
     *
     * @param string $queueName
     *
     * @return \AMQPQueue
     */
    public function getQueue($queueName)
    {
        $this->ensureIsConnected();

        $queue = new \AMQPQueue($this->channel);
        $queue->setName($queueName);

        return $queue;
    }
}
176