BeanstalkdQueueStoreAdapter::ack()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 10
c 0
b 0
f 0
nc 3
nop 1
dl 0
loc 18
ccs 9
cts 9
cp 1
crap 3
rs 9.9332
1
<?php
2
3
namespace Da\Mailer\Queue\Backend\Beanstalkd;
4
5
use Da\Mailer\Exception\InvalidCallException;
6
use Da\Mailer\Queue\Backend\MailJobInterface;
7
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
8
use Pheanstalk\Job as PheanstalkJob;
9
use Pheanstalk\Pheanstalk;
10
use phpseclib3\Crypt\Random;
11
12
class BeanstalkdQueueStoreAdapter implements QueueStoreAdapterInterface
13
{
14
    /**
15
     * @var string the queue name
16
     */
17
    protected $queueName;
18
/**
19
     * @var int the time to run. Defaults to Pheanstalkd::DEFAULT_TTR.
20
     */
21
    protected $timeToRun;
22
/**
23
     * @var BeanstalkdQueueStoreConnection
24
     */
25
    protected $connection;
26
/**
27
     * @var int Reserves/locks a ready job in a watched tube. A timeout value of 0 will cause the server to immediately
28
     * return either a response or TIMED_OUT.  A positive value of timeout will limit the amount of time the client will
29
     * block on the reserve request until a job becomes available.
30
     *
31
     * We highly recommend a non-zero value. Defaults to 5.
32
     */
33
    protected $reserveTimeout;
34
/**
35
     * BeanstalkdQueueStoreAdapter constructor.
36
     *
37
     * @param BeanstalkdQueueStoreConnection $connection
38
     * @param string $queueName
39
     * @param int $timeToRun
40
     * @param int $reserveTimeOut
41
     */
42 4
    public function __construct(BeanstalkdQueueStoreConnection $connection, $queueName = 'mail_queue', $timeToRun = Pheanstalk::DEFAULT_TTR, $reserveTimeOut = 5)
43
    {
44
        $this->connection = $connection;
45
        $this->queueName = $queueName;
46
        $this->timeToRun = $timeToRun;
47
        $this->reserveTimeout = $reserveTimeOut;
48 4
        $this->init();
49 4
    }
50 4
51 4
    /**
52 4
     * @return BeanstalkdQueueStoreAdapter
53 4
     */
54
    public function init()
55
    {
56
        $this->getConnection()->connect();
57
        return $this;
58 4
    }
59
60 4
    /**
61
     * @return BeanstalkdQueueStoreConnection
62 4
     */
63
    public function getConnection()
64
    {
65
        return $this->connection;
66
    }
67
68 4
    /**
69
     * @param BeanstalkdMailJob|MailJobInterface $mailJob
70 4
     *
71
     * @return \Pheanstalk\Job
72
     */
73
    public function enqueue(MailJobInterface $mailJob)
74
    {
75
        $timestamp = $mailJob->getTimeToSend();
0 ignored issues
show
Bug introduced by
The method getTimeToSend() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. It seems like you code against a sub-type of Da\Mailer\Queue\Backend\MailJobInterface such as Da\Mailer\Queue\Backend\Pdo\PdoMailJob or Da\Mailer\Queue\Backend\...talkd\BeanstalkdMailJob or Da\Mailer\Queue\Backend\Redis\RedisMailJob. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

75
        /** @scrutinizer ignore-call */ 
76
        $timestamp = $mailJob->getTimeToSend();
Loading history...
76
        $payload = $this->createPayload($mailJob);
77
        $delay = (int) max(Pheanstalk::DEFAULT_DELAY, $timestamp - time());
78 3
        return $this->getConnection()
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->getConnect...elay, $this->timeToRun) returns the type Pheanstalk\Job which is incompatible with the return type mandated by Da\Mailer\Queue\Backend\...terInterface::enqueue() of boolean.

In the issue above, the returned value is violating the contract defined by the mentioned interface.

Let's take a look at an example:

interface HasName {
    /** @return string */
    public function getName();
}

class Name {
    public $name;
}

class User implements HasName {
    /** @return string|Name */
    public function getName() {
        return new Name('foo'); // This is a violation of the ``HasName`` interface
                                // which only allows a string value to be returned.
    }
}
Loading history...
79
            ->getInstance()
80 3
            ->useTube($this->queueName)
81 3
            ->put($payload, Pheanstalk::DEFAULT_PRIORITY, $delay, $this->timeToRun);
82 3
    }
