2amigos /
mailer-library
| 1 | <?php |
||||
| 2 | |||||
| 3 | namespace Da\Mailer\Queue\Backend\Beanstalkd; |
||||
| 4 | |||||
| 5 | use Da\Mailer\Exception\InvalidCallException; |
||||
| 6 | use Da\Mailer\Queue\Backend\MailJobInterface; |
||||
| 7 | use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface; |
||||
| 8 | use Pheanstalk\Job as PheanstalkJob; |
||||
| 9 | use Pheanstalk\Pheanstalk; |
||||
| 10 | use phpseclib3\Crypt\Random; |
||||
| 11 | |||||
| 12 | class BeanstalkdQueueStoreAdapter implements QueueStoreAdapterInterface |
||||
| 13 | { |
||||
| 14 | /** |
||||
| 15 | * @var string the queue name |
||||
| 16 | */ |
||||
| 17 | protected $queueName; |
||||
| 18 | /** |
||||
| 19 | * @var int the time to run. Defaults to Pheanstalkd::DEFAULT_TTR. |
||||
| 20 | */ |
||||
| 21 | protected $timeToRun; |
||||
| 22 | /** |
||||
| 23 | * @var BeanstalkdQueueStoreConnection |
||||
| 24 | */ |
||||
| 25 | protected $connection; |
||||
| 26 | /** |
||||
| 27 | * @var int Reserves/locks a ready job in a watched tube. A timeout value of 0 will cause the server to immediately |
||||
| 28 | * return either a response or TIMED_OUT. A positive value of timeout will limit the amount of time the client will |
||||
| 29 | * block on the reserve request until a job becomes available. |
||||
| 30 | * |
||||
| 31 | * We highly recommend a non-zero value. Defaults to 5. |
||||
| 32 | */ |
||||
| 33 | protected $reserveTimeout; |
||||
| 34 | /** |
||||
| 35 | * BeanstalkdQueueStoreAdapter constructor. |
||||
| 36 | * |
||||
| 37 | * @param BeanstalkdQueueStoreConnection $connection |
||||
| 38 | * @param string $queueName |
||||
| 39 | * @param int $timeToRun |
||||
| 40 | * @param int $reserveTimeOut |
||||
| 41 | */ |
||||
| 42 | 4 | public function __construct(BeanstalkdQueueStoreConnection $connection, $queueName = 'mail_queue', $timeToRun = Pheanstalk::DEFAULT_TTR, $reserveTimeOut = 5) |
|||
| 43 | { |
||||
| 44 | $this->connection = $connection; |
||||
| 45 | $this->queueName = $queueName; |
||||
| 46 | $this->timeToRun = $timeToRun; |
||||
| 47 | $this->reserveTimeout = $reserveTimeOut; |
||||
| 48 | 4 | $this->init(); |
|||
| 49 | 4 | } |
|||
| 50 | 4 | ||||
| 51 | 4 | /** |
|||
| 52 | 4 | * @return BeanstalkdQueueStoreAdapter |
|||
| 53 | 4 | */ |
|||
| 54 | public function init() |
||||
| 55 | { |
||||
| 56 | $this->getConnection()->connect(); |
||||
| 57 | return $this; |
||||
| 58 | 4 | } |
|||
| 59 | |||||
| 60 | 4 | /** |
|||
| 61 | * @return BeanstalkdQueueStoreConnection |
||||
| 62 | 4 | */ |
|||
| 63 | public function getConnection() |
||||
| 64 | { |
||||
| 65 | return $this->connection; |
||||
| 66 | } |
||||
| 67 | |||||
| 68 | 4 | /** |
|||
| 69 | * @param BeanstalkdMailJob|MailJobInterface $mailJob |
||||
| 70 | 4 | * |
|||
| 71 | * @return \Pheanstalk\Job |
||||
| 72 | */ |
||||
| 73 | public function enqueue(MailJobInterface $mailJob) |
||||
| 74 | { |
||||
| 75 | $timestamp = $mailJob->getTimeToSend(); |
||||
|
0 ignored issues
–
show
Bug
introduced
by
Loading history...
|
|||||
| 76 | $payload = $this->createPayload($mailJob); |
||||
| 77 | $delay = (int) max(Pheanstalk::DEFAULT_DELAY, $timestamp - time()); |
||||
| 78 | 3 | return $this->getConnection() |
|||
|
0 ignored issues
–
show
The expression
return $this->getConnect...elay, $this->timeToRun) returns the type Pheanstalk\Job which is incompatible with the return type mandated by Da\Mailer\Queue\Backend\...terInterface::enqueue() of boolean.
In the issue above, the returned value is violating the contract defined by the mentioned interface. Let's take a look at an example: interface HasName {
/** @return string */
public function getName();
}
class Name {
public $name;
}
class User implements HasName {
/** @return string|Name */
public function getName() {
return new Name('foo'); // This is a violation of the ``HasName`` interface
// which only allows a string value to be returned.
}
}
Loading history...
|
|||||
| 79 | ->getInstance() |
||||
| 80 | 3 | ->useTube($this->queueName) |
|||
| 81 | 3 | ->put($payload, Pheanstalk::DEFAULT_PRIORITY, $delay, $this->timeToRun); |
|||
| 82 | 3 | } |
|||
| 83 | |||||
| 84 | 3 | /** |
|||
| 85 | 3 | * @return BeanstalkdMailJob|null |
|||
| 86 | 3 | * @throws \Pheanstalk\Exception\DeadlineSoonException |
|||
| 87 | 3 | */ |
|||
| 88 | public function dequeue() |
||||
| 89 | { |
||||
| 90 | $job = $this->getConnection()->getInstance()->watch($this->queueName)->reserveWithTimeout($this->reserveTimeout); |
||||
| 91 | if ($job instanceof PheanstalkJob) { |
||||
| 92 | $data = json_decode($job->getData(), true); |
||||
| 93 | 3 | return new BeanstalkdMailJob([ |
|||
| 94 | 'id' => $data['id'], |
||||
| 95 | 3 | 'attempt' => $data['attempt'], |
|||
| 96 | 3 | 'message' => $data['message'], |
|||
| 97 | 3 | 'pheanstalkJob' => $job, |
|||
| 98 | ]); |
||||
| 99 | 3 | } |
|||
| 100 | |||||
| 101 | 3 | return null; |
|||
| 102 | 3 | } |
|||
| 103 | 3 | ||||
| 104 | 3 | /** |
|||
| 105 | * @param BeanstalkdMailJob|MailJobInterface $mailJob |
||||
| 106 | 3 | * @return null |
|||
| 107 | */ |
||||
| 108 | public function ack(MailJobInterface $mailJob) |
||||
| 109 | 2 | { |
|||
| 110 | if ($mailJob->isNewRecord()) { |
||||
| 111 | throw new InvalidCallException('BeanstalkdMailJob cannot be a new object to be acknowledged'); |
||||
| 112 | } |
||||
| 113 | |||||
| 114 | $pheanstalk = $this->getConnection()->getInstance()->useTube($this->queueName); |
||||
| 115 | 4 | if ($mailJob->isCompleted()) { |
|||
| 116 | $pheanstalk->delete($mailJob->getPheanstalkJob()); |
||||
|
0 ignored issues
–
show
The method
getPheanstalkJob() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. It seems like you code against a sub-type of Da\Mailer\Queue\Backend\MailJobInterface such as Da\Mailer\Queue\Backend\...talkd\BeanstalkdMailJob.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 117 | 4 | return null; |
|||
| 118 | 1 | } |
|||
| 119 | |||||
| 120 | $timestamp = $mailJob->getTimeToSend(); |
||||
| 121 | 3 | $delay = max(0, $timestamp - time()); |
|||
| 122 | 3 | // add back to the queue as it wasn't completed maybe due to some transitory error |
|||
| 123 | 2 | // could also be failed. |
|||
| 124 | 2 | $pheanstalk->release($mailJob->getPheanstalkJob(), Pheanstalk::DEFAULT_PRIORITY, $delay); |
|||
| 125 | 1 | return null; |
|||
| 126 | 1 | } |
|||
| 127 | |||||
| 128 | /** |
||||
| 129 | * |
||||
| 130 | 1 | * @return bool |
|||
| 131 | */ |
||||
| 132 | 3 | public function isEmpty() |
|||
| 133 | { |
||||
| 134 | $stats = $this->getConnection()->getInstance()->statsTube($this->queueName); |
||||
| 135 | return (int) $stats->current_jobs_delayed === 0 |
||||
|
0 ignored issues
–
show
|
|||||
| 136 | && (int) $stats->current_jobs_urgent === 0 |
||||
|
0 ignored issues
–
show
|
|||||
| 137 | && (int) $stats->current_jobs_ready === 0; |
||||
|
0 ignored issues
–
show
|
|||||
| 138 | 2 | } |
|||
| 139 | |||||
| 140 | 2 | /** |
|||
| 141 | * @param BeanstalkdMailJob|MailJobInterface $mailJob |
||||
| 142 | 2 | * |
|||
| 143 | 2 | * @return string |
|||
| 144 | 2 | */ |
|||
| 145 | protected function createPayload(MailJobInterface $mailJob) |
||||
| 146 | { |
||||
| 147 | return json_encode([ |
||||
| 148 | 'id' => $mailJob->isNewRecord() ? sha1(Random::string(32)) : $mailJob->getId(), |
||||
|
0 ignored issues
–
show
The method
getId() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. Since it exists in all sub-types, consider adding an abstract or default implementation to Da\Mailer\Queue\Backend\MailJobInterface.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 149 | 'attempt' => $mailJob->getAttempt(), |
||||
|
0 ignored issues
–
show
The method
getAttempt() does not exist on Da\Mailer\Queue\Backend\MailJobInterface. Since it exists in all sub-types, consider adding an abstract or default implementation to Da\Mailer\Queue\Backend\MailJobInterface.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 150 | 'message' => $mailJob->getMessage(), |
||||
| 151 | ]); |
||||
| 152 | 3 | } |
|||
| 153 | } |
||||
| 154 |