@@ 115-170 (lines=56) @@ | ||
112 | * |
|
113 | * @return int The job id |
|
114 | */ |
|
public function add($action, array $payload, $delay = null, $priority = null, $ttr = null)
{
    // Refuse actions for which the executor pool has no executor.
    if (false === $this->executorPool->hasExecutor($action)) {
        throw new InvalidArgumentException(
            sprintf(
                'Action "%s" is not defined in QueueManager',
                $action
            )
        );
    }

    // Fall back to defaults for every scheduling option the caller omitted.
    if (null === $delay) {
        $delay = PheanstalkInterface::DEFAULT_DELAY;
    }

    if (null === $priority) {
        $priority = PheanstalkInterface::DEFAULT_PRIORITY;
    }

    if (null === $ttr) {
        $ttr = $this->defaultTtr;
    }

    // A non-numeric delay is read as a relative time expression and
    // converted to a number of seconds from now.
    if (!is_numeric($delay)) {
        $scheduledAt = strtotime(sprintf('+ %s', $delay));

        // strtotime() returns false when it cannot parse the expression.
        // Without this check, false - time() produced a large negative
        // number and the caller was told the job was "in the past".
        if (false === $scheduledAt) {
            throw new InvalidArgumentException(
                sprintf('The delay "%s" is not a valid relative time expression', $delay)
            );
        }

        $delay = $scheduledAt - time();
    }

    if ($delay < 0) {
        throw new InvalidArgumentException(
            sprintf('You cannot schedule a job in the past (delay was %d)', $delay)
        );
    }

    if ($priority < 0) {
        throw new InvalidArgumentException(
            sprintf('The priority for a job cannot be negative (was %d)', $priority)
        );
    }

    $encodedPayload = json_encode($payload);

    // json_encode() returns false on failure; fail loudly instead of
    // queueing a job whose body is not the payload the caller provided.
    if (false === $encodedPayload) {
        throw new InvalidArgumentException(
            sprintf(
                'The payload for action "%s" could not be JSON-encoded (json error code %d)',
                $action,
                json_last_error()
            )
        );
    }

    // The action name is used as the tube name.
    $jobId = $this->pheanstalk->putInTube($action, $encodedPayload, $priority, $delay, $ttr);

    $this->logJob(
        $jobId,
        sprintf(
            'Added job in tube "%s" with: payload: %s, priority: %d, delay: %ds, ttr: %s',
            $action,
            $encodedPayload,
            $priority,
            $delay,
            $ttr
        )
    );

    return $jobId;
}
|
171 | ||
172 | /** |
|
173 | * Adds a job to the queue for an object. |
@@ 205-258 (lines=54) @@ | ||
202 | * |
|
203 | * @return int The job id |
|
204 | */ |
|
public function add($action, array $payload, $delay = null, $priority = null, $ttr = null)
{
    // Refuse actions for which no executor is registered.
    if (false === $this->hasExecutor($action)) {
        throw new \InvalidArgumentException(sprintf(
            'Action "%s" is not defined in QueueManager',
            $action
        ));
    }

    // Fall back to defaults for every scheduling option the caller omitted.
    if (null === $delay) {
        $delay = PheanstalkInterface::DEFAULT_DELAY;
    }

    if (null === $priority) {
        $priority = PheanstalkInterface::DEFAULT_PRIORITY;
    }

    if (null === $ttr) {
        $ttr = $this->defaultTtr;
    }

    // A non-numeric delay is read as a relative time expression and
    // converted to a number of seconds from now.
    if (!is_numeric($delay)) {
        $scheduledAt = strtotime(sprintf('+ %s', $delay));

        // strtotime() returns false when it cannot parse the expression.
        // Without this check, false - time() produced a large negative
        // number and the caller was told the job was "in the past".
        if (false === $scheduledAt) {
            throw new \InvalidArgumentException(
                sprintf('The delay "%s" is not a valid relative time expression', $delay)
            );
        }

        $delay = $scheduledAt - time();
    }

    if ($delay < 0) {
        throw new \InvalidArgumentException(
            sprintf('You cannot schedule a job in the past (delay was %d)', $delay)
        );
    }

    if ($priority < 0) {
        throw new \InvalidArgumentException(
            sprintf('The priority for a job cannot be negative (was %d)', $priority)
        );
    }

    $encodedPayload = json_encode($payload);

    // json_encode() returns false on failure; fail loudly instead of
    // queueing a job whose body is not the payload the caller provided.
    if (false === $encodedPayload) {
        throw new \InvalidArgumentException(
            sprintf(
                'The payload for action "%s" could not be JSON-encoded (json error code %d)',
                $action,
                json_last_error()
            )
        );
    }

    // The action name is used as the tube name.
    $jobId = $this->pheanstalk->putInTube($action, $encodedPayload, $priority, $delay, $ttr);

    $this->logJob(
        $jobId,
        sprintf(
            'Added job in tube "%s" with: payload: %s, priority: %d, delay: %ds, ttr: %s',
            $action,
            $encodedPayload,
            $priority,
            $delay,
            $ttr
        )
    );

    return $jobId;
}
|
259 | ||
260 | /** |
|
261 | * Adds a job to the queue for an object. |