Producer::dispatch()   B

Complexity: 7 conditions, 4 paths
Size: 23 total lines, 12 code lines
Duplication: 0 lines, ratio 0 %
Code Coverage: 12 tests, CRAP score 7
Importance: 2 changes, 0 bugs, 0 features

Metric  Value
cc      7
eloc    12
c       2
b       0
f       0
nc      4
nop     2
dl      0
loc     23
ccs     12
cts     12
cp      1
crap    7
rs      8.8333

<?php

declare(strict_types=1);

namespace Simple\Queue;

use Throwable;
use Ramsey\Uuid\Uuid;
use RuntimeException;
use Doctrine\DBAL\Connection;
use InvalidArgumentException;
use Doctrine\DBAL\Types\Types;

/**
 * Class Producer
 * @package Simple\Queue
 */
class Producer
{
    /** @var Connection */
    private Connection $connection;

    /** @var Config */
    private Config $config;

    /**
     * Producer constructor.
     * @param Connection $connection
     * @param Config|null $config
     */
    public function __construct(Connection $connection, ?Config $config = null)
    {
        $this->connection = $connection;
        // Fall back to the default config only when none was passed.
        $this->config = $config ?? Config::getDefault();
    }

    /**
     * @param string $queue
     * @param mixed $body
     * @return Message
     */
    public function createMessage(string $queue, $body): Message
    {
        // Note: is_callable() is also true for strings and arrays that name an existing function or method.
        if (is_callable($body)) {
            throw new InvalidArgumentException('The closure cannot be serialized.');
        }

        if (is_object($body) && method_exists($body, '__toString')) {
            $body = (string)$body;
        }

        if (is_object($body) || is_array($body)) {
            $body = $this->config->getSerializer()->serialize($body);
        }

        return new Message($queue, (string)$body);
    }

    /**
     * @param string $jobName
     * @param array $data
     */
    public function dispatch(string $jobName, array $data): void
    {
        // The cast keeps class_exists() happy if getJob() returns null; an empty name never exists.
        if ($this->config->hasJob($jobName) && (class_exists((string)$this->config->getJob($jobName)) === false)) {
            throw new InvalidArgumentException(sprintf(
                'A non-existent class "%s" is declared in the config.',
                $jobName
            ));
        }

        if (($this->config->hasJob($jobName) === false) && (class_exists($jobName) === false)) {
            throw new InvalidArgumentException(sprintf('Job class "%s" doesn\'t exist.', $jobName));
        }

        // is_a() needs $allow_string = true to accept a class name instead of an object.
        if (class_exists($jobName) && (is_a($jobName, Job::class, true) === false)) {
            throw new InvalidArgumentException(sprintf('Job class "%s" doesn\'t extend "%s".', $jobName, Job::class));
        }

        $message = $this->createMessage('default', $data); // TODO: change default queue
        $message->setEvent($jobName);

        $message = (new MessageHydrator($message))->jobable()->getMessage();

        $this->send($message);
    }

    /**
     * @param Message $message
     */
    public function send(Message $message): void
    {
        $dataMessage = [
            'id' => Uuid::uuid4()->toString(),
            'status' => $message->getStatus(),
            'created_at' => $message->getCreatedAt(),
            'redelivered_at' => $message->getRedeliveredAt(),
            'attempts' => $message->getAttempts(),
            'queue' => $message->getQueue(),
            'event' => $message->getEvent(),
            'is_job' => $message->isJob(),
            'body' => $message->getBody(),
            'priority' => $message->getPriority(),
            'error' => $message->getError(),
            'exact_time' => $message->getExactTime(),
        ];

        try {
            $rowsAffected = $this->connection->insert(QueueTableCreator::getTableName(), $dataMessage, [
                'id' => Types::GUID,
                'status' => Types::STRING,
                'created_at' => Types::DATETIME_IMMUTABLE,
                'redelivered_at' => Types::DATETIME_IMMUTABLE,
                'attempts' => Types::SMALLINT,
                'queue' => Types::STRING,
                'event' => Types::STRING,
                'is_job' => Types::BOOLEAN,
                'body' => Types::TEXT,
                'priority' => Types::SMALLINT,
                'error' => Types::TEXT,
                'exact_time' => Types::BIGINT,
            ]);
        } catch (Throwable $e) {
            throw new RuntimeException('The transport fails to send the message due to some internal error.', 0, $e);
        }

        // Checked outside the try block so this exception is not caught and re-wrapped by the catch above.
        if ($rowsAffected !== 1) {
            throw new RuntimeException('The message was not enqueued. Dbal did not confirm that the record is inserted.');
        }
    }
}