Completed
Push — master ( 84470e...c291b9 )
by Ron
03:30
created

MySQLAttributeRepository::setLastRunDate()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 2
1
<?php
2
namespace Kir\Services\Cmd\Dispatcher\AttributeRepositories;
3
4
use Closure;
5
use DateTimeInterface;
6
use PDO;
7
use PDOException;
8
use PDOStatement;
9
use Kir\Services\Cmd\Dispatcher\AttributeRepository;
10
use Kir\Services\Cmd\Dispatcher\Dispatchers\DefaultDispatcher\Service;
11
use RuntimeException;
12
use Throwable;
13
14
class MySQLAttributeRepository implements AttributeRepository {
15
	const MYSQL_ERR_TABLE_MSSING = '42S02';
16
	const MYSQL_ERR_TABLE_COLUMN_MSSING = '42S22';
17
	
18
	/** @var PDOStatement */
19
	private $selectOverdueServices = null;
20
	/** @var PDOStatement */
21
	private $registerRow = null;
22
	/** @var PDOStatement */
23
	private $hasService = null;
24
	/** @var PDOStatement */
25
	private $getServiceKeys = null;
26
	/** @var PDOStatement */
27
	private $getData = null;
28
	/** @var PDOStatement */
29
	private $updateTryDate = null;
30
	/** @var PDOStatement */
31
	private $updateRunDate = null;
32
	/** @var PDOStatement */
33
	private $updateNextDate = null;
34
	/** @var PDOStatement */
35
	private $lock;
36
	/** @var PDOStatement */
37
	private $unlock;
38
	/** @var PDO */
39
	private $pdo;
40
	/** @var string[]|null */
41
	private $services = null;
42
	/** @var string */
43
	private $tableName;
44
	/** @var array */
45
	private $options;
46
	
47
	/**
48
	 * @param PDO $pdo
49
	 * @param string $tableName
50
	 * @param array $options
51
	 */
52
	public function __construct(PDO $pdo, $tableName = 'services', array $options = []) {
53
		$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
54
		$this->tableName = $tableName;
55
		$this->options = $options;
56
		$this->pdo = $pdo;
57
		if($options['use-locking'] ?? true) {
58
			$this->lock = $pdo->prepare('SELECT GET_LOCK(:name, 0)');
59
			$this->unlock = $pdo->prepare('SELECT RELEASE_LOCK(:name)');
60
		}
61
	}
62
63
	/**
64
	 * @param string $key
65
	 * @return bool
66
	 */
67
	public function has(string $key) {
68
		if($this->hasService === null) {
69
			$this->hasService = $this->pdo->prepare("SELECT COUNT(*) FROM `{$this->tableName}` WHERE service_key=:key;");
70
		}
71
		$this->hasService->execute(['key' => $key]);
72
		$count = $this->hasService->fetchColumn(0);
73
		return $count > 0;
74
	}
75
	
76
	/**
77
	 * @param string $key
78
	 * @return MySQLAttributeRepository|void
79
	 */
80
	public function register(string $key) {
81
		$this->handleException(function () use ($key) {
82
			if($this->services === null) {
83
				if($this->getServiceKeys === null) {
84
					$this->getServiceKeys = $this->pdo->prepare("SELECT `service_key` FROM `{$this->tableName}`;");
85
				}
86
				try {
87
					$this->getServiceKeys->execute();
88
					$this->services = $this->getServiceKeys->fetchAll(PDO::FETCH_COLUMN);
89
				} finally {
90
					$this->getServiceKeys->closeCursor();
91
				}
92
			}
93
			if(!in_array($key, $this->services, true)) {
94
				if($this->registerRow === null) {
95
					$this->registerRow = $this->pdo->prepare("INSERT INTO `{$this->tableName}` (`service_key`) VALUES (:key)");
96
				}
97
				$this->registerRow->execute(['key' => $key]);
98
			}
99
		});
100
	}
101
102
	/**
103
	 * @param string $key
104
	 * @return object
105
	 */
106
	public function getRowByKey(string $key) {
107
		if($this->getData === null) {
108
			$this->getData = $this->pdo->prepare("SELECT service_key, service_last_try, service_last_run, service_next_run FROM `{$this->tableName}` WHERE service_key=:key;");
109
		}
110
		try {
111
			$this->handleException(function () use ($key) {
112
				$this->getData->execute(['key' => $key]);
113
			});
114
			$result = $this->getData->fetchObject();
115
			if(!is_object($result)) {
116
				throw new RuntimeException('Row not found');
117
			}
118
			return $result;
119
		} finally {
120
			$this->getData->closeCursor();
121
		}
122
	}
123
	
124
	/**
125
	 * @param string $key
126
	 * @param DateTimeInterface $datetime
127
	 * @return MySQLAttributeRepository|void
128
	 */
129
	public function setLastTryDate(string $key, DateTimeInterface $datetime) {
130
		if($this->updateTryDate === null) {
131
			$this->updateTryDate = $this->pdo->prepare("INSERT INTO `{$this->tableName}` (service_key, service_last_try) VALUES (:key, :dt) ON DUPLICATE KEY UPDATE service_last_try=:dt");
132
		}
133
		$this->handleException(function () use ($key, $datetime) {
134
			$this->updateTryDate->execute(['key' => $key, 'dt' => $datetime->format('Y-m-d H:i:s')]);
135
		});
136
	}
137
	
138
	/**
139
	 * @param string $key
140
	 * @param DateTimeInterface $datetime
141
	 * @return MySQLAttributeRepository|void
142
	 */
143
	public function setLastRunDate(string $key, DateTimeInterface $datetime) {
144
		if($this->updateRunDate === null) {
145
			$this->updateRunDate = $this->pdo->prepare("INSERT INTO `{$this->tableName}` (service_key, service_last_run) VALUES (:key, :dt) ON DUPLICATE KEY UPDATE service_last_run=:dt");
146
		}
147
		$this->handleException(function () use ($key, $datetime) {
148
			$this->updateRunDate->execute(['key' => $key, 'dt' => $datetime->format('Y-m-d H:i:s')]);
149
		});
150
	}
151
	
152
	/**
153
	 * @param string $key
154
	 * @param DateTimeInterface $datetime
155
	 * @return MySQLAttributeRepository|void
156
	 */
157
	public function setNextRunDate(string $key, DateTimeInterface $datetime) {
158
		if($this->updateNextDate === null) {
159
			$this->updateNextDate = $this->pdo->prepare("INSERT INTO `{$this->tableName}` (service_key, service_next_run) VALUES (:key, :dt) ON DUPLICATE KEY UPDATE service_next_run=:dt");
160
		}
161
		$this->handleException(function () use ($key, $datetime) {
162
			$this->updateNextDate->execute(['key' => $key, 'dt' => $datetime->format('Y-m-d H:i:s')]);
163
		});
164
	}
165
	
166
	/**
167
	 * @param DateTimeInterface|null $now
168
	 * @param callable $fn
169
	 * @return int
170
	 * @throws Throwable
171
	 */
172
	public function lockAndIterateServices(?DateTimeInterface $now, callable $fn): int {
173
		$data = (object) ['count' => 0];
174
			$services = $this->fetchServices($now);
0 ignored issues
show
Bug introduced by
It seems like $now defined by parameter $now on line 172 can be null; however, Kir\Services\Cmd\Dispatc...sitory::fetchServices() does not accept null, maybe add an additional type check?

It seems like you allow that null is being passed for a parameter, however the function which is called does not seem to accept null.

We recommend to add an additional type check (or disallow null for the parameter):

function notNullable(stdClass $x) { }

// Unsafe
function withoutCheck(stdClass $x = null) {
    notNullable($x);
}

// Safe - Alternative 1: Adding Additional Type-Check
function withCheck(stdClass $x = null) {
    if ($x instanceof stdClass) {
        notNullable($x);
    }
}

// Safe - Alternative 2: Changing Parameter
function withNonNullableParam(stdClass $x) {
    notNullable($x);
}
Loading history...
175
			foreach($services as $service) {
176
				try {
177
					$this->lock($service->getKey());
178
					$fn($service);
179
					$data->count++;
180
				} finally {
181
					$this->unlock($service->getKey());
182
				}
183
			}
184
			return $data->count;
185
	}
186
	
187
	/**
188
	 * @param DateTimeInterface $now
189
	 * @return Service[]
190
	 */
191
	private function fetchServices(DateTimeInterface $now) {
192
		if($this->selectOverdueServices === null) {
193
			$this->selectOverdueServices = $this->pdo->prepare("SELECT service_key, service_last_try, service_last_run, service_next_run FROM `{$this->tableName}` WHERE IFNULL(service_next_run, DATE('2000-01-01')) <= :dt;");
194
		}
195
		return $this->handleException(function () use ($now) {
196
			$this->selectOverdueServices->execute(['dt' => $now->format('Y-m-d H:i:d')]);
197
			try {
198
				$services = $this->selectOverdueServices->fetchAll(PDO::FETCH_ASSOC);
199
				$result = [];
200
				foreach($services as $service) {
201
					$result[] = new Service($service['service_key']);
202
				}
203
				return $result;
204
			} finally {
205
				$this->selectOverdueServices->closeCursor();
206
			}
207
		});
208
	}
209
	
210
	/**
211
	 * Is the lock can not be optained, an RuntimeException is thrown
212
	 *
213
	 * @param string $key
214
	 */
215
	public function lock(string $key): void {
216
		try {
217
			$lockName = sprintf('%s%s', $this->options['lock-prefix'] ?? '', $key);
218
			$this->lock->execute(['name' => $lockName]);
219
			$lockObtained = $this->lock->fetchColumn(0);
220
			if(!$lockObtained) {
221
				throw new RuntimeException(sprintf('Could not obtain lock "%s"', $lockName));
222
			}
223
		} finally {
224
			$this->lock->closeCursor();
225
		}
226
	}
227
	
228
	/**
229
	 * @param string $key
230
	 */
231
	public function unlock(string $key): void {
232
		$this->unlock->execute(['name' => sprintf('%s%s', $this->options['lock-prefix'] ?? '', $key)]);
233
	}
234
	
235
	/**
236
	 * @param Closure $fn
237
	 * @return mixed
238
	 */
239
	private function handleException(Closure $fn) {
240
		try {
241
			return $fn();
242
		} catch (PDOException $e) {
243
			if($e->getCode() === self::MYSQL_ERR_TABLE_MSSING) {
244
				// Field is missing, let's have a look what is going on...
245
				$this->pdo->exec("CREATE TABLE IF NOT EXISTS `{$this->tableName}` (`service_key` VARCHAR(255) NOT NULL DEFAULT '', `service_last_try` DATETIME NULL DEFAULT NULL, `service_last_run` DATETIME NULL DEFAULT NULL, `service_next_run` DATETIME NULL DEFAULT NULL, PRIMARY KEY (`service_key`));");
246
				return $this->retry($fn);
247
			}
248
			
249
			if($e->getCode() === self::MYSQL_ERR_TABLE_COLUMN_MSSING) {
250
				// Field is missing, let's have a look what is going on...
251
				$this->checkIfOldTableVersion();
252
				return $this->retry($fn);
253
			}
254
			
255
			throw $e;
256
		}
257
	}
258
	
259
	/**
260
	 * @param Closure $fn
261
	 * @return mixed
262
	 */
263
	private function retry(Closure $fn) {
264
		return $this->handleException(function () use ($fn) {
265
			return $fn();
266
		});
267
	}
268
	
269
	/**
270
	 */
271
	private function checkIfOldTableVersion() {
272
		$serviceNextRunField = $this->pdo->query("SHOW COLUMNS FROM `{$this->tableName}` LIKE 'service_next_run';")->fetchAll(PDO::FETCH_ASSOC);
273
274
		if(!count($serviceNextRunField)) {
275
			$this->pdo->exec("ALTER TABLE `{$this->tableName}` ADD COLUMN `service_next_run` DATETIME NULL DEFAULT NULL AFTER `service_last_run`;");
276
			$serviceTimeoutField = $this->pdo->query("SHOW COLUMNS FROM `{$this->tableName}` LIKE 'service_timeout';")->fetchAll(PDO::FETCH_ASSOC);
277
			if(count($serviceTimeoutField)) {
278
				$this->pdo->exec("UPDATE `{$this->tableName}` SET service_next_run = DATE_ADD(service_last_run, INTERVAL service_timeout SECOND)");
279
				$this->pdo->exec("ALTER TABLE `{$this->tableName}` DROP COLUMN `service_timeout`;");
280
			}
281
		}
282
	}
283
}
284