Passed
Pull Request — master (#20)
by Anatolyi
02:29
created

Producer::dispatch()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
eloc 5
nc 1
nop 2
dl 0
loc 10
ccs 6
cts 6
cp 1
crap 1
rs 10
c 2
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Simple\Queue;
6
7
use DateTimeImmutable;
8
use Simple\Queue\Store\StoreInterface;
9
10
/**
11
 * Class Producer
12
 *
13
 * Sending a message to the queue
14
 *
15
 * @package Simple\Queue
16
 */
17
class Producer
18
{
19
    /** @var StoreInterface */
20
    private StoreInterface $store;
21
22
    /** @var Config */
23
    private Config $config;
24
25
    /**
26
     * Producer constructor.
27
     * @param StoreInterface $store
28
     * @param Config|null $config
29
     */
30 15
    public function __construct(StoreInterface $store, ?Config $config = null)
31
    {
32 15
        $this->store = $store;
33 15
        $this->config = $config ?: Config::getDefault();
34 15
    }
35
36
    /**
37
     * Create new message
38
     *
39
     * @param string $queue
40
     * @param $body
41
     * @return Message
42
     * @throws QueueException
43
     */
44 5
    public function createMessage(string $queue, $body): Message
45
    {
46 5
        if (is_callable($body)) {
47 1
            throw new QueueException('The closure cannot be serialized.');
48
        }
49
50 4
        if (is_object($body) && method_exists($body, '__toString')) {
51 1
            $body = (string)$body;
52
        }
53
54 4
        if (is_object($body) || is_array($body)) {
55 2
            $body = $this->config->getSerializer()->serialize($body);
56
        }
57
58 4
        return new Message($queue, (string)$body);
59
    }
60
61
    /**
62
     * Redelivered a message to the queue
63
     *
64
     * @param Message $message
65
     * @return Message
66
     */
67 6
    public function makeRedeliveryMessage(Message $message): Message
68
    {
69 6
        $newStatus = ($message->getStatus() === Status::NEW || $message->getStatus() === Status::IN_PROCESS) ?
70 5
            Status::REDELIVERED :
71 6
            $message->getStatus();
72
73 6
        $redeliveredTime = (new DateTimeImmutable('now'))
74 6
            ->modify(sprintf('+%s seconds', $this->config->getRedeliveryTimeInSeconds()));
75
76
        if (
77 6
            $message->getRedeliveredAt() &&
78 6
            $message->getRedeliveredAt()->getTimestamp() > $redeliveredTime->getTimestamp()
79
        ) {
80 1
            $redeliveredTime = $message->getRedeliveredAt();
81
        }
82
83 6
        if ($newStatus === Status::FAILURE) {
84 1
            $redeliveredTime = null;
85
        }
86
87 6
        $redeliveredMessage = (new Message($message->getQueue(), $message->getBody()))
88 6
            ->changePriority($message->getPriority())
89 6
            ->withEvent($message->getEvent())
90 6
            ->changeRedeliveredAt($redeliveredTime);
91
92 6
        return (new MessageHydrator($redeliveredMessage))
93 6
            ->changeStatus($newStatus)
94 6
            ->jobable($message->isJob())
95 6
            ->setError($message->getError())
96 6
            ->changeAttempts($message->getAttempts() + 1)
97 6
            ->getMessage();
98
    }
99
100
    /**
101
     * Dispatch a job
102
     *
103
     * @param string $jobName
104
     * @param array $data
105
     * @throws QueueException
106
     */
107 2
    public function dispatch(string $jobName, array $data): void
108
    {
109 2
        $job = $this->config->getJob($jobName);
110
111 1
        $message = $this->createMessage($job->queue(), $data)
112 1
            ->withEvent($this->config->getJobAlias($jobName));
113
114 1
        $message = (new MessageHydrator($message))->jobable()->getMessage();
115
116 1
        $this->send($message);
117 1
    }
118
119
    /**
120
     * Send message to queue
121
     *
122
     * @param Message $message
123
     */
124 6
    public function send(Message $message): void
125
    {
126 6
        $this->store->send($message);
127 5
    }
128
}
129