| @@ 7-58 (lines=52) @@ | ||
| 4 | use Cmp\Queues\Domain\Event\Exception\DomainEventException; |
|
| 5 | use Cmp\Queues\Domain\Queue\QueueWriter; |
|
| 6 | ||
/**
 * Collects domain events in an in-memory buffer and hands the whole buffer
 * to a QueueWriter in a single write() call.
 */
class Publisher
{
    /**
     * Writer that receives the buffered events when publish() is called.
     *
     * @var QueueWriter
     */
    protected $queueWriter;

    /**
     * Events added since construction or since the last successful publish().
     *
     * @var DomainEvent[]
     */
    protected $events = [];

    /**
     * @param QueueWriter $queueWriter Writer the buffered events are passed to.
     */
    public function __construct(QueueWriter $queueWriter)
    {
        $this->queueWriter = $queueWriter;
    }

    /**
     * Append one domain event to the buffer.
     *
     * @param DomainEvent $event Event to buffer until the next publish().
     * @return $this Same instance, so calls can be chained.
     */
    public function add(DomainEvent $event)
    {
        $this->events[] = $event;

        return $this;
    }

    /**
     * Pass every buffered event to the writer, then empty the buffer.
     *
     * The buffer is reset only after write() has returned; if write() throws,
     * the events stay buffered.
     *
     * @throws DomainEventException When no event has been added.
     */
    public function publish()
    {
        // The buffer is only ever appended to, so a populated buffer always has index 0.
        if (isset($this->events[0])) {
            $this->queueWriter->write($this->events);
            $this->events = [];

            return;
        }

        throw new DomainEventException('You must add at least 1 DomainEvent in order to publish to queue.');
    }

    /**
     * Events currently waiting in the buffer.
     *
     * @return DomainEvent[]
     */
    public function getEvents()
    {
        return $this->events;
    }
}
|
| @@ 7-65 (lines=59) @@ | ||
| 4 | use Cmp\Queues\Domain\Queue\QueueWriter; |
|
| 5 | use Cmp\Queues\Domain\Task\Exception\TaskException; |
|
| 6 | ||
/**
 * Collects tasks in an in-memory buffer and hands the whole buffer
 * to a QueueWriter in a single write() call.
 */
class Producer
{
    /**
     * Writer that receives the buffered tasks when produce() is called.
     *
     * @var QueueWriter
     */
    protected $queueWriter;

    /**
     * Tasks added since construction or since the last successful produce().
     *
     * @var TaskInterface[]
     */
    protected $tasks = [];

    /**
     * @param QueueWriter $queueWriter Writer the buffered tasks are passed to.
     */
    public function __construct(QueueWriter $queueWriter)
    {
        $this->queueWriter = $queueWriter;
    }

    /**
     * Append one task to the buffer.
     *
     * @param TaskInterface $task Task to buffer until the next produce().
     * @return $this Same instance, so calls can be chained.
     */
    public function add(TaskInterface $task)
    {
        $this->tasks[] = $task;

        return $this;
    }

    /**
     * Pass every buffered task to the writer, then empty the buffer.
     *
     * The buffer is reset only after write() has returned; if write() throws,
     * the tasks stay buffered.
     *
     * @throws TaskException When no task has been added.
     */
    public function produce()
    {
        // The buffer is only ever appended to, so a populated buffer always has index 0.
        if (isset($this->tasks[0])) {
            $this->queueWriter->write($this->tasks);
            $this->tasks = [];

            return;
        }

        throw new TaskException('You must add at least 1 Task before producing.');
    }

    /**
     * Tasks currently waiting in the buffer.
     *
     * @return TaskInterface[]
     */
    public function getTasks()
    {
        return $this->tasks;
    }

    /**
     * Writer this producer was constructed with.
     *
     * @return QueueWriter
     */
    public function getQueueWriter()
    {
        return $this->queueWriter;
    }
}
|
| 66 | ||