Completed
Push — master ( 5fca0d...2a987b )
by Quim
02:16
created

Subscriber::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 2
1
<?php
2
namespace Cmp\Queues\Domain\Event;
3
4
use Cmp\Queues\Domain\Event\Exception\DomainEventException;
5
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
6
use Cmp\Queues\Domain\Queue\QueueReader;
7
use Psr\Log\LoggerInterface;
8
9
class Subscriber
10
{
11
    /**
12
     * @var QueueReader
13
     */
14
    protected $queueReader;
15
16
    /**
17
     * @var LoggerInterface
18
     */
19
    protected $logger;
20
21
    /**
22
     * @var EventSubscriptor[]
23
     */
24
    protected $subscriptors = [];
25
26
    /**
27
     * Subscriber constructor.
28
     * @param QueueReader $queueReader
29
     */
30
    public function __construct(QueueReader $queueReader, LoggerInterface $logger)
31
    {
32
        $this->queueReader = $queueReader;
33
        $this->logger = $logger;
34
    }
35
36
    /**
37
     * @param EventSubscriptor $eventSubscriptor
38
     * @return $this
39
     */
40
    public function subscribe(EventSubscriptor $eventSubscriptor)
41
    {
42
        $this->subscriptors[] = $eventSubscriptor;
43
        return $this;
44
    }
45
46
    public function start($timeout=0)
47
    {
48
        while(true) {
49
            try {
50
                $this->processOne($timeout);
51
            } catch(TimeoutReaderException $e) {
52
                break;
53
            }
54
        }
55
    }
56
57
    public function processOne($timeout)
58
    {
59
        if(!isset($this->subscriptors[0])) {
60
            throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.');
61
        }
62
        $this->queueReader->read(array($this, 'notify'), $timeout);
63
    }
64
65
    /**
66
     * @param DomainEvent $domainEvent
67
     */
68
    public function notify(DomainEvent $domainEvent)
69
    {
70
        $this->logger->info('Domain Event received, notifying subscribers');
71
        foreach($this->subscriptors as $subscriptor) {
72
            if($subscriptor->isSubscribed($domainEvent)) {
73
                $subscriptor->notify($domainEvent);
74
            }
75
        }
76
    }
77
78
    /**
79
     * @return EventSubscriptor[]
80
     */
81
    public function getSubscriptors()
82
    {
83
        return $this->subscriptors;
84
    }
85
}