Completed
Push — master ( 7f22a0...381ac2 )
by Rémi
04:04
created

TransactionalQueuePublisher::commit()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 7
ccs 6
cts 6
cp 1
rs 9.4285
cc 2
eloc 4
nc 2
nop 0
crap 2
1
<?php
2
3
namespace RemiSan\TransactionManager\Amqp;
4
5
use Burrow\QueuePublisher;
6
use RemiSan\TransactionManager\Exception\TransactionException;
7
use RemiSan\TransactionManager\Transactional;
8
9
class TransactionalQueuePublisher implements QueuePublisher, Transactional
10
{
11
    /** @var QueuePublisher */
12
    private $publisher;
13
14
    /** @var array */
15
    private $messages;
16
17
    /** @var bool */
18
    private $running;
19
20
    /**
21
     * Constructor.
22
     *
23
     * @param QueuePublisher $publisher
24
     */
25 15
    public function __construct(QueuePublisher $publisher)
26
    {
27 15
        $this->publisher = $publisher;
28 15
        $this->messages = [];
29 15
        $this->running = false;
30 15
    }
31
32
    /**
33
     * {@inheritdoc}
34
     */
35 12
    public function publish($data, $routingKey = "")
36
    {
37 12
        if (!$this->running) {
38 3
            throw new TransactionException('');
39
        }
40
41 9
        $this->messages[] = [
42 9
            'data' => $data,
43 3
            'routingKey' => $routingKey
44 6
        ];
45 9
    }
46
47
    /**
48
     * {@inheritdoc}
49
     */
50 9
    public function beginTransaction()
51
    {
52 9
        $this->messages = [];
53 9
        $this->running = true;
54 9
    }
55
56
    /**
57
     * {@inheritdoc}
58
     */
59 3
    public function commit()
60
    {
61 3
        foreach ($this->messages as $message) {
62 3
            $this->publisher->publish($message['data'], $message['routingKey']);
63 2
        }
64 3
        $this->running = false;
65 3
    }
66
67
    /**
68
     * {@inheritdoc}
69
     */
70 3
    public function rollback()
71
    {
72 3
        $this->messages = [];
73 3
        $this->running = false;
74 3
    }
75
}
76