RabbitMqQueueStoreAdapter::dequeue()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 19
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 10
c 1
b 0
f 0
nc 2
nop 0
dl 0
loc 19
rs 9.9332
1
<?php
2
3
namespace Da\Mailer\Queue\Backend\RabbitMq;
4
5
use Da\Mailer\Queue\Backend\MailJobInterface;
6
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use phpseclib3\Crypt\Random;
10
11
/**
 * Queue store adapter that keeps mail jobs in a RabbitMQ queue.
 *
 * Jobs are serialized to JSON and published to the default exchange with the
 * queue name as routing key; they are fetched back one at a time with
 * basic_get and must be settled through ack().
 */
class RabbitMqQueueStoreAdapter implements QueueStoreAdapterInterface
{
    /**
     * @var string name of the RabbitMQ queue jobs are published to and fetched from.
     */
    private $queueName;

    /**
     * @var int expiration value received by the constructor. It is only stored;
     * nothing in this class reads it.
     */
    private $expireTime;

    /**
     * @var RabbitMqQueueConnection
     */
    protected $connection;

    /**
     * Stores the collaborators and immediately opens the connection through init().
     *
     * @param RabbitMqQueueConnection $connection
     * @param string $queueName name of the queue to work with.
     * @param int $expireTime stored as-is; currently not used by this class.
     */
    public function __construct(RabbitMqQueueConnection $connection, $queueName = 'mail_queue', $expireTime = 60)
    {
        $this->connection = $connection;
        $this->expireTime = $expireTime;
        $this->queueName = $queueName;

        $this->init();
    }

    /**
     * Opens the underlying connection.
     *
     * @return $this
     */
    public function init()
    {
        $this->getConnection()
            ->connect();

        return $this;
    }

    /**
     * @inheritDoc
     *
     * @return RabbitMqQueueConnection
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Publishes a mail job to the queue.
     *
     * Any exception raised while declaring the queue, building the payload or
     * publishing is swallowed and reported as a `false` return value.
     *
     * @param RabbitMqJob|MailJobInterface $mailJob
     *
     * @return bool true when the message was handed to the channel, false otherwise.
     */
    public function enqueue(MailJobInterface $mailJob)
    {
        try {
            /** @var AMQPChannel $channel */
            $channel = $this->getConnection()->getInstance();
            $this->declareQueue($channel);

            $payload = $this->createPayload($mailJob);
            if ($payload === false) {
                // json_encode() could not serialize the job; publishing `false`
                // would put an unusable message on the queue and report success.
                return false;
            }

            $channel->basic_publish(new AMQPMessage($payload), '', $this->queueName);

            return true;
        } catch (\Exception $exception) {
            return false;
        }
    }

    /**
     * Fetches the next job from the queue.
     *
     * The message is fetched without auto-acknowledge, so the returned job has
     * to be settled later through ack() using its delivery tag.
     *
     * @inheritDoc
     * @return RabbitMqJob|MailJobInterface|null null when there is nothing to fetch.
     */
    public function dequeue()
    {
        if ($this->isEmpty()) {
            return null;
        }

        /** @var AMQPChannel $channel */
        $channel = $this->getConnection()->getInstance();

        /** @var AMQPMessage|null $message */
        $message = $channel->basic_get($this->queueName);
        if (!$message instanceof AMQPMessage) {
            // The queue can be drained (e.g. by another consumer) between the
            // isEmpty() check and basic_get(), in which case no message comes back.
            return null;
        }

        $data = json_decode($message->body, true);

        return new RabbitMqJob([
            'id' => $data['id'],
            'message' => $data['message'],
            'attempt' => $data['attempt'],
            // Getter replaces the deprecated AMQPMessage::$delivery_info lookup.
            'deliveryTag' => $message->getDeliveryTag(),
        ]);
    }

    /**
     * Settles a previously dequeued job on the broker.
     *
     * A completed job is acknowledged; any other job is negatively
     * acknowledged with the requeue flag set.
     *
     * @param RabbitMqJob|MailJobInterface $mailJob must expose getDeliveryTag(),
     * which is not part of MailJobInterface (RabbitMqJob is what dequeue() builds).
     */
    public function ack(MailJobInterface $mailJob)
    {
        /** @var AMQPChannel $channel */
        $channel = $this->getConnection()->getInstance();

        if ($mailJob->isCompleted()) {
            $channel->basic_ack($mailJob->getDeliveryTag(), false);

            return;
        }

        $channel->basic_nack($mailJob->getDeliveryTag(), false, true);
    }

    /**
     * @inheritDoc
     *
     * @return bool true only when the declare reply is an array whose second
     * element (the message count in php-amqplib's declare-ok reply) is exactly 0.
     */
    public function isEmpty()
    {
        /** @var AMQPChannel $channel */
        $channel = $this->getConnection()->getInstance();
        $queueProperties = $this->declareQueue($channel);

        return is_array($queueProperties) && $queueProperties[1] === 0;
    }

    /**
     * Serializes a mail job into the JSON body stored in the queue.
     *
     * New jobs get a freshly generated SHA-1 identifier; existing ones keep their id.
     *
     * @param MailJobInterface $mailJob
     * @return false|string the JSON document, or false when json_encode() fails.
     */
    protected function createPayload(MailJobInterface $mailJob)
    {
        return json_encode([
            'id' => $mailJob->isNewRecord() ? sha1(Random::string(32)) : $mailJob->getId(),
            'attempt' => $mailJob->getAttempt(),
            'message' => $mailJob->getMessage(),
            'delivery_tag' => null,
        ]);
    }

    /**
     * Declares the working queue with the single set of flags used by this
     * adapter, so enqueue() and isEmpty() can never disagree on them.
     *
     * @param AMQPChannel $channel
     * @return mixed whatever queue_declare() returns for the declaration.
     */
    private function declareQueue($channel)
    {
        return $channel->queue_declare($this->queueName, false, false, false, false);
    }
}
148