Passed
Push — master ( 3969eb...5e03f4 )
by Tomasz
01:33
created

Client::consume()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 10
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 3
nc 3
nop 1
1
<?php
2
/**
3
 *
4
 * This file is part of the Aggrego.
5
 * (c) Tomasz Kunicki <[email protected]>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 */
11
12
declare(strict_types = 1);
13
14
namespace Aggrego\CommandLogicUnit\EventConsumer;
15
16
use Aggrego\CommandConsumer\Client as CommandConsumerClient;
17
use Aggrego\CommandLogicUnit\ResponseProcessor\Factory;
18
use Aggrego\CommandLogicUnit\EventProcessor\EventProcessor;
19
use Aggrego\EventConsumer\Client as EventConsumerClient;
20
use Aggrego\EventConsumer\Event;
21
use Aggrego\EventConsumer\Exception\UnprocessableEventException;
22
23
/**
 * Event consumer that turns an incoming event into commands, executes them,
 * and forwards any events produced while processing the responses.
 */
class Client implements EventConsumerClient
{
    /** @var EventProcessor Transforms an event into the commands to execute. */
    private $eventProcessor;

    /** @var CommandConsumerClient Consumes each command and returns its response. */
    private $commandConsumer;

    /** @var Factory Creates a response processor for a command/response pair. */
    private $responseProcessorFactory;

    /** @var EventConsumerClient Receives the events pulled from response processors. */
    private $eventConsumer;

    /**
     * @param EventProcessor        $eventProcessor
     * @param CommandConsumerClient $commandConsumer
     * @param Factory               $responseProcessorFactory
     * @param EventConsumerClient   $eventConsumer
     */
    public function __construct(
        EventProcessor $eventProcessor,
        CommandConsumerClient $commandConsumer,
        Factory $responseProcessorFactory,
        EventConsumerClient $eventConsumer
    ) {
        $this->eventConsumer = $eventConsumer;
        $this->responseProcessorFactory = $responseProcessorFactory;
        $this->commandConsumer = $commandConsumer;
        $this->eventProcessor = $eventProcessor;
    }

    /**
     * Handles a single event.
     *
     * Each command obtained from the event processor is passed to the command
     * consumer; the resulting response is handed to a response processor built
     * by the factory, and every event pulled from that processor is forwarded
     * to the injected event consumer.
     *
     * @param Event $event
     * @throws UnprocessableEventException if the event (payload) has an invalid structure.
     */
    public function consume(Event $event): void
    {
        $commands = $this->eventProcessor->transform($event);

        foreach ($commands as $command) {
            $result = $this->commandConsumer->consume($command);

            $processor = $this->responseProcessorFactory->create($command, $result);
            $processor->processResponse($command, $result);

            // A distinct loop variable keeps the $event parameter intact.
            foreach ($processor->pullEvents() as $producedEvent) {
                $this->eventConsumer->consume($producedEvent);
            }
        }
    }
}
68