DatabaseConnection::push()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 7
nc 2
nop 2
dl 0
loc 12
ccs 8
cts 8
cp 1
crap 2
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
namespace WP_Queue\Connections;
4
5
use Carbon\Carbon;
6
use Exception;
7
use WP_Queue\Job;
8
9
class DatabaseConnection implements ConnectionInterface {
10
11
	/**
12
	 * @var wpdb
13
	 */
14
	protected $database;
15
16
	/**
17
	 * @var string
18
	 */
19
	protected $jobs_table;
20
21
	/**
22
	 * @var string
23
	 */
24
	protected $failures_table;
25
26
	/**
27
	 * DatabaseQueue constructor.
28
	 *
29
	 * @param wpdb $wpdb
30
	 */
31 14
	public function __construct( $wpdb ) {
32 14
		$this->database       = $wpdb;
33 14
		$this->jobs_table     = $this->database->prefix . 'queue_jobs';
34 14
		$this->failures_table = $this->database->prefix . 'queue_failures';
35 14
	}
36
37
	/**
38
	 * Push a job onto the queue.
39
	 *
40
	 * @param Job $job
41
	 * @param int $delay
42
	 *
43
	 * @return bool|int
44
	 */
45 2
	public function push( Job $job, $delay = 0 ) {
46 2
		$result = $this->database->insert( $this->jobs_table, array(
47 2
			'job'          => serialize( $job ),
48 2
			'available_at' => $this->datetime( $delay ),
49 2
			'created_at'   => $this->datetime(),
50
		) );
51
52 2
		if ( ! $result ) {
53 1
			return false;
54
		}
55
56 1
		return $this->database->insert_id;
57
	}
58
59
	/**
60
	 * Retrieve a job from the queue.
61
	 *
62
	 * @return bool|Job
63
	 */
64 2
	public function pop() {
65 2
		$this->release_reserved();
66
67 2
		$sql = $this->database->prepare( "
68 2
			SELECT * FROM {$this->jobs_table}
69
			WHERE reserved_at IS NULL
70
			AND available_at <= %s
71
			ORDER BY available_at
72
			LIMIT 1
73 2
		", $this->datetime() );
74
75 2
		$raw_job = $this->database->get_row( $sql );
76
77 2
		if ( is_null( $raw_job ) ) {
78 1
			return false;
79
		}
80
81 1
		$job = $this->vitalize_job( $raw_job );
82
83 1
		$this->reserve( $job );
84
85 1
		return $job;
86
	}
87
88
	/**
89
	 * Delete a job from the queue.
90
	 *
91
	 * @param Job $job
92
	 *
93
	 * @return bool
94
	 */
95 3
	public function delete( $job ) {
96
		$where = array(
97 3
			'id' => $job->id(),
98
		);
99
100 3
		if ( $this->database->delete( $this->jobs_table, $where ) ) {
101 1
			return true;
102
		}
103
104 2
		return false;
105
	}
106
107
	/**
108
	 * Release a job back onto the queue.
109
	 *
110
	 * @param Job $job
111
	 *
112
	 * @return bool
113
	 */
114 2
	public function release( $job ) {
115
		$data  = array(
116 2
			'job'         => serialize( $job ),
117 2
			'attempts'    => $job->attempts(),
118
			'reserved_at' => null,
119
		);
120
		$where = array(
121 2
			'id' => $job->id(),
122
		);
123
124 2
		if ( $this->database->update( $this->jobs_table, $data, $where ) ) {
125 1
			return true;
126
		}
127
128 1
		return false;
129
	}
130
131
	/**
132
	 * Push a job onto the failure queue.
133
	 *
134
	 * @param Job       $job
135
	 * @param Exception $exception
136
	 *
137
	 * @return bool
138
	 */
139 2
	public function failure( $job, Exception $exception ) {
140 2
		$insert = $this->database->insert( $this->failures_table, array(
141 2
			'job'       => serialize( $job ),
142 2
			'error'     => $this->format_exception( $exception ),
143 2
			'failed_at' => $this->datetime(),
144
		) );
145
146 2
		if ( $insert ) {
147 1
			$this->delete( $job );
148
149 1
			return true;
150
		}
151
152 1
		return false;
153
	}
154
155
	/**
156
	 * Get total jobs in the queue.
157
	 *
158
	 * @return int
159
	 */
160 1
	public function jobs() {
161 1
		$sql = "SELECT COUNT(*) FROM {$this->jobs_table}";
162
		
163 1
		return (int) $this->database->get_var( $sql );
164
	}
165
166
	/**
167
	 * Get total jobs in the failures queue.
168
	 *
169
	 * @return int
170
	 */
171 1
	public function failed_jobs() {
172 1
		$sql = "SELECT COUNT(*) FROM {$this->failures_table}";
173
174 1
		return (int) $this->database->get_var( $sql );
175
	}
176
177
	/**
178
	 * Reserve a job in the queue.
179
	 *
180
	 * @param Job $job
181
	 */
182 1
	protected function reserve( $job ) {
183
		$data = array(
184 1
			'reserved_at' => $this->datetime(),
185
		);
186
187 1
		$this->database->update( $this->jobs_table, $data, array(
188 1
			'id' => $job->id(),
189
		) );
190 1
	}
191
192
	/**
193
	 * Release reserved jobs back onto the queue.
194
	 */
195 2
	protected function release_reserved() {
196 2
		$expired = $this->datetime( -300 );
197
198 2
		$sql = $this->database->prepare( "
199 2
				UPDATE {$this->jobs_table}
200
				SET attempts = attempts + 1, reserved_at = NULL
201 2
				WHERE reserved_at <= %s", $expired );
202
203 2
		$this->database->query( $sql );
204 2
	}
205
206
	/**
207
	 * Vitalize Job with latest data.
208
	 *
209
	 * @param mixed $raw_job
210
	 *
211
	 * @return Job
212
	 */
213 1
	protected function vitalize_job( $raw_job ) {
214 1
		$job = unserialize( $raw_job->job );
215
216 1
		$job->set_id( $raw_job->id );
217 1
		$job->set_attempts( $raw_job->attempts );
218 1
		$job->set_reserved_at( empty( $raw_job->reserved_at ) ? null : new Carbon( $raw_job->reserved_at ) );
219 1
		$job->set_available_at( new Carbon( $raw_job->available_at ) );
220 1
		$job->set_created_at( new Carbon( $raw_job->created_at ) );
221
222 1
		return $job;
223
	}
224
225
	/**
226
	 * Get MySQL datetime.
227
	 *
228
	 * @param int $offset Seconds, can pass negative int.
229
	 *
230
	 * @return string
231
	 */
232 6
	protected function datetime( $offset = 0 ) {
233 6
		$timestamp = time() + $offset;
234
235 6
		return gmdate( 'Y-m-d H:i:s', $timestamp );
236
	}
237
238
	/**
239
	 * Format an exception error string.
240
	 *
241
	 * @param Exception $exception
242
	 *
243
	 * @return string
244
	 */
245 2
	protected function format_exception( Exception $exception ) {
246 2
		$string = get_class( $exception );
247
248 2
		if ( ! empty( $exception->getMessage() ) ) {
249
			$string .= " : {$exception->getMessage()}";
250
		}
251
252 2
		if ( ! empty( $exception->getCode() ) ) {
253
			$string .= " (#{$exception->getCode()})";
254
		}
255
256 2
		return $string;
257
	}
258
259
}