| @@ 115-170 (lines=56) @@ | ||
| 112 | * |
|
| 113 | * @return int The job id |
|
| 114 | */ |
|
| 115 | public function add($action, array $payload, $delay = null, $priority = null, $ttr = null) |
|
| 116 | { |
|
| 117 | if (false === $this->executorPool->hasExecutor($action)) { |
|
| 118 | throw new InvalidArgumentException( |
|
| 119 | sprintf( |
|
| 120 | 'Action "%s" is not defined in QueueManager', |
|
| 121 | $action |
|
| 122 | ) |
|
| 123 | ); |
|
| 124 | } |
|
| 125 | ||
| 126 | if (null === $delay) { |
|
| 127 | $delay = PheanstalkInterface::DEFAULT_DELAY; |
|
| 128 | } |
|
| 129 | ||
| 130 | if (null === $priority) { |
|
| 131 | $priority = PheanstalkInterface::DEFAULT_PRIORITY; |
|
| 132 | } |
|
| 133 | ||
| 134 | if (null === $ttr) { |
|
| 135 | $ttr = $this->defaultTtr; |
|
| 136 | } |
|
| 137 | ||
| 138 | if (!is_numeric($delay)) { |
|
| 139 | $delay = strtotime(sprintf('+ %s', $delay)) - time(); |
|
| 140 | } |
|
| 141 | ||
| 142 | if ($delay < 0) { |
|
| 143 | throw new InvalidArgumentException( |
|
| 144 | sprintf('You cannot schedule a job in the past (delay was %d)', $delay) |
|
| 145 | ); |
|
| 146 | } |
|
| 147 | ||
| 148 | if ($priority < 0) { |
|
| 149 | throw new InvalidArgumentException( |
|
| 150 | sprintf('The priority for a job cannot be negative (was %d)', $priority) |
|
| 151 | ); |
|
| 152 | } |
|
| 153 | ||
| 154 | $payload = json_encode($payload); |
|
| 155 | $jobId = $this->pheanstalk->putInTube($action, $payload, $priority, $delay, $ttr); |
|
| 156 | ||
| 157 | $this->logJob( |
|
| 158 | $jobId, |
|
| 159 | sprintf( |
|
| 160 | 'Added job in tube "%s" with: payload: %s, priority: %d, delay: %ds, ttr: %s', |
|
| 161 | $action, |
|
| 162 | $payload, |
|
| 163 | $priority, |
|
| 164 | $delay, |
|
| 165 | $ttr |
|
| 166 | ) |
|
| 167 | ); |
|
| 168 | ||
| 169 | return $jobId; |
|
| 170 | } |
|
| 171 | ||
| 172 | /** |
|
| 173 | * Adds a job to the queue for an object. |
|
| @@ 205-258 (lines=54) @@ | ||
| 202 | * |
|
| 203 | * @return int The job id |
|
| 204 | */ |
|
| 205 | public function add($action, array $payload, $delay = null, $priority = null, $ttr = null) |
|
| 206 | { |
|
| 207 | if (false === $this->hasExecutor($action)) { |
|
| 208 | throw new \InvalidArgumentException(sprintf( |
|
| 209 | 'Action "%s" is not defined in QueueManager', |
|
| 210 | $action |
|
| 211 | )); |
|
| 212 | } |
|
| 213 | ||
| 214 | if (null === $delay) { |
|
| 215 | $delay = PheanstalkInterface::DEFAULT_DELAY; |
|
| 216 | } |
|
| 217 | ||
| 218 | if (null === $priority) { |
|
| 219 | $priority = PheanstalkInterface::DEFAULT_PRIORITY; |
|
| 220 | } |
|
| 221 | ||
| 222 | if (null === $ttr) { |
|
| 223 | $ttr = $this->defaultTtr; |
|
| 224 | } |
|
| 225 | ||
| 226 | if (!is_numeric($delay)) { |
|
| 227 | $delay = strtotime(sprintf('+ %s', $delay)) - time(); |
|
| 228 | } |
|
| 229 | ||
| 230 | if ($delay < 0) { |
|
| 231 | throw new \InvalidArgumentException( |
|
| 232 | sprintf('You cannot schedule a job in the past (delay was %d)', $delay) |
|
| 233 | ); |
|
| 234 | } |
|
| 235 | ||
| 236 | if ($priority < 0) { |
|
| 237 | throw new \InvalidArgumentException( |
|
| 238 | sprintf('The priority for a job cannot be negative (was %d)', $priority) |
|
| 239 | ); |
|
| 240 | } |
|
| 241 | ||
| 242 | $payload = json_encode($payload); |
|
| 243 | $jobId = $this->pheanstalk->putInTube($action, $payload, $priority, $delay, $ttr); |
|
| 244 | ||
| 245 | $this->logJob( |
|
| 246 | $jobId, |
|
| 247 | sprintf( |
|
| 248 | 'Added job in tube "%s" with: payload: %s, priority: %d, delay: %ds, ttr: %s', |
|
| 249 | $action, |
|
| 250 | $payload, |
|
| 251 | $priority, |
|
| 252 | $delay, |
|
| 253 | $ttr |
|
| 254 | ) |
|
| 255 | ); |
|
| 256 | ||
| 257 | return $jobId; |
|
| 258 | } |
|
| 259 | ||
| 260 | /** |
|
| 261 | * Adds a job to the queue for an object. |
|