83
84 3
    /**
85 3
     * @return BeanstalkdMailJob|null
86 3
     * @throws \Pheanstalk\Exception\DeadlineSoonException
87 3
     */
88
    public function dequeue()
89
    {
90
        $job = $this->getConnection()->getInstance()->watch($this->queueName)->reserveWithTimeout($this->reserveTimeout);
91
        if ($job instanceof PheanstalkJob) {
92
            $data = json_decode($job->getData(), true);
93 3
            return new BeanstalkdMailJob([
94
                    'id' => $data['id'],
95 3
                    'attempt' => $data['attempt'],
96 3
                    'message' => $data['message'],
97 3
                    'pheanstalkJob' => $job,
98
                ]);
99 3
        }
100
101 3
        return null;
102 3
    }
103 3
104 3
    /**
105
     * @param BeanstalkdMailJob|MailJobInterface $mailJob
106 3
     * @return null
107
     */
108
    public function ack(MailJobInterface $mailJob)
109 2
    {
110
        if ($mailJob->isNewRecord()) {
111
            throw new InvalidCallException('BeanstalkdMailJob cannot be a new object to be acknowledged');
112
        }
113
114
        $pheanstalk = $this->getConnection()->getInstance()->useTube($this->queueName);
115 4
        if ($mailJob->isCompleted()) {
116
            $pheanstalk->delete($mailJob->getPheanstalkJob());
0 ignored issues
show
Bug introduced by
The method getPheanstalkJob() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. It seems like you code against a sub-type of Da\Mailer\Queue\Backend\MailJobInterface such as Da\Mailer\Queue\Backend\...talkd\BeanstalkdMailJob. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

116
            $pheanstalk->delete($mailJob->/** @scrutinizer ignore-call */ getPheanstalkJob());
Loading history...
117 4
            return null;
118 1
        }
119
120
        $timestamp = $mailJob->getTimeToSend();
121 3
        $delay = max(0, $timestamp - time());
122 3
// add back to the queue as it wasn't completed maybe due to some transitory error
123 2
        // could also be failed.
124 2
        $pheanstalk->release($mailJob->getPheanstalkJob(), Pheanstalk::DEFAULT_PRIORITY, $delay);
125 1
        return null;
126 1
    }
127
128
    /**
129
     *
130 1
     * @return bool
131
     */
132 3
    public function isEmpty()
133
    {
134
        $stats = $this->getConnection()->getInstance()->statsTube($this->queueName);
135
        return (int) $stats->current_jobs_delayed === 0
0 ignored issues
show
Bug introduced by
Accessing current_jobs_delayed on the interface Pheanstalk\Contract\ResponseInterface suggest that you code against a concrete implementation. How about adding an instanceof check?
Loading history...
136
        && (int) $stats->current_jobs_urgent === 0
0 ignored issues
show
Bug introduced by
Accessing current_jobs_urgent on the interface Pheanstalk\Contract\ResponseInterface suggest that you code against a concrete implementation. How about adding an instanceof check?
Loading history...
137
        && (int) $stats->current_jobs_ready === 0;
0 ignored issues
show
Bug introduced by
Accessing current_jobs_ready on the interface Pheanstalk\Contract\ResponseInterface suggest that you code against a concrete implementation. How about adding an instanceof check?
Loading history...
138 2
    }
139
140 2
    /**
141
     * @param BeanstalkdMailJob|MailJobInterface $mailJob
142 2
     *
143 2
     * @return string
144 2
     */
145
    protected function createPayload(MailJobInterface $mailJob)
146
    {
147
        return json_encode([
148
                'id' => $mailJob->isNewRecord() ? sha1(Random::string(32)) : $mailJob->getId(),
0 ignored issues
show
Bug introduced by
The method getId() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. Since it exists in all sub-types, consider adding an abstract or default implementation to Da\Mailer\Queue\Backend\MailJobInterface. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

148
                'id' => $mailJob->isNewRecord() ? sha1(Random::string(32)) : $mailJob->/** @scrutinizer ignore-call */ getId(),
Loading history...
149
                'attempt' => $mailJob->getAttempt(),
0 ignored issues
show
Bug introduced by
The method getAttempt() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. Since it exists in all sub-types, consider adding an abstract or default implementation to Da\Mailer\Queue\Backend\MailJobInterface. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

149
                'attempt' => $mailJob->/** @scrutinizer ignore-call */ getAttempt(),
Loading history...
150
                'message' => $mailJob->getMessage(),
151
            ]);
152 3
    }
153
}
154