AmqpMessagePublisher::publish()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
nc 3
nop 3
dl 0
loc 18
ccs 10
cts 10
cp 1
crap 3
rs 9.6666
c 0
b 0
f 0
1
<?php
2
3
namespace TreeHouse\Queue\Amqp\Driver\Amqp\Publisher;
4
5
use TreeHouse\Queue\Amqp\ExchangeInterface;
6
use TreeHouse\Queue\Message\Composer\MessageComposerInterface;
7
use TreeHouse\Queue\Message\Message;
8
use TreeHouse\Queue\Message\MessageProperties;
9
use TreeHouse\Queue\Message\Publisher\MessagePublisherInterface;
10
11
/**
 * Message publisher that composes messages with a message composer and
 * hands them to an AMQP exchange.
 */
class AmqpMessagePublisher implements MessagePublisherInterface
{
    /**
     * Exchange that messages are published to.
     *
     * @var ExchangeInterface
     */
    protected $exchange;

    /**
     * Composer used to turn a payload into a message.
     *
     * @var MessageComposerInterface
     */
    protected $composer;

    /**
     * @param ExchangeInterface        $exchange Exchange that receives published messages
     * @param MessageComposerInterface $composer Composer that builds messages from payloads
     */
    public function __construct(ExchangeInterface $exchange, MessageComposerInterface $composer)
    {
        $this->exchange = $exchange;
        $this->composer = $composer;
    }

    /**
     * Composes a message from the payload and assigns the given priority to it.
     *
     * @inheritdoc
     */
    public function createMessage($payload, $priority = self::DEFAULT_PRIORITY)
    {
        $message = $this->composer->compose($payload);
        $message->setPriority($priority);

        return $message;
    }

    /**
     * Publishes the message's body, routing key and properties to the exchange.
     *
     * When a date is given, the difference between that date and the current
     * time is stored on the message as a delay header before publishing.
     *
     * @inheritdoc
     *
     * @throws \OutOfBoundsException When the given date lies in the past
     */
    public function publish(Message $message, \DateTime $date = null, $flags = ExchangeInterface::NOPARAM)
    {
        if ($date instanceof \DateTime) {
            // difference in whole seconds between the requested date and now
            $delay = $date->getTimestamp() - time();
            if ($delay < 0) {
                throw new \OutOfBoundsException('You cannot publish a message in the past');
            }

            // set delay in milliseconds
            $message->setHeader(MessageProperties::KEY_DELAY, $delay * 1000);
        }

        // read the properties only after the delay header has been applied
        $body = $message->getBody();
        $route = $message->getRoutingKey();
        $props = $message->getProperties()->toArray();

        return $this->exchange->publish($body, $route, $flags, $props);
    }
}
66