Completed
Pull Request — master (#24)
by Miguel
04:59
created

Subscriber::start()   A

Complexity

Conditions 6
Paths 6

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 18
rs 9.0444
c 0
b 0
f 0
cc 6
nc 6
nop 1
1
<?php
2
namespace Cmp\Queues\Domain\Event;
3
4
use Cmp\Queues\Domain\Event\Exception\DomainEventException;
5
use Cmp\Queues\Domain\Queue\Exception\GracefulStopException;
6
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
7
use Cmp\Queues\Domain\Queue\QueueReader;
8
use Cmp\Queues\Domain\Task\Exception\ParseMessageException;
9
use Psr\Log\LoggerInterface;
10
11
class Subscriber
12
{
13
    /**
14
     * @var QueueReader
15
     */
16
    protected $queueReader;
17
18
    /**
19
     * @var LoggerInterface
20
     */
21
    protected $logger;
22
23
    /**
24
     * @var EventSubscriptor[]
25
     */
26
    protected $subscriptors = [];
27
28
    /**
29
     * Subscriber constructor.
30
     * @param QueueReader $queueReader
31
     * @param LoggerInterface $logger
32
     */
33
    public function __construct(QueueReader $queueReader, LoggerInterface $logger)
34
    {
35
        $this->queueReader = $queueReader;
36
        $this->logger = $logger;
37
    }
38
39
    /**
40
     * @param EventSubscriptor $eventSubscriptor
41
     * @return $this
42
     */
43
    public function subscribe(EventSubscriptor $eventSubscriptor)
44
    {
45
        $this->subscriptors[] = $eventSubscriptor;
46
        return $this;
47
    }
48
49
    /**
50
     * @param int $timeout
51
     * @throws DomainEventException
52
     */
53
    public function start($timeout=0)
54
    {
55
        if(!isset($this->subscriptors[0])) {
56
            throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.');
57
        }
58
59
        while(true) {
60
            try {
61
                $this->queueReader->read(array($this, 'notify'), $timeout);
62
            } catch(ParseMessageException $e){
63
                break;
64
            } catch(TimeoutReaderException $e) {
65
                break;
66
            } catch(GracefulStopException $e) {
67
                break;
68
            }
69
        }
70
    }
71
72
    /**
73
     * @param int $timeout
74
     * @throws DomainEventException
75
     */
76
    public function batch($timeout=0)
77
    {
78
        if(!isset($this->subscriptors[0])) {
79
            throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.');
80
        }
81
82
        $this->queueReader->read(array($this, 'notify'), $timeout);
83
    }
84
85
    /**
86
     * @param DomainEvent $domainEvent
87
     */
88
    public function notify(DomainEvent $domainEvent)
89
    {
90
        $this->logger->debug('Domain Event received, notifying subscribers');
91
        foreach($this->subscriptors as $subscriptor) {
92
            if($subscriptor->isSubscribed($domainEvent)) {
93
                $subscriptor->notify($domainEvent);
94
            }
95
        }
96
    }
97
98
    /**
99
     * @return EventSubscriptor[]
100
     */
101
    public function getSubscriptors()
102
    {
103
        return $this->subscriptors;
104
    }
105
}