Completed
Push — master ( f016e7...2ea06c )
by Hilari
14s
created

Subscriber::start()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 16
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 16
rs 8.8571
c 0
b 0
f 0
cc 5
eloc 10
nc 5
nop 1
1
<?php
2
namespace Cmp\Queues\Domain\Event;
3
4
use Cmp\Queues\Domain\Event\Exception\DomainEventException;
5
use Cmp\Queues\Domain\Queue\Exception\GracefulStopException;
6
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
7
use Cmp\Queues\Domain\Queue\QueueReader;
8
use Psr\Log\LoggerInterface;
9
10
/**
 * Reads domain events from a queue and dispatches each one to the
 * registered event subscriptors that declare interest in it.
 */
class Subscriber
{
    /**
     * Source the domain events are read from.
     *
     * @var QueueReader
     */
    protected $queueReader;

    /**
     * Receives a debug entry every time an event is dispatched.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Subscriptors registered through subscribe(), in registration order.
     *
     * @var EventSubscriptor[]
     */
    protected $subscriptors = [];

    /**
     * Subscriber constructor.
     *
     * @param QueueReader     $queueReader Queue the events are read from.
     * @param LoggerInterface $logger      Logger used when events are dispatched.
     */
    public function __construct(QueueReader $queueReader, LoggerInterface $logger)
    {
        $this->queueReader = $queueReader;
        $this->logger = $logger;
    }

    /**
     * Registers a subscriptor to be considered for every incoming event.
     *
     * @param EventSubscriptor $eventSubscriptor
     * @return $this
     */
    public function subscribe(EventSubscriptor $eventSubscriptor)
    {
        $this->subscriptors[] = $eventSubscriptor;

        return $this;
    }

    /**
     * Keeps reading from the queue, handing every event to notify(), until the
     * reader signals a timeout or a graceful stop. Any other exception raised
     * by the reader or by a subscriptor is left to propagate to the caller.
     *
     * @param int $timeout Forwarded untouched to QueueReader::read(); its unit and
     *                     the meaning of 0 are defined by the reader (TODO confirm).
     *
     * @throws DomainEventException When no subscriptor has been registered.
     */
    public function start($timeout = 0)
    {
        // empty() rather than isset([0]): the guard must not depend on the
        // subscriptors being stored as a 0-indexed list.
        if (empty($this->subscriptors)) {
            throw new DomainEventException('You must add at least 1 EventSubscriptor in order to start reading from queue.');
        }

        // The callback never changes between reads, so build it once.
        $callback = [$this, 'notify'];

        while (true) {
            try {
                $this->queueReader->read($callback, $timeout);
            } catch (TimeoutReaderException $e) {
                // The reader gave up waiting: stop consuming.
                return;
            } catch (GracefulStopException $e) {
                // A graceful stop was requested: stop consuming.
                return;
            }
        }
    }

    /**
     * Dispatches a domain event to every subscriptor that reports being
     * subscribed to it. Used as the callback handed to the queue reader.
     *
     * @param DomainEvent $domainEvent
     */
    public function notify(DomainEvent $domainEvent)
    {
        $this->logger->debug('Domain Event received, notifying subscribers');

        foreach ($this->subscriptors as $subscriptor) {
            if ($subscriptor->isSubscribed($domainEvent)) {
                $subscriptor->notify($domainEvent);
            }
        }
    }

    /**
     * @return EventSubscriptor[] The subscriptors registered so far.
     */
    public function getSubscriptors()
    {
        return $this->subscriptors;
    }
}