Completed
Push — master ( 4bf91f...d42076 )
by Nicolas
03:14
created

Pecl::convertEnvelopeToMessage()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 31
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 31
ccs 0
cts 31
cp 0
rs 8.8571
cc 1
eloc 26
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Puzzle\AMQP\Clients;
4
5
use Puzzle\Configuration;
6
use Puzzle\PrefixedConfiguration;
7
8
use Puzzle\AMQP\Client;
9
use Puzzle\AMQP\Workers\MessageAdapter;
10
use Psr\Log\LoggerAwareTrait;
11
use Psr\Log\NullLogger;
12
use Puzzle\AMQP\WritableMessage;
13
14
class Pecl implements Client
15
{
16
    use LoggerAwareTrait;
17
18
    private
19
        $applicationId,
1 ignored issue
show
Coding Style introduced by
The visibility should be declared for property $applicationId.

The PSR-2 coding standard requires that all properties in a class have their visibility explicitly declared. If you declare a property using

class A {
    var $property;
}

the property is implicitly public.

To learn more about the PSR-2, please see the PHP-FIG site on the PSR-2.

Loading history...
20
        $configuration,
21
        $channel;
22
23
    /**
     * Prepares a client that has not opened any channel yet.
     *
     * The application id is read from the 'app/id' configuration key, with
     * 'Unknown application' passed as the second argument of read()
     * (presumably the fallback value - confirm against Configuration::read).
     * The configuration itself is kept so the broker settings can be read
     * later, and a NullLogger is installed as the initial logger.
     *
     * @param Configuration $configuration Configuration holding 'app/id' and the broker settings.
     */
    public function __construct(Configuration $configuration)
    {
        $this->logger = new NullLogger();
        $this->channel = null;
        $this->configuration = $configuration;
        $this->applicationId = $configuration->read('app/id', 'Unknown application');
    }
30
31
    /**
     * Opens the broker connection and channel the first time they are needed.
     *
     * Returns immediately when $this->channel already holds an \AMQPChannel.
     * Otherwise host, login and password are read with readRequired() from
     * the configuration prefixed by 'amqp/broker', an optional 'vhost' is
     * applied, the connection is established and a new channel is stored
     * on $this->channel.
     */
    private function ensureIsConnected()
    {
        if($this->channel instanceof \AMQPChannel)
        {
            return;
        }

        $brokerSettings = new PrefixedConfiguration($this->configuration, 'amqp/broker');

        $amqpConnection = new \AMQPConnection();
        $amqpConnection->setHost($brokerSettings->readRequired('host'));
        $amqpConnection->setLogin($brokerSettings->readRequired('login'));
        $amqpConnection->setPassword($brokerSettings->readRequired('password'));

        // The vhost is only set when the configuration returns something other than null.
        $virtualHost = $brokerSettings->read('vhost', null);
        if($virtualHost !== null)
        {
            $amqpConnection->setVhost($virtualHost);
        }

        $amqpConnection->connect();

        $this->channel = new \AMQPChannel($amqpConnection);
    }
55
56
    /**
     * Publishes a message on the exchange with the given name.
     *
     * @param string          $exchangeName Name handed to getExchange().
     * @param WritableMessage $message      Message to send.
     *
     * @return bool false when getExchange() throws (the message is then
     *              written to the log), otherwise whatever sendMessage() returns.
     */
    public function publish($exchangeName, WritableMessage $message)
    {
        try
        {
            $exchange = $this->getExchange($exchangeName);
        }
        catch(\Exception $exception)
        {
            // The exchange could not be obtained: keep a trace of the message and give up.
            $this->logMessage($exchangeName, $message);

            return false;
        }

        return $this->sendMessage($exchange, $message);
    }
71
72
    /**
     * Writes an error-level log entry for a message caught up in a failure.
     *
     * The log message is a JSON object holding the exchange name and the
     * message cast to string; the explanatory sentence is passed in the
     * logger's context array.
     *
     * @param string          $exchangeName Value stored under the 'exchange' key.
     * @param WritableMessage $message      Message cast to string under the 'message' key.
     */
    private function logMessage($exchangeName, WritableMessage $message)
    {
        $payload = array(
            'exchange' => $exchangeName,
            'message' => (string) $message,
        );

        $this->logger->error(
            json_encode($payload),
            ['This message was involved by an error (it was sent ... or not. Please check other logs)']
        );
    }
81
82
    /**
     * Sends a message through an already obtained exchange.
     *
     * The message attributes are updated first, then the formatted body,
     * routing key, flags and packed attributes are handed to the exchange.
     * Any \Exception raised along the way is caught and the message is
     * written to the log under the exchange's name.
     *
     * @param \AMQPExchange   $ex      Exchange used to publish.
     * @param WritableMessage $message Message to send.
     *
     * @return bool true when no exception was raised, false otherwise.
     */
    private function sendMessage(\AMQPExchange $ex, WritableMessage $message)
    {
        try
        {
            $this->updateMessageAttributes($message);

            // Read in the same order as the publish() arguments.
            $body = $message->getFormattedBody();
            $routingKey = $message->getRoutingKey();
            $flags = $message->getFlags();
            $attributes = $message->packAttributes();

            $ex->publish($body, $routingKey, $flags, $attributes);

            return true;
        }
        catch(\Exception $failure)
        {
            $this->logMessage($ex->getName(), $message);
        }

        return false;
    }
104
105
    /**
     * Returns an exchange object attached to this client's channel.
     *
     * The connection is opened first if needed. When a name is given, the
     * exchange receives that name and type and is declared with the
     * AMQP_PASSIVE flag. When no name is given, the \AMQPExchange is
     * returned as constructed, with no name, type or declaration.
     *
     * @param string|null $exchangeName Exchange name; null or '' means "no name".
     * @param string      $type         Exchange type, AMQP_EX_TYPE_TOPIC by default.
     *
     * @return \AMQPExchange
     */
    public function getExchange($exchangeName = null, $type = AMQP_EX_TYPE_TOPIC)
    {
        $this->ensureIsConnected();

        $ex = new \AMQPExchange($this->channel);

        // empty() alone considers the string '0' empty, which would silently
        // route a request for an exchange named "0" to the unnamed exchange.
        $hasName = $exchangeName === '0' || !empty($exchangeName);

        if($hasName)
        {
            $ex->setName($exchangeName);
            $ex->setType($type);
            $ex->setFlags(AMQP_PASSIVE);
            $ex->declareExchange();
        }

        return $ex;
    }
121
122
    /**
     * Stamps the outgoing message with this client's metadata.
     *
     * Sets the 'app_id' attribute to the configured application id, then
     * copies the message's routing key into a 'routing_key' header.
     *
     * @param WritableMessage $message Message modified in place.
     */
    private function updateMessageAttributes(WritableMessage $message)
    {
        $message->setAttribute('app_id', $this->applicationId);

        $routingKey = $message->getRoutingKey();
        $message->addHeader('routing_key', $routingKey);
    }
127
128
    /**
     * Returns a queue object attached to this client's channel.
     *
     * The connection is opened first if needed. The queue only has its
     * name set; nothing else is called on it here.
     *
     * @param string $queueName Name given to the queue object.
     *
     * @return \AMQPQueue
     */
    public function getQueue($queueName)
    {
        $this->ensureIsConnected();

        $amqpQueue = new \AMQPQueue($this->channel);
        $amqpQueue->setName($queueName);

        return $amqpQueue;
    }
137
}
138