1 | <?php |
||
9 | class JobManager extends AbstractJobManager |
||
10 | { |
||
11 | const DEFAULT_RESERVE_TIMEOUT = 5; // seconds |
||
12 | |||
13 | /** @var Pheanstalk */ |
||
14 | protected $beanstalkd; |
||
15 | |||
16 | protected $tube; |
||
17 | |||
18 | protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT; |
||
19 | |||
20 | public function setBeanstalkd(Pheanstalk $beanstalkd) |
||
24 | |||
25 | public function setTube($tube) |
||
29 | |||
30 | public function setReserveTimeout($timeout) |
||
34 | |||
35 | 5 | public function save(\Dtc\QueueBundle\Model\Job $job) |
|
36 | { |
||
37 | /** @var Job $job */ |
||
38 | 5 | $message = $job->toMessage(); |
|
39 | 5 | $arguments = [$message, $job->getPriority(), $job->getDelay(), $job->getTtr()]; |
|
40 | 5 | $method = 'put'; |
|
41 | 5 | if ($this->tube) { |
|
42 | array_unshift($arguments, $this->tube); |
||
43 | $method .= 'InTube'; |
||
44 | } |
||
45 | 5 | $jobId = call_user_func_array([$this->beanstalkd, $method], $arguments); |
|
46 | 5 | $job->setId($jobId); |
|
47 | |||
48 | // Ideally we should get this from beanstalk, but to save the roundtrip time, we do this here |
||
49 | 5 | $job->setBeanJob($this->getBeanJob($jobId, $message)); |
|
50 | |||
51 | 5 | return $job; |
|
52 | } |
||
53 | |||
54 | 5 | public function getBeanJob($jobId, $data) |
|
58 | |||
59 | 5 | public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
|
60 | { |
||
61 | 5 | if ($methodName) { |
|
62 | throw new \Exception('Unsupported'); |
||
63 | } |
||
64 | |||
65 | 5 | $beanstalkd = $this->beanstalkd; |
|
66 | 5 | if ($this->tube) { |
|
67 | $beanstalkd = $this->beanstalkd->watch($this->tube); |
||
68 | 1 | } |
|
69 | |||
70 | 5 | $expiredJob = false; |
|
71 | |||
72 | do { |
||
73 | 5 | $beanJob = $beanstalkd->reserve($this->reserveTimeout); |
|
74 | 5 | if ($beanJob) { |
|
75 | 4 | $job = new Job(); |
|
76 | 4 | $job->fromMessage($beanJob->getData()); |
|
77 | 4 | $job->setId($beanJob->getId()); |
|
78 | 4 | $job->setRunId($runId); |
|
79 | |||
80 | 4 | if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) { |
|
81 | $expiredJob = true; |
||
82 | $this->beanstalkd->delete($beanJob); |
||
83 | continue; |
||
84 | } |
||
85 | 4 | $job->setBeanJob($beanJob); |
|
86 | |||
87 | 4 | return $job; |
|
88 | } |
||
89 | 2 | } while ($expiredJob); |
|
90 | 2 | } |
|
91 | |||
92 | 2 | public function deleteJob(\Dtc\QueueBundle\Model\Job $job) |
|
97 | |||
98 | // Save History get called upon completion of the job |
||
99 | public function saveHistory(\Dtc\QueueBundle\Model\Job $job) |
||
100 | { |
||
101 | if (BaseJob::STATUS_SUCCESS === $job->getStatus()) { |
||
102 | $this->beanstalkd |
||
103 | ->delete($job); |
||
104 | } |
||
105 | } |
||
106 | |||
107 | public function getStats() |
||
111 | } |
||
112 |