Passed
Push — master ( 5bd05b...9a2fe3 )
by
unknown
48s queued 11s
created

Producer   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 110
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 3
Bugs 0 Features 1
Metric Value
eloc 39
dl 0
loc 110
ccs 42
cts 42
cp 1
rs 10
c 3
b 0
f 1
wmc 16

5 Methods

Rating   Name   Duplication   Size   Complexity  
B makeRedeliveryMessage() 0 31 6
A __construct() 0 4 2
A createMessage() 0 15 6
A send() 0 3 1
A dispatch() 0 10 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Simple\Queue;
6
7
use DateTimeImmutable;
8
use Simple\Queue\Store\StoreInterface;
9
10
/**
 * Class Producer
 *
 * Builds queue messages and hands them over to the configured store.
 *
 * @package Simple\Queue
 */
class Producer
{
    /** @var StoreInterface Store that receives every message passed to send() */
    private StoreInterface $store;

    /** @var Config Source of the serializer, the job registry and the redelivery delay */
    private Config $config;

    /**
     * Producer constructor.
     *
     * @param StoreInterface $store
     * @param Config|null $config Falls back to Config::getDefault() when omitted
     */
    public function __construct(StoreInterface $store, ?Config $config = null)
    {
        $this->store = $store;
        $this->config = $config ?? Config::getDefault();
    }

    /**
     * Create a new message for the given queue.
     *
     * The body is normalised to a string:
     *  - an object exposing __toString() is cast to string;
     *  - any other object, or an array, goes through the configured serializer;
     *  - the result is finally cast to string.
     *
     * @param string $queue
     * @param mixed $body
     * @return Message
     * @throws QueueException When the body is a closure
     */
    public function createMessage(string $queue, $body): Message
    {
        // Only real closures are rejected. is_callable() would also match plain
        // strings such as 'date' and [class, method] arrays, which are valid payloads.
        if ($body instanceof \Closure) {
            throw new QueueException('The closure cannot be serialized.');
        }

        if (is_object($body) && method_exists($body, '__toString')) {
            $body = (string)$body;
        }

        if (is_object($body) || is_array($body)) {
            $body = $this->config->getSerializer()->serialize($body);
        }

        return new Message($queue, (string)$body);
    }

    /**
     * Build the message that replaces $message when it has to be delivered again.
     *
     * The new message copies queue, body, priority, event, job flag and error
     * from the original and has its attempt counter increased by one.
     * A NEW or IN_PROCESS status becomes REDELIVERED; any other status is kept.
     * The redelivery time is "now" plus the configured delay, unless the original
     * already carries a later redelivery time; it is null for FAILURE messages.
     *
     * @param Message $message
     * @return Message
     */
    public function makeRedeliveryMessage(Message $message): Message
    {
        $currentStatus = $message->getStatus();

        $newStatus = ($currentStatus === Status::NEW || $currentStatus === Status::IN_PROCESS)
            ? Status::REDELIVERED
            : $currentStatus;

        $redeliveredTime = (new DateTimeImmutable('now'))
            ->modify(sprintf('+%s seconds', $this->config->getRedeliveryTimeInSeconds()));

        // Never pull an already scheduled redelivery closer: keep the later of the two.
        // The comparison is deliberately done on whole seconds.
        $previousRedeliveredAt = $message->getRedeliveredAt();
        if (
            $previousRedeliveredAt &&
            $previousRedeliveredAt->getTimestamp() > $redeliveredTime->getTimestamp()
        ) {
            $redeliveredTime = $previousRedeliveredAt;
        }

        // A FAILURE message gets no redelivery time at all.
        if ($newStatus === Status::FAILURE) {
            $redeliveredTime = null;
        }

        $redeliveredMessage = (new Message($message->getQueue(), $message->getBody()))
            ->changePriority($message->getPriority())
            ->withEvent($message->getEvent())
            ->changeRedeliveredAt($redeliveredTime);

        // The remaining fields are set through the hydrator rather than on Message itself.
        return (new MessageHydrator($redeliveredMessage))
            ->changeStatus($newStatus)
            ->jobable($message->isJob())
            ->setError($message->getError())
            ->changeAttempts($message->getAttempts() + 1)
            ->getMessage();
    }

    /**
     * Dispatch a job.
     *
     * Looks the job up in the config, creates a message on the job's queue with
     * $data as body and the job alias as event, flags it as a job and sends it.
     *
     * @param string $jobName
     * @param array $data
     * @throws QueueException
     */
    public function dispatch(string $jobName, array $data): void
    {
        $job = $this->config->getJob($jobName);

        $message = $this->createMessage($job->queue(), $data)
            ->withEvent($this->config->getJobAlias($jobName));

        $message = (new MessageHydrator($message))->jobable()->getMessage();

        $this->send($message);
    }

    /**
     * Send a message to the queue by passing it to the store.
     *
     * @param Message $message
     */
    public function send(Message $message): void
    {
        $this->store->send($message);
    }
}
129