Completed
Pull Request — master (#18)
by Peter
04:08
created

AmqpMessagePublisher   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 55
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 4

Test Coverage

Coverage 92.86%

Importance

Changes 2
Bugs 0 Features 2
Metric Value
wmc 5
c 2
b 0
f 2
lcom 2
cbo 4
dl 0
loc 55
ccs 13
cts 14
cp 0.9286
rs 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A createMessage() 0 7 1
A publish() 0 18 3
1
<?php
2
3
namespace TreeHouse\Queue\Amqp\Driver\Amqp\Publisher;
4
5
use TreeHouse\Queue\Amqp\ExchangeInterface;
6
use TreeHouse\Queue\Message\Composer\MessageComposerInterface;
7
use TreeHouse\Queue\Message\Message;
8
use TreeHouse\Queue\Message\MessageProperties;
9
use TreeHouse\Queue\Message\Publisher\MessagePublisherInterface;
10
11
class AmqpMessagePublisher implements MessagePublisherInterface
12
{
13
    /**
14
     * @var ExchangeInterface
15
     */
16
    protected $exchange;
17
18
    /**
19
     * @var MessageComposerInterface
20
     */
21
    protected $composer;
22
23
    /**
24
     * @param ExchangeInterface        $exchange
25
     * @param MessageComposerInterface $composer
26
     */
27
    public function __construct(ExchangeInterface $exchange, MessageComposerInterface $composer)
28
    {
29
        $this->exchange = $exchange;
30
        $this->composer = $composer;
31
    }
32
33
    /**
34
     * @inheritdoc
35 6
     */
36
    public function createMessage($payload, $priority = self::DEFAULT_PRIORITY)
37 6
    {
38 6
        $message = $this->composer->compose($payload);
39 6
        $message->setPriority($priority);
40
41
        return $message;
42
    }
43
44 5
    /**
45
     * @inheritdoc
46 5
     */
47 5
    public function publish(Message $message, \DateTime $date = null, $flags = ExchangeInterface::NOPARAM)
48
    {
49 5
        if ($date instanceof \DateTime) {
50
            $delay = $date->getTimestamp() - time();
51
            if ($delay < 0) {
52
                throw new \OutOFBoundsException('You cannot publish a message in the past');
53
            }
54
55 3
            // set delay in milliseconds
56
            $message->setHeader(MessageProperties::KEY_DELAY, $delay * 1000);
57 3
        }
58 1
59
        $body = $message->getBody();
60 1
        $route = $message->getRoutingKey();
61 1
        $props = $message->getProperties()->toArray();
62
63
        return $this->exchange->publish($body, $route, $flags, $props);
64
    }
65
}
66