1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\Beanstalkd; |
4
|
|
|
|
5
|
|
|
use Dtc\QueueBundle\Manager\RetryableJobManager; |
6
|
|
|
use Dtc\QueueBundle\Model\RetryableJob; |
7
|
|
|
use Dtc\QueueBundle\Model\Job as BaseJob; |
8
|
|
|
use Dtc\QueueBundle\Exception\UnsupportedException; |
9
|
|
|
use Dtc\QueueBundle\Util\Util; |
10
|
|
|
use Pheanstalk\Pheanstalk; |
11
|
|
|
|
12
|
|
|
/**
 * Job manager that queues and fetches jobs through a Pheanstalk client.
 *
 * Jobs are turned into a message with Job::toMessage() when they are put on
 * the queue and rebuilt with Job::fromMessage() when they are reserved.
 */
class JobManager extends RetryableJobManager
{
    const DEFAULT_RESERVE_TIMEOUT = 5; // seconds

    /** @var Pheanstalk Client every queue operation is sent to */
    protected $beanstalkd;

    /** Optional tube name; when falsy no tube is selected for put/reserve */
    protected $tube;

    /** Timeout passed to reserve() when looking for a job */
    protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT;

    /**
     * Injects the Pheanstalk client used by this manager.
     *
     * @param Pheanstalk $beanstalkd
     */
    public function setBeanstalkd(Pheanstalk $beanstalkd)
    {
        $this->beanstalkd = $beanstalkd;
    }

    /**
     * Sets the tube that jobs are put into and watched on.
     *
     * @param mixed $tube Tube name; a falsy value means "no explicit tube"
     */
    public function setTube($tube)
    {
        $this->tube = $tube;
    }

    /**
     * Overrides the timeout handed to reserve().
     *
     * @param mixed $timeout Stored as-is and forwarded to reserve()
     */
    public function setReserveTimeout($timeout)
    {
        $this->reserveTimeout = $timeout;
    }

    /**
     * Puts the given job on the queue.
     *
     * @param RetryableJob $job Must be a Beanstalkd Job
     *
     * @return Job The same job, with its id and bean job populated
     *
     * @throws \InvalidArgumentException When $job is not a Beanstalkd Job
     */
    public function retryableSave(RetryableJob $job)
    {
        if ($job instanceof Job) {
            return $this->putJob($job);
        }

        throw new \InvalidArgumentException('$job must be of type: '.Job::class);
    }

    /**
     * Sends the job's message to the client and records the resulting id on the job.
     *
     * Calls putInTube() when a tube is configured and put() otherwise.
     *
     * @param Job $job
     *
     * @return Job The job that was passed in
     */
    protected function putJob(Job $job)
    {
        $payload = $job->toMessage();
        $putArguments = [$payload, $job->getPriority(), $job->getDelay(), $job->getTtr()];

        if ($this->tube) {
            // putInTube() takes the tube name ahead of the regular put() arguments
            $target = [$this->beanstalkd, 'putInTube'];
            array_unshift($putArguments, $this->tube);
        } else {
            $target = [$this->beanstalkd, 'put'];
        }

        $beanJobId = call_user_func_array($target, $putArguments);
        $job->setId($beanJobId);

        // Ideally we should get this from beanstalk, but to save the roundtrip time, we do this here
        $job->setBeanJob($this->getBeanJob($beanJobId, $payload));

        return $job;
    }

    /**
     * Resets the job to a fresh state, bumps its retry counter and queues it again.
     *
     * NOTE(review): putJob() assigns a new id to the job; nothing here deletes the
     * previously reserved bean job — confirm that is intended.
     *
     * @param RetryableJob $job Must be a Beanstalkd Job
     *
     * @return bool Always true once the job has been re-queued
     *
     * @throws \InvalidArgumentException When $job is not a Beanstalkd Job
     */
    protected function resetJob(RetryableJob $job)
    {
        if (!($job instanceof Job)) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);

        $nextRetryCount = $job->getRetries() + 1;
        $job->setRetries($nextRetryCount);

        $now = Util::getMicrotimeDateTime();
        $job->setUpdatedAt($now);

        $this->putJob($job);

        return true;
    }

    /**
     * Builds a Pheanstalk job object from an id and its message data.
     *
     * @param mixed $jobId
     * @param mixed $data
     *
     * @return \Pheanstalk\Job
     */
    public function getBeanJob($jobId, $data)
    {
        $beanJob = new \Pheanstalk\Job($jobId, $data);

        return $beanJob;
    }

    /**
     * Reserves the next non-expired job, skipping (and deleting) expired ones.
     *
     * Filtering by worker or method name is not supported by this manager.
     *
     * @param string|null     $workerName Must be null
     * @param string|null     $methodName Must be null
     * @param bool            $prioritize Not used by this implementation
     * @param int|string|null $runId      Stored on the returned job
     *
     * @return Job|null Null when reserve() yields nothing
     *
     * @throws UnsupportedException When a worker name or method name is given
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        if (null !== $methodName || null !== $workerName) {
            throw new UnsupportedException('Unsupported');
        }

        // NOTE(review): watch() is used rather than watchOnly() — confirm that other
        // watched tubes are meant to remain eligible for reserve().
        $queue = $this->tube ? $this->beanstalkd->watch($this->tube) : $this->beanstalkd;

        do {
            // findJob() flips this flag (by reference) when it discards an expired job
            $skippedExpired = false;
            $found = $this->findJob($queue, $skippedExpired, $runId);
        } while ($skippedExpired);

        return $found;
    }

    /**
     * Makes one reserve() attempt and converts the result into a Job.
     *
     * @param Pheanstalk      $beanstalkd Client to reserve from
     * @param bool            $expiredJob Set to true (by reference) when the reserved
     *                                    job had already expired and was deleted
     * @param int|string|null $runId      Stored on the returned job
     *
     * @return Job|null Null when nothing was reserved or the job had expired
     */
    protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
    {
        $reserved = $beanstalkd->reserve($this->reserveTimeout);
        if (!$reserved) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($reserved->getData());
        $job->setId($reserved->getId());
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        if ($expiresAt && $expiresAt->getTimestamp() < time()) {
            // Expired: remove it from the queue and tell the caller to try again
            $expiredJob = true;
            $beanstalkd->delete($reserved);

            return null;
        }

        $job->setBeanJob($reserved);

        return $job;
    }

    /**
     * Deletes the given job through the client.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $this->beanstalkd->delete($job);
    }

    /**
     * Called upon completion of the job; deletes it unless it is being retried.
     *
     * @param RetryableJob $job
     * @param mixed        $retry Truthy when the job is going to be retried
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        if ($retry) {
            return;
        }

        $this->beanstalkd->delete($job);
    }

    /**
     * Reads the 'current-jobs-ready' entry from the client's stats.
     *
     * @param string|null $workerName Not used by this implementation
     * @param string|null $methodName Not used by this implementation
     *
     * @return mixed The 'current-jobs-ready' value, or 0 when it is not set
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $stats = $this->getStats();

        if (isset($stats['current-jobs-ready'])) {
            return $stats['current-jobs-ready'];
        }

        return 0;
    }

    /**
     * Returns whatever the client's stats() call returns.
     *
     * @return mixed
     */
    public function getStats()
    {
        return $this->beanstalkd->stats();
    }
}
175
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.