1 | <?php |
||
9 | class JobManager extends AbstractJobManager |
||
10 | { |
||
11 | const DEFAULT_RESERVE_TIMEOUT = 5; // seconds |
||
12 | |||
13 | /** @var Pheanstalk */ |
||
14 | protected $beanstalkd; |
||
15 | |||
16 | protected $tube; |
||
17 | |||
18 | protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT; |
||
19 | |||
/**
 * Injects the Pheanstalk client this manager uses for every queue operation.
 *
 * @param Pheanstalk $beanstalkd
 */
public function setBeanstalkd(Pheanstalk $beanstalkd)
{
    $this->beanstalkd = $beanstalkd;
}
|
24 | |||
25 | public function setTube($tube) |
||
29 | |||
30 | public function setReserveTimeout($timeout) |
||
34 | |||
/**
 * Puts the job onto beanstalkd and records the id the put call returned.
 *
 * When a tube has been configured the job is sent with putInTube() and the
 * tube name as the first argument; otherwise put() is used.
 *
 * @param \Dtc\QueueBundle\Model\Job $job
 *
 * @return \Dtc\QueueBundle\Model\Job the same job, with its id and bean job set
 */
public function save(\Dtc\QueueBundle\Model\Job $job)
{
    /** @var Job $job */
    $payload = $job->toMessage();
    $putArguments = [$payload, $job->getPriority(), $job->getDelay(), $job->getTtr()];

    if ($this->tube) {
        // Tube-specific variant: the tube name leads the argument list.
        $putCallable = [$this->beanstalkd, 'putInTube'];
        array_unshift($putArguments, $this->tube);
    } else {
        $putCallable = [$this->beanstalkd, 'put'];
    }

    $assignedId = call_user_func_array($putCallable, $putArguments);
    $job->setId($assignedId);

    // Build the bean job locally from the id and payload instead of asking
    // beanstalkd for it, which saves a network round trip.
    $beanJob = $this->getBeanJob($assignedId, $payload);
    $job->setBeanJob($beanJob);

    return $job;
}
||
53 | |||
54 | 5 | public function getBeanJob($jobId, $data) |
|
58 | |||
/**
 * Reserves the next non-expired job from beanstalkd.
 *
 * Filtering by worker or method name is not supported by this manager, so
 * passing either one raises an exception. Jobs that findJob() reports as
 * expired are skipped and another reservation is attempted.
 *
 * @param string|null     $workerName must be null
 * @param string|null     $methodName must be null
 * @param bool            $prioritize not read by this implementation
 * @param int|string|null $runId      run id stamped onto the returned job
 *
 * @return Job|null
 *
 * @throws \Exception when $workerName or $methodName is not null
 */
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
{
    if (null !== $methodName || null !== $workerName) {
        throw new \Exception('Unsupported');
    }

    // Watch the configured tube, if any, and reserve through whatever watch() returns.
    $connection = $this->tube ? $this->beanstalkd->watch($this->tube) : $this->beanstalkd;

    while (true) {
        // findJob() only ever raises this flag, so it is reset before each attempt.
        $sawExpiredJob = false;
        $candidate = $this->findJob($connection, $sawExpiredJob, $runId);

        if (!$sawExpiredJob) {
            return $candidate;
        }
    }
}
||
90 | |||
/**
 * Makes a single reservation attempt and converts the result into a Job.
 *
 * If the reserved job carries an expiry time that is already in the past,
 * it is deleted from beanstalkd, $expiredJob is set to true and null is
 * returned so the caller can try again.
 *
 * @param Pheanstalk      $beanstalkd client to reserve from
 * @param bool            $expiredJob by-reference flag; set to true only when an expired job was discarded
 * @param int|string|null $runId      run id to assign to the reserved job
 *
 * @return Job|null the reserved job, or null when nothing was reserved or the job had expired
 */
protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
{
    $reserved = $beanstalkd->reserve($this->reserveTimeout);
    if (!$reserved) {
        return null;
    }

    $job = new Job();
    $job->fromMessage($reserved->getData());
    $job->setId($reserved->getId());
    $job->setRunId($runId);

    $expiresAt = $job->getExpiresAt();
    $hasExpired = $expiresAt && $expiresAt->getTimestamp() < time();

    if (!$hasExpired) {
        $job->setBeanJob($reserved);

        return $job;
    }

    // Expired: tell the caller and remove the stale job from the queue.
    $expiredJob = true;
    $beanstalkd->delete($reserved);

    return null;
}
||
120 | |||
121 | 2 | public function deleteJob(\Dtc\QueueBundle\Model\Job $job) |
|
126 | |||
/**
 * Called upon completion of the job.
 *
 * A job whose status is BaseJob::STATUS_SUCCESS is deleted from beanstalkd;
 * any other status leaves the job untouched here.
 *
 * @param \Dtc\QueueBundle\Model\Job $job
 */
public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
{
    if (BaseJob::STATUS_SUCCESS !== $job->getStatus()) {
        return;
    }

    // The queue job object itself is handed to delete().
    $this->beanstalkd->delete($job);
}
|
135 | |||
136 | 1 | public function getStats() |
|
140 | } |
||
141 |