CommandBus::dispatchNow()   A
last analyzed

Complexity

Conditions 2
Paths 3

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 13
rs 9.4285
c 1
b 0
f 0
cc 2
eloc 9
nc 3
nop 1
1
<?php
2
/*
3
 * This file is part of the Borobudur-Cqrs package.
4
 *
5
 * (c) Hexacodelabs <http://hexacodelabs.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 */
10
11
namespace Borobudur\Cqrs\Bus;
12
13
use Borobudur\Bus\Bus;
14
use Borobudur\Bus\Message\MessageInterface;
15
use Borobudur\Cqrs\Exception\InvalidArgumentException;
16
use Borobudur\Cqrs\Message\CommandInterface;
17
use Borobudur\Cqrs\ReadModel\Storage\TransactionalInterface;
18
use Borobudur\Queue\JobInterface;
19
use Borobudur\Queue\Queue;
20
use Borobudur\Queue\QueueMessage;
21
use Exception;
22
23
/**
24
 * @author      Iqbal Maulana <[email protected]>
25
 * @created     8/18/15
26
 */
27
class CommandBus extends Bus
28
{
29
    const BUS_NAME = 'bus.command';
30
31
    /**
32
     * @var TransactionalInterface
33
     */
34
    protected $transaction;
35
36
    /**
37
     * Constructor.
38
     *
39
     * @param TransactionalInterface|null $transaction
40
     */
41
    public function __construct(TransactionalInterface $transaction = null)
42
    {
43
        $this->transaction = $transaction;
44
    }
45
46
    /**
47
     * {@inheritdoc}
48
     */
49
    public function getName()
50
    {
51
        return CommandBus::BUS_NAME;
52
    }
53
54
    /**
55
     * {@inheritdoc}
56
     */
57
    public function dispatch(MessageInterface $command)
58
    {
59
        if (!$command instanceof CommandInterface) {
60
            throw new InvalidArgumentException(sprintf(
61
                'Command "%s" should implement \Borobudur\Cqrs\Message\CommandInterface',
62
                get_class($command)
63
            ));
64
        }
65
66
        if ($command instanceof JobInterface && $command->isQueued()) {
67
            Queue::getInstance()->send(new QueueMessage($command));
68
            
69
            return null;
70
        }
71
72
        return $this->dispatchNow($command);
73
    }
74
75
    /**
76
     * Dispatch command immediately.
77
     *
78
     * @param MessageInterface $command
79
     *
80
     * @return mixed
81
     * @throws Exception
82
     */
83
    public function dispatchNow(MessageInterface $command)
84
    {
85
        $this->beginTransaction();
86
        try {
87
            $result = parent::dispatchNow($command);
88
            $this->commit();
89
90
            return $result;
91
        } catch (Exception $e) {
92
            $this->rollback();
93
            throw $e;
94
        }
95
    }
96
97
    /**
98
     * Prepare transaction if possible.
99
     */
100
    protected function beginTransaction()
101
    {
102
        if (null !== $this->transaction) {
103
            $this->transaction->beginTransaction();
104
        }
105
    }
106
107
    /**
108
     * Commit transaction if exist.
109
     */
110
    protected function commit()
111
    {
112
        if (null !== $this->transaction) {
113
            $this->transaction->commit();
114
        }
115
    }
116
117
    /**
118
     * Rollback transaction if exist.
119
     */
120
    protected function rollback()
121
    {
122
        if (null !== $this->transaction) {
123
            $this->transaction->rollback();
124
        }
125
    }
126
}
127