Completed
Push — master ( 3f6cab...790d9e )
by Mariano
02:45
created

Queue::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 5
cts 5
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 2
crap 1
1
<?php
2
namespace Disque\Queue;
3
4
use DateTime;
5
use DateTimeZone;
6
use Disque\Client;
7
use Disque\Queue\Marshal\JobMarshaler;
8
use Disque\Queue\Marshal\MarshalerInterface;
9
use InvalidArgumentException;
10
use Disque\Command\Response\JobsResponse AS Response;
11
use Disque\Command\Response\JobsWithCountersResponse AS Counters;
12
13
class Queue
14
{
15
    const DEFAULT_JOB_TIMEZONE = 'UTC';
16
17
    /**
18
     * Client
19
     *
20
     * @var Client
21
     */
22
    protected $client;
23
24
    /**
25
     * Name
26
     *
27
     * @var string
28
     */
29
    protected $name;
30
31
    /**
32
     * Job marshaler
33
     *
34
     * @var MarshalerInterface
35
     */
36
    private $marshaler;
37
38
    /**
39
     * Default time zone
40
     *
41
     * @var DateTimeZone
42
     */
43
    private $timeZone;
44
45
    /**
46
     * Create a queue
47
     *
48
     * @param Client $client Client
49
     * @param string $name Queue name
50
     */
51 27
    public function __construct(Client $client, $name)
52
    {
53 27
        $this->client = $client;
54 27
        $this->name = $name;
55 27
        $this->setMarshaler(new JobMarshaler());
56 27
    }
57
    
58
    /**
59
     * Get the queue name
60
     *
61
     * @return string
62
     */
63 1
    public function getName()
64
    {
65 1
        return $this->name;
66
    }
67
68
    /**
69
     * Set Job marshaler
70
     *
71
     * @param MarshalerInterface Marshaler
72
     * @return void
73
     */
74 27
    public function setMarshaler(MarshalerInterface $marshaler)
75
    {
76 27
        $this->marshaler = $marshaler;
77 27
    }
78
79
    /**
80
     * Pushes a job into the queue, setting it to be up for processing only at
81
     * the specific date & time.
82
     *
83
     * @param JobInterface $job Job
84
     * @param DateTime $when Date & time on when job should be ready for processing
85
     * @param array $options ADDJOB options sent to the client
86
     * @return JobInterface Job pushed
87
     * @throws InvalidArgumentException
88
     */
89 5
    public function schedule(JobInterface $job, DateTime $when, array $options = [])
90
    {
91 5
        if (!isset($this->timeZone)) {
92 5
            $this->timeZone = new DateTimeZone(self::DEFAULT_JOB_TIMEZONE);
93
        }
94
95 5
        $date = clone($when);
96 5
        $date->setTimeZone($this->timeZone);
97 5
        $now = new DateTime('now', $this->timeZone);
98 5
        if ($date < $now) {
99 1
            throw new InvalidArgumentException('Specified schedule time has passed');
100
        }
101
102 4
        $options['delay'] = ($date->getTimestamp() - $now->getTimestamp());
103 4
        return $this->push($job, $options);
104
    }
105
106
    /**
107
     * Pushes a job into the queue
108
     *
109
     * @param JobInterface $job Job
110
     * @param array $options ADDJOB options sent to the client
111
     * @return JobInterface Job pushed
112
     */
113 4
    public function push(JobInterface $job, array $options = [])
114
    {
115 4
        $this->checkConnected();
116 4
        $id = $this->client->addJob($this->name, $this->marshaler->marshal($job), $options);
117 4
        $job->setId($id);
118 4
        return $job;
119
    }
120
121
    /**
122
     * Pulls a single job from the queue (if none available, and if $timeout
123
     * specified, then wait only this much time for a job, otherwise return
124
     * `null`)
125
     *
126
     * @param int $timeout If specified, wait these many seconds
127
     * @return Job|null A job, or null if no job was found before timeout
128
     */
129 6
    public function pull($timeout = 0)
130
    {
131 6
        $this->checkConnected();
132 6
        $jobs = $this->client->getJob($this->name, [
133 6
            'timeout' => $timeout,
134 6
            'count' => 1,
135
            'withcounters' => true
136
        ]);
137 6
        if (empty($jobs)) {
138 2
            return null;
139
        }
140 4
        $jobData = $jobs[0];
141 4
        $job = $this->marshaler->unmarshal($jobData[Response::KEY_BODY]);
142
143 4
        $job->setId($jobData[Response::KEY_ID]);
144 4
        $job->setNacks($jobData[Counters::KEY_NACKS]);
145 4
        $job->setAdditionalDeliveries(
146 4
            $jobData[Counters::KEY_ADDITIONAL_DELIVERIES]
147
        );
148
149 4
        return $job;
150
    }
151
152
    /**
153
     * Marks that a Job is still being processed
154
     *
155
     * @param JobInterface $job Job
156
     * @return int Number of seconds that the job visibility was postponed
157
     */
158 2
    public function processing(JobInterface $job)
159
    {
160 2
        $this->checkConnected();
161 2
        return $this->client->working($job->getId());
162
    }
163
164
    /**
165
     * Acknowledges a Job as properly handled
166
     *
167
     * @param JobInterface $job Job
168
     * @return void
169
     */
170 2
    public function processed(JobInterface $job)
171
    {
172 2
        $this->checkConnected();
173 2
        $this->client->ackJob($job->getId());
174 2
    }
175
176
    /**
177
     * Marks the job as failed and returns it to the queue
178
     *
179
     * This increases the NACK counter of the job
180
     *
181
     * @param JobInterface $job
182
     * @return void
183
     */
184 2
    public function failed(JobInterface $job)
185
    {
186 2
        $this->checkConnected();
187 2
        $this->client->nack($job->getId());
188 2
    }
189
190
    /**
191
     * Check that we are connected to a node, and if not connect
192
     *
193
     * @return void
194
     * @throws Disque\Connection\ConnectionException
195
     */
196 16
    private function checkConnected()
197
    {
198 16
        if (!$this->client->isConnected()) {
199 5
            $this->client->connect();
200
        }
201 16
    }
202
}
203