Completed
Pull Request — master (#41)
by
unknown
13:10
created

Queue::processed()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
namespace Disque\Queue;
3
4
use DateTime;
5
use DateTimeZone;
6
use Disque\Client;
7
use Disque\Queue\Marshal\JobMarshaler;
8
use Disque\Queue\Marshal\MarshalerInterface;
9
use InvalidArgumentException;
10
use Disque\Command\Response\JobsResponse AS Response;
11
use Disque\Command\Response\JobsWithCountersResponse AS Counters;
12
13
class Queue
{
    const DEFAULT_JOB_TIMEZONE = 'UTC';

    /**
     * Client the queue sends its commands through
     *
     * @var Client
     */
    protected $client;

    /**
     * Name of this queue
     *
     * @var string
     */
    protected $name;

    /**
     * Marshaler used to convert jobs to and from their queued form
     *
     * @var MarshalerInterface
     */
    private $marshaler;

    /**
     * Time zone used when scheduling jobs (created on first use)
     *
     * @var DateTimeZone
     */
    private $timeZone;

    /**
     * Build a queue bound to a client, using a JobMarshaler by default
     *
     * @param Client $client Client
     * @param string $name Queue name
     */
    public function __construct(Client $client, $name)
    {
        $this->name = $name;
        $this->client = $client;
        $this->setMarshaler(new JobMarshaler());
    }

    /**
     * Replace the marshaler used for pushed and pulled jobs
     *
     * @param MarshalerInterface $marshaler Marshaler
     * @return void
     */
    public function setMarshaler(MarshalerInterface $marshaler)
    {
        $this->marshaler = $marshaler;
    }

    /**
     * Pushes a job into the queue so that it only becomes available for
     * processing at the given date & time.
     *
     * The difference in seconds between `$when` and the current time is
     * stored in the `delay` option before the job is pushed.
     *
     * @param JobInterface $job Job
     * @param DateTime $when Date & time at which the job should be ready
     * @param array $options ADDJOB options sent to the client
     * @return JobInterface Job pushed
     * @throws InvalidArgumentException If `$when` is earlier than now
     */
    public function schedule(JobInterface $job, DateTime $when, array $options = [])
    {
        if ($this->timeZone === null) {
            $this->timeZone = new DateTimeZone(self::DEFAULT_JOB_TIMEZONE);
        }

        // Work on a copy so the caller's DateTime instance is not modified
        $target = clone $when;
        $target->setTimezone($this->timeZone);
        $current = new DateTime('now', $this->timeZone);

        if ($current > $target) {
            throw new InvalidArgumentException('Specified schedule time has passed');
        }

        $options['delay'] = $target->getTimestamp() - $current->getTimestamp();

        return $this->push($job, $options);
    }

    /**
     * Pushes a job into the queue and stores the resulting ID on the job
     *
     * @param JobInterface $job Job
     * @param array $options ADDJOB options sent to the client
     * @return JobInterface Job pushed
     */
    public function push(JobInterface $job, array $options = [])
    {
        $this->checkConnected();

        $payload = $this->marshaler->marshal($job);
        $job->setId($this->client->addJob($this->name, $payload, $options));

        return $job;
    }

    /**
     * Pulls a single job from the queue. If none is available and $timeout
     * is specified, waits only that long for a job; otherwise returns `null`.
     *
     * The job is requested together with its counters, which are copied onto
     * the returned job along with its ID.
     *
     * @param int $timeout If specified, wait these many seconds
     * @return Job|null A job, or null if no job was found before timeout
     */
    public function pull($timeout = 0)
    {
        $this->checkConnected();

        $request = [
            'timeout' => $timeout,
            'count' => 1,
            'withcounters' => true
        ];
        $found = $this->client->getJob($this->name, $request);
        if (empty($found)) {
            return null;
        }

        // Only one job was requested, so only the first entry is used
        $data = $found[0];
        $job = $this->marshaler->unmarshal($data[Response::KEY_BODY]);

        $job->setId($data[Response::KEY_ID]);
        $job->setNacks($data[Counters::KEY_NACKS]);
        $job->setAdditionalDeliveries($data[Counters::KEY_ADDITIONAL_DELIVERIES]);

        return $job;
    }

    /**
     * Marks that a Job is still being processed
     *
     * @param JobInterface $job Job
     * @return int Number of seconds that the job visibility was postponed
     */
    public function processing(JobInterface $job)
    {
        $this->checkConnected();

        $jobId = $job->getId();

        return $this->client->working($jobId);
    }

    /**
     * Acknowledges a Job as properly handled
     *
     * @param JobInterface $job Job
     * @return void
     */
    public function processed(JobInterface $job)
    {
        $this->checkConnected();

        $jobId = $job->getId();
        $this->client->ackJob($jobId);
    }

    /**
     * Marks the job as failed and returns it to the queue
     *
     * This increases the NACK counter of the job
     *
     * @param JobInterface $job Job
     * @return void
     */
    public function failed(JobInterface $job)
    {
        $this->checkConnected();

        $jobId = $job->getId();
        $this->client->nack($jobId);
    }

    /**
     * Make sure the client is connected, connecting it when it is not
     *
     * @return void
     * @throws \Disque\Connection\ConnectionException
     */
    private function checkConnected()
    {
        if ($this->client->isConnected()) {
            return;
        }

        $this->client->connect();
    }

    /**
     * Get the name of this queue
     *
     * @return string Queue name, as given to the constructor
     */
    public function getName()
    {
        return $this->name;
    }
}
201