Completed
Push — master ( 424dcf...baf946 )
by Nicolas
02:34
created

Pecl::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 7
ccs 0
cts 7
cp 0
rs 9.4285
cc 1
eloc 5
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Puzzle\AMQP\Clients;
4
5
use Puzzle\Configuration;
6
use Puzzle\PrefixedConfiguration;
7
8
use Puzzle\AMQP\Client;
9
use Psr\Log\LoggerAwareTrait;
10
use Psr\Log\NullLogger;
11
use Puzzle\AMQP\WritableMessage;
12
13
class Pecl implements Client
14
{
15
    use LoggerAwareTrait;
16
17
    private
18
        $applicationId,
1 ignored issue
show
Coding Style introduced by
The visibility should be declared for property $applicationId.

The PSR-2 coding standard requires that all properties in a class have their visibility explicitly declared. If you declare a property using

class A {
    var $property;
}

the property is implicitly public.

To learn more about the PSR-2, please see the PHP-FIG site on the PSR-2.

Loading history...
19
        $configuration,
20
        $channel;
21
22
    /**
     * Initialises the client without opening any connection.
     *
     * The application id is read from the 'app/id' configuration key, with
     * 'Unknown application' as the fallback value. The channel starts out as
     * null and a NullLogger is installed as the logger.
     *
     * @param Configuration $configuration Configuration this client reads its settings from.
     */
    public function __construct(Configuration $configuration)
    {
        $this->configuration = $configuration;
        $this->applicationId = $this->configuration->read('app/id', 'Unknown application');

        $this->logger = new NullLogger();
        $this->channel = null;
    }
29
30
    /**
     * Creates the connection and the channel unless a channel already exists.
     *
     * Settings are read through a PrefixedConfiguration built with the
     * 'amqp/broker' prefix: 'host', 'login' and 'password' are read as
     * required values, while 'vhost' is optional and only applied when it is
     * not null.
     */
    private function ensureIsConnected()
    {
        if($this->channel instanceof \AMQPChannel)
        {
            // A channel is already available: nothing to do.
            return;
        }

        $brokerSettings = new PrefixedConfiguration($this->configuration, 'amqp/broker');

        $connection = new \AMQPConnection();
        $connection->setHost($brokerSettings->readRequired('host'));
        $connection->setLogin($brokerSettings->readRequired('login'));
        $connection->setPassword($brokerSettings->readRequired('password'));

        $virtualHost = $brokerSettings->read('vhost', null);
        if($virtualHost !== null)
        {
            $connection->setVhost($virtualHost);
        }

        $connection->connect();

        // The channel is kept so that later calls skip the connection step.
        $this->channel = new \AMQPChannel($connection);
    }
54
55
    /**
     * Publishes a message on the exchange identified by its name.
     *
     * @param mixed           $exchangeName Exchange name, handed over to getExchange().
     * @param WritableMessage $message      Message to publish.
     *
     * @return bool False when getExchange() throws an exception (the message
     *              is then written to the log); otherwise the result of
     *              sendMessage().
     */
    public function publish($exchangeName, WritableMessage $message)
    {
        try
        {
            $exchange = $this->getExchange($exchangeName);
        }
        catch(\Exception $failure)
        {
            // The exchange could not be obtained: keep a trace of the message.
            $this->logMessage($exchangeName, $message);

            return false;
        }

        return $this->sendMessage($exchange, $message);
    }
70
71
    /**
     * Writes an error log entry for a message involved in a failed publication.
     *
     * The logged text is a JSON object holding the exchange name and the
     * message cast to a string; the explanatory sentence is passed as the
     * single element of the logger context array.
     *
     * @param mixed           $exchangeName Exchange name stored under the 'exchange' key.
     * @param WritableMessage $message      Message stored, cast to string, under the 'message' key.
     */
    private function logMessage($exchangeName, WritableMessage $message)
    {
        $details = [
            'exchange' => $exchangeName,
            'message' => (string) $message,
        ];

        $context = ['This message was involved by an error (it was sent ... or not. Please check other logs)'];

        $this->logger->error(json_encode($details), $context);
    }
80
81
    /**
     * Publishes the message on the given exchange.
     *
     * The message attributes are updated first, then the body, routing key,
     * publication flag and packed attributes are handed to the exchange.
     *
     * @param \AMQPExchange   $ex      Exchange the message is published on.
     * @param WritableMessage $message Message to publish.
     *
     * @return bool True when no exception was thrown; false otherwise, in
     *              which case the message is written to the log.
     */
    private function sendMessage(\AMQPExchange $ex, WritableMessage $message)
    {
        try
        {
            $this->updateMessageAttributes($message);

            $ex->publish(
                $message->getBodyInTransportFormat(),
                $message->getRoutingKey(),
                // The flag depends on the message itself, so it must be passed along.
                $this->computeMessageFlag($message),
                $message->packAttributes()
            );
        }
        catch(\Exception $e)
        {
            $this->logMessage($ex->getName(), $message);

            return false;
        }

        return true;
    }

    /**
     * Chooses the publication flag for a message.
     *
     * AMQP_MANDATORY is returned when the 'amqp/global/disallowSilentDropping'
     * configuration value is strictly true, or when the message reports that
     * it cannot be dropped silently; AMQP_NOPARAM is returned otherwise.
     *
     * The message is received as a parameter: it was previously read from an
     * undefined local variable, which made the call fail whenever the
     * configuration value was not true.
     *
     * @param WritableMessage $message Message about to be published.
     *
     * @return int AMQP_MANDATORY or AMQP_NOPARAM.
     */
    private function computeMessageFlag(WritableMessage $message)
    {
        $disallowSilentDropping = $this->configuration->read('amqp/global/disallowSilentDropping', false);

        if($disallowSilentDropping === true || $message->canBeDroppedSilently() === false)
        {
            return AMQP_MANDATORY;
        }

        return AMQP_NOPARAM;
    }
116
117
    /**
     * Returns an exchange object created on the client's channel.
     *
     * The connection is established first when needed. When the given name is
     * empty (in the sense of PHP's empty()), the exchange object is returned
     * as created; otherwise its name, type and the AMQP_PASSIVE flag are set
     * and declareExchange() is called before returning it.
     *
     * @param mixed $exchangeName Exchange name, or an empty value to skip naming and declaration.
     * @param mixed $type         Exchange type, AMQP_EX_TYPE_TOPIC by default.
     *
     * @return \AMQPExchange
     */
    public function getExchange($exchangeName = null, $type = AMQP_EX_TYPE_TOPIC)
    {
        $this->ensureIsConnected();

        $exchange = new \AMQPExchange($this->channel);

        if(empty($exchangeName))
        {
            return $exchange;
        }

        $exchange->setName($exchangeName);
        $exchange->setType($type);
        $exchange->setFlags(AMQP_PASSIVE);
        $exchange->declareExchange();

        return $exchange;
    }
133
134
    /**
     * Stamps the message with the application id and its own routing key.
     *
     * The 'app_id' attribute receives the application id held by this client,
     * and a 'routing_key' header is added with the message routing key.
     *
     * @param WritableMessage $message Message to update in place.
     */
    private function updateMessageAttributes(WritableMessage $message)
    {
        $message->setAttribute('app_id', $this->applicationId);

        $routingKey = $message->getRoutingKey();
        $message->addHeader('routing_key', $routingKey);
    }
139
140
    /**
     * Returns a queue object created on the client's channel and given the requested name.
     *
     * The connection is established first when needed.
     *
     * @param mixed $queueName Name set on the queue object.
     *
     * @return \AMQPQueue
     */
    public function getQueue($queueName)
    {
        $this->ensureIsConnected();

        $namedQueue = new \AMQPQueue($this->channel);
        $namedQueue->setName($queueName);

        return $namedQueue;
    }
149
}
150