OutgoingPhysicalToDispatchConnector::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 5
cts 5
cp 1
rs 9.9666
c 0
b 0
f 0
cc 1
nc 1
nop 3
crap 1
1
<?php
2
namespace PSB\Core\Pipeline\Outgoing;
3
4
5
use PSB\Core\DateTimeConverter;
6
use PSB\Core\HeaderTypeEnum;
7
use PSB\Core\Pipeline\Outgoing\StageContext\DispatchContext;
8
use PSB\Core\Pipeline\Outgoing\StageContext\OutgoingPhysicalMessageContext;
9
use PSB\Core\Pipeline\StageConnectorInterface;
10
use PSB\Core\Transport\TransportOperation;
11
use PSB\Core\Util\Clock\ClockInterface;
12
13
class OutgoingPhysicalToDispatchConnector implements StageConnectorInterface
14
{
15
    /**
16
     * @var OutgoingContextFactory
17
     */
18
    private $contextFactory;
19
20
    /**
21
     * @var DateTimeConverter
22
     */
23
    private $dateTimeConverter;
24
25
    /**
26
     * @var ClockInterface
27
     */
28
    private $clock;
29
30
    /**
31
     *
32
     * @param OutgoingContextFactory $contextFactory
33
     * @param DateTimeConverter      $dateTimeConverter
34
     * @param ClockInterface         $clock
35
     */
36 5
    public function __construct(
37
        OutgoingContextFactory $contextFactory,
38
        DateTimeConverter $dateTimeConverter,
39
        ClockInterface $clock
40
    ) {
41 5
        $this->contextFactory = $contextFactory;
42 5
        $this->dateTimeConverter = $dateTimeConverter;
43 5
        $this->clock = $clock;
44 5
    }
45
46
    /**
47
     * @param OutgoingPhysicalMessageContext $context
48
     * @param callable                       $next
49
     */
50 2
    public function invoke($context, callable $next)
51
    {
52 2
        $context->setHeader(HeaderTypeEnum::MESSAGE_ID, $context->getMessageId());
53 2
        $context->setHeader(
54 2
            HeaderTypeEnum::TIME_SENT,
55 2
            $this->dateTimeConverter->toWireFormattedString($this->clock->now())
56
        );
57
58 2
        $transportOperations = [];
59 2
        foreach ($context->getAddressTags() as $addressTag) {
60 2
            $transportOperations[] = new TransportOperation(clone $context->getMessage(), $addressTag);
61
        }
62
63 2
        $pendingOperations = $context->getPendingTransportOperations();
64
65 2
        if (!$context->isImmediateDispatchEnabled() && $pendingOperations) {
66 1
            $pendingOperations->addAll($transportOperations);
67 1
            return;
68
        }
69
70 1
        $next($this->contextFactory->createDispatchContext($transportOperations, $context));
71 1
    }
72
73
    /**
74
     * @return string
75
     */
76 1
    public static function getStageContextClass()
77
    {
78 1
        return OutgoingPhysicalMessageContext::class;
79
    }
80
81
    /**
82
     * @return string
83
     */
84 1
    public static function getNextStageContextClass()
85
    {
86 1
        return DispatchContext::class;
87
    }
88
}
89