1 | <?php |
||
9 | class JobManager extends AbstractJobManager |
||
10 | { |
||
11 | const DEFAULT_RESERVE_TIMEOUT = 5; // seconds |
||
12 | |||
13 | /** @var Pheanstalk */ |
||
14 | protected $beanstalkd; |
||
15 | |||
16 | protected $tube; |
||
17 | |||
18 | protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT; |
||
19 | |||
/**
 * Stores the given Pheanstalk client on this manager.
 *
 * @param Pheanstalk $beanstalkd client instance assigned to $this->beanstalkd
 */
public function setBeanstalkd(Pheanstalk $beanstalkd)
{
    $this->beanstalkd = $beanstalkd;
}
|
24 | |||
25 | public function setTube($tube) |
||
29 | |||
30 | public function setReserveTimeout($timeout) |
||
34 | |||
35 | 5 | public function save(\Dtc\QueueBundle\Model\Job $job) |
|
53 | |||
54 | 5 | public function getBeanJob($jobId, $data) |
|
58 | |||
59 | /** |
||
60 | * @param string|null $workerName |
||
61 | * @param string|null $methodName |
||
62 | * @param bool $prioritize |
||
63 | * @param int|string|null $runId |
||
64 | * |
||
65 | * @return Job|null |
||
66 | * |
||
67 | * @throws \Exception |
||
68 | */ |
||
69 | 6 | public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
|
90 | |||
/**
 * Reserves one message from the given client and converts it into a Job.
 *
 * The reserve call waits up to $this->reserveTimeout. When nothing is
 * reserved, null is returned and $expiredJob is left untouched. When the
 * reserved job carries an expiry time that is already in the past, the
 * message is deleted from the queue, $expiredJob is set to true and null is
 * returned. $expiredJob is never reset to false here, so the caller is
 * expected to initialise it before each call.
 *
 * @param Pheanstalk      $beanstalkd client to reserve from
 * @param bool            $expiredJob out-parameter, set to true when an expired job was discarded
 * @param int|string|null $runId      run identifier stamped onto the returned job
 *
 * @return Job|null the reserved job, or null when nothing usable was reserved
 */
protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
{
    $reserved = $beanstalkd->reserve($this->reserveTimeout);
    if (!$reserved) {
        // Nothing became available within the timeout.
        return null;
    }

    // Rebuild the job from the raw payload, then stamp queue and run identifiers on it.
    $job = new Job();
    $job->fromMessage($reserved->getData());
    $job->setId($reserved->getId());
    $job->setRunId($runId);

    $expiry = $job->getExpiresAt();
    if ($expiry && $expiry->getTimestamp() < time()) {
        // Stale job: flag it for the caller and remove it from the queue.
        $expiredJob = true;
        $beanstalkd->delete($reserved);

        return null;
    }

    // Keep the underlying queue message attached to the job.
    $job->setBeanJob($reserved);

    return $job;
}
||
120 | |||
121 | 2 | public function deleteJob(\Dtc\QueueBundle\Model\Job $job) |
|
126 | |||
127 | // Save History gets called upon completion of the job |
||
128 | 1 | public function saveHistory(\Dtc\QueueBundle\Model\Job $job) |
|
135 | |||
136 | 1 | public function getStats() |
|
140 | } |
||
141 |