OutboxConnector   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 122
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 8
lcom 1
cbo 9
dl 0
loc 122
ccs 38
cts 38
cp 1
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 13 1
A invoke() 0 33 3
A dispatchPendingOperations() 0 13 2
A getStageContextClass() 0 4 1
A getNextStageContextClass() 0 4 1
1
<?php
2
namespace PSB\Core\Outbox\Pipeline;
3
4
5
use PSB\Core\Outbox\OutboxMessage;
6
use PSB\Core\Outbox\OutboxStorageInterface;
7
use PSB\Core\Pipeline\Incoming\IncomingContextFactory;
8
use PSB\Core\Pipeline\Incoming\StageContext\IncomingPhysicalMessageContext;
9
use PSB\Core\Pipeline\Incoming\StageContext\TransportReceiveContext;
10
use PSB\Core\Pipeline\Incoming\TransportOperationsConverter;
11
use PSB\Core\Pipeline\Outgoing\OutgoingContextFactory;
12
use PSB\Core\Pipeline\PendingTransportOperations;
13
use PSB\Core\Pipeline\PipelineInterface;
14
use PSB\Core\Pipeline\StageConnectorInterface;
15
16
/**
 * Stage connector that links the TransportReceiveContext stage to the
 * IncomingPhysicalMessageContext stage.
 *
 * For each incoming message it looks up the outbox storage by message id.
 * When nothing is stored yet, the remainder of the pipeline runs inside a
 * storage transaction and the resulting operations are stored; otherwise the
 * stored entry is converted back into pending transport operations. In both
 * cases the operations are then dispatched and the message id is marked as
 * dispatched.
 */
class OutboxConnector implements StageConnectorInterface
{
    /**
     * Pipeline that receives the dispatch context built from the pending operations.
     *
     * @var PipelineInterface
     */
    private $dispatchPipeline;

    /**
     * Storage queried, written to and marked as dispatched per message id.
     *
     * @var OutboxStorageInterface
     */
    private $outboxStorage;

    /**
     * Builds the physical message context from the transport receive context.
     *
     * @var IncomingContextFactory
     */
    private $incomingContextFactory;

    /**
     * Builds the dispatch context handed to the dispatch pipeline.
     *
     * @var OutgoingContextFactory
     */
    private $outgoingContextFactory;

    /**
     * Converts between pending transport operations and their outbox form.
     *
     * @var TransportOperationsConverter
     */
    private $operationsConverter;

    /**
     * Stores the collaborators; no other work is done at construction time.
     *
     * @param PipelineInterface            $dispatchPipeline
     * @param OutboxStorageInterface       $outboxStorage
     * @param IncomingContextFactory       $incomingContextFactory
     * @param OutgoingContextFactory       $outgoingContextFactory
     * @param TransportOperationsConverter $operationsConverter
     */
    public function __construct(
        PipelineInterface $dispatchPipeline,
        OutboxStorageInterface $outboxStorage,
        IncomingContextFactory $incomingContextFactory,
        OutgoingContextFactory $outgoingContextFactory,
        TransportOperationsConverter $operationsConverter
    ) {
        $this->dispatchPipeline = $dispatchPipeline;
        $this->outboxStorage = $outboxStorage;
        $this->incomingContextFactory = $incomingContextFactory;
        $this->outgoingContextFactory = $outgoingContextFactory;
        $this->operationsConverter = $operationsConverter;
    }

    /**
     * Runs the next stage unless an entry is already stored for the message id,
     * then dispatches the resulting operations and marks the message as dispatched.
     *
     * @param TransportReceiveContext $context
     * @param callable                $next Invoked with the physical message context.
     *
     * @throws \Throwable Anything raised while running $next, storing or committing
     *                    is rethrown after the storage transaction is rolled back.
     */
    public function invoke($context, callable $next)
    {
        $messageId = $context->getMessageId();
        $physicalContext = $this->incomingContextFactory->createPhysicalMessageContext($context);

        $storedEntry = $this->outboxStorage->get($messageId);
        $operations = $physicalContext->getPendingTransportOperations();

        if ($storedEntry) {
            // An entry already exists for this id: skip $next and reuse what was stored.
            $operations = $this->operationsConverter->convertToPendingTransportOperations($storedEntry);
        } else {
            $this->outboxStorage->beginTransaction();

            try {
                $next($physicalContext);

                // $operations is the object obtained from the physical context above;
                // it is converted only after $next has returned.
                $this->outboxStorage->store(
                    new OutboxMessage(
                        $messageId,
                        $this->operationsConverter->convertToOutboxOperations($operations)
                    )
                );
                $this->outboxStorage->commit();
            } catch (\Throwable $failure) {
                $this->outboxStorage->rollBack();

                throw $failure;
            }
        }

        $this->dispatchPendingOperations($operations, $physicalContext);

        $this->outboxStorage->markAsDispatched($messageId);
    }

    /**
     * Sends the pending operations through the dispatch pipeline; does nothing
     * when there are no operations.
     *
     * @param PendingTransportOperations     $pendingTransportOperations
     * @param IncomingPhysicalMessageContext $physicalMessageContext
     */
    private function dispatchPendingOperations(
        PendingTransportOperations $pendingTransportOperations,
        IncomingPhysicalMessageContext $physicalMessageContext
    ) {
        if (!$pendingTransportOperations->hasOperations()) {
            return;
        }

        $this->dispatchPipeline->invoke(
            $this->outgoingContextFactory->createDispatchContext(
                $pendingTransportOperations->getOperations(),
                $physicalMessageContext
            )
        );
    }

    /**
     * Class name of the context this connector accepts.
     *
     * @return string
     */
    public static function getStageContextClass()
    {
        return TransportReceiveContext::class;
    }

    /**
     * Class name of the context this connector passes to the next stage.
     *
     * @return string
     */
    public static function getNextStageContextClass()
    {
        return IncomingPhysicalMessageContext::class;
    }
}
138