TransactionalQueuePublisher::publish()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 8
cts 8
cp 1
rs 9.8666
c 0
b 0
f 0
cc 2
nc 2
nop 3
crap 2
1
<?php
2
3
namespace RemiSan\TransactionManager\Amqp;
4
5
use Burrow\QueuePublisher;
6
use RemiSan\TransactionManager\Exception\TransactionException;
7
use RemiSan\TransactionManager\Transactional;
8
9
/**
 * Queue publisher that buffers messages while a transaction is open and only
 * hands them to the wrapped publisher when the transaction is committed.
 *
 * Publishing while no transaction is open is refused with a
 * TransactionException.
 */
final class TransactionalQueuePublisher implements QueuePublisher, Transactional
{
    /** @var QueuePublisher Wrapped publisher that receives the messages on commit. */
    private $publisher;

    /** @var array Buffered messages; each entry has 'data', 'routingKey' and 'headers' keys. */
    private $messages;

    /** @var bool True between beginTransaction() and the next commit()/rollback(). */
    private $running;

    /**
     * Wraps the given publisher; the instance starts with an empty buffer and
     * no open transaction.
     *
     * @param QueuePublisher $publisher Publisher the buffered messages are forwarded to.
     */
    public function __construct(QueuePublisher $publisher)
    {
        $this->publisher = $publisher;
        $this->clearState();
    }

    /**
     * Buffers a message for the currently open transaction.
     *
     * Nothing is sent to the wrapped publisher here; that happens in commit().
     *
     * {@inheritdoc}
     *
     * @throws TransactionException When no transaction is open.
     */
    public function publish($data, $routingKey = '', array $headers = [])
    {
        if (!$this->running) {
            throw new TransactionException('Cannot publish outside a transaction');
        }

        $pending = [
            'data' => $data,
            'routingKey' => $routingKey,
            'headers' => $headers,
        ];

        $this->messages[] = $pending;
    }

    /**
     * Opens a transaction so that publish() starts accepting messages.
     *
     * The buffer itself is left untouched.
     *
     * {@inheritdoc}
     */
    public function beginTransaction()
    {
        $this->running = true;
    }

    /**
     * Forwards every buffered message, in the order it was buffered, to the
     * wrapped publisher, then empties the buffer and closes the transaction.
     *
     * If the wrapped publisher throws, the exception propagates and the
     * buffer and transaction flag are left as they were.
     *
     * {@inheritdoc}
     */
    public function commit()
    {
        foreach ($this->messages as $pending) {
            $payload = $pending['data'];
            $routingKey = $pending['routingKey'];
            $headers = $pending['headers'];

            $this->publisher->publish($payload, $routingKey, $headers);
        }

        $this->clearState();
    }

    /**
     * Drops every buffered message without publishing it and closes the
     * transaction.
     *
     * {@inheritdoc}
     */
    public function rollback()
    {
        $this->clearState();
    }

    /**
     * Empties the message buffer and marks the transaction as not running.
     */
    private function clearState()
    {
        $this->messages = [];
        $this->running = false;
    }
}
77