1 | <?php |
||
9 | class JobManager extends AbstractJobManager |
||
10 | { |
||
11 | const DEFAULT_RESERVE_TIMEOUT = 5; // seconds |
||
12 | |||
13 | /** @var Pheanstalk */ |
||
14 | protected $beanstalkd; |
||
15 | |||
16 | protected $tube; |
||
17 | |||
18 | protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT; |
||
19 | |||
20 | public function setBeanstalkd(Pheanstalk $beanstalkd) |
||
24 | |||
25 | public function setTube($tube) |
||
29 | |||
30 | public function setReserveTimeout($timeout) |
||
34 | |||
/**
 * Puts a job onto beanstalkd and stores the id returned by the put call on the job.
 *
 * When a tube is configured the job is put into that tube; otherwise it is
 * put without naming a tube.
 *
 * @param \Dtc\QueueBundle\Model\Job $job job to enqueue; toMessage(), getPriority(),
 *                                        getDelay(), getTtr(), setId() and setBeanJob()
 *                                        are called on it
 *
 * @return \Dtc\QueueBundle\Model\Job the same instance that was passed in
 */
public function save(\Dtc\QueueBundle\Model\Job $job)
{
    /** @var Job $job */
    $payload = $job->toMessage();
    $priority = $job->getPriority();
    $delay = $job->getDelay();
    $ttr = $job->getTtr();

    if ($this->tube) {
        $beanId = $this->beanstalkd->putInTube($this->tube, $payload, $priority, $delay, $ttr);
    } else {
        $beanId = $this->beanstalkd->put($payload, $priority, $delay, $ttr);
    }
    $job->setId($beanId);

    // Ideally the bean job would be fetched from beanstalk; it is built locally
    // from the id and payload instead to save the round trip.
    $job->setBeanJob($this->getBeanJob($beanId, $payload));

    return $job;
}
||
53 | |||
/**
 * Builds a Pheanstalk job object from an id and a payload.
 *
 * @param mixed $jobId id handed to the \Pheanstalk\Job constructor
 * @param mixed $data  payload handed to the \Pheanstalk\Job constructor
 *
 * @return \Pheanstalk\Job
 */
public function getBeanJob($jobId, $data)
{
    $beanJob = new \Pheanstalk\Job($jobId, $data);

    return $beanJob;
}
||
58 | |||
/**
 * Reserves the next non-expired job from beanstalkd.
 *
 * Jobs whose expiry time has already passed are deleted and the next job is
 * reserved instead. As soon as a reserve call yields nothing, the method
 * returns null. Previously, once an expired job had been discarded, the loop
 * flag was never reset, so an empty reserve caused the method to keep
 * reserving until some job arrived.
 *
 * @param string|null $workerName not used by this implementation
 * @param string|null $methodName filtering by method is not supported; any truthy value throws
 * @param bool        $prioritize not used by this implementation
 * @param mixed       $runId      run id stored on the returned job
 *
 * @return Job|null the reserved job, or null when reserve() returned nothing
 *
 * @throws \Exception when $methodName is given
 */
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
{
    if ($methodName) {
        throw new \Exception('Unsupported');
    }

    $beanstalkd = $this->beanstalkd;
    if ($this->tube) {
        $beanstalkd = $this->beanstalkd->watch($this->tube);
    }

    while (true) {
        $beanJob = $beanstalkd->reserve($this->reserveTimeout);
        if (!$beanJob) {
            // Nothing was reserved (presumably the reserve timeout elapsed):
            // report "no job" rather than reserving again.
            return null;
        }

        $job = new Job();
        $job->fromMessage($beanJob->getData());
        $job->setId($beanJob->getId());
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        if ($expiresAt && $expiresAt->getTimestamp() < time()) {
            // Expired before it could run: drop it and try the next one.
            $this->beanstalkd->delete($beanJob);
            continue;
        }

        $job->setBeanJob($beanJob);

        return $job;
    }
}
||
91 | |||
/**
 * Deletes the given job through the beanstalkd client.
 *
 * @param \Dtc\QueueBundle\Model\Job $job job passed unchanged to the client's delete()
 */
public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
{
    $client = $this->beanstalkd;
    $client->delete($job);
}
||
97 | |||
/**
 * Called upon completion of a job; deletes the job from beanstalkd when its
 * status is BaseJob::STATUS_SUCCESS and does nothing otherwise.
 *
 * @param \Dtc\QueueBundle\Model\Job $job the completed job
 */
public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
{
    if (BaseJob::STATUS_SUCCESS !== $job->getStatus()) {
        return;
    }

    $this->beanstalkd->delete($job);
}
||
106 | |||
107 | public function getStats() |
||
111 | } |
||
112 |