1
|
|
|
<?php |
2
|
|
|
namespace Kir\Services\Cmd\Dispatcher\AttributeRepositories; |
3
|
|
|
|
4
|
|
|
use DateTimeInterface; |
5
|
|
|
use Generator; |
6
|
|
|
use PDO; |
7
|
|
|
use PDOStatement; |
8
|
|
|
use Kir\Services\Cmd\Dispatcher\AttributeRepository; |
9
|
|
|
use Kir\Services\Cmd\Dispatcher\Dispatchers\DefaultDispatcher\Service; |
10
|
|
|
use RuntimeException; |
11
|
|
|
|
12
|
|
|
class SqliteAttributeRepository implements AttributeRepository { |
13
|
|
|
private const SQLITE_DATETIME_FORMAT = 'Y-m-d\\TH:i:s'; |
14
|
|
|
|
15
|
|
|
/** @var PDO */ |
16
|
|
|
private $pdo; |
17
|
|
|
/** @var PDOStatement */ |
18
|
|
|
private $registerRow = null; |
19
|
|
|
/** @var PDOStatement */ |
20
|
|
|
private $selectServices; |
21
|
|
|
/** @var PDOStatement */ |
22
|
|
|
private $hasService; |
23
|
|
|
/** @var PDOStatement */ |
24
|
|
|
private $getData; |
25
|
|
|
/** @var PDOStatement */ |
26
|
|
|
private $insertService; |
27
|
|
|
/** @var PDOStatement */ |
28
|
|
|
private $setNextRun; |
29
|
|
|
/** @var PDOStatement */ |
30
|
|
|
private $setTryDate; |
31
|
|
|
/** @var PDOStatement */ |
32
|
|
|
private $setLastRun; |
33
|
|
|
|
34
|
|
|
/** |
35
|
|
|
* @param PDO $pdo |
36
|
|
|
*/ |
37
|
|
|
public function __construct(PDO $pdo) { |
38
|
|
|
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); |
39
|
|
|
$this->pdo = $pdo; |
40
|
|
|
|
41
|
|
|
// https://stackoverflow.com/a/8442173 |
42
|
|
|
$this->migrate(1, 'CREATE TABLE IF NOT EXISTS services (service_key STRING PRIMARY KEY, service_last_try DATETIME, service_last_run DATETIME, service_next_run DATETIME);'); |
43
|
|
|
|
44
|
|
|
$this->selectServices = $pdo->prepare("SELECT service_key, service_last_try, service_last_run FROM services WHERE service_next_run IS NULL OR DATETIME(service_next_run) <= DATETIME(:dt) ORDER BY MAX(service_last_try, service_last_run);"); |
45
|
|
|
$this->registerRow = $pdo->prepare('INSERT OR IGNORE INTO services (service_key) VALUES (:key);'); |
46
|
|
|
$this->hasService = $pdo->prepare('SELECT COUNT(*) FROM services WHERE service_key=:key;'); |
47
|
|
|
$this->getData = $pdo->prepare('SELECT service_key, service_last_try, service_last_run, service_next_run FROM services WHERE service_key=:key;'); |
48
|
|
|
$this->setTryDate = $pdo->prepare('INSERT INTO services (service_key, service_last_try) VALUES (:key, :dt) ON CONFLICT(service_key) DO UPDATE SET service_last_try=:dt'); |
49
|
|
|
$this->setLastRun = $pdo->prepare('INSERT INTO services (service_key, service_last_run) VALUES (:key, :dt) ON CONFLICT(service_key) DO UPDATE SET service_last_run=:dt'); |
50
|
|
|
$this->setNextRun = $pdo->prepare('INSERT INTO services (service_key, service_next_run) VALUES (:key, :dt) ON CONFLICT(service_key) DO UPDATE SET service_next_run=:dt'); |
51
|
|
|
} |
52
|
|
|
|
53
|
|
|
/** |
54
|
|
|
* @param string $key |
55
|
|
|
* @return bool |
56
|
|
|
*/ |
57
|
|
|
public function has(string $key) { |
58
|
|
|
try { |
59
|
|
|
$this->hasService->execute(['key' => $key]); |
60
|
|
|
$count = $this->hasService->fetchColumn(0); |
61
|
|
|
return $count > 0; |
62
|
|
|
} finally { |
63
|
|
|
$this->hasService->closeCursor(); |
64
|
|
|
} |
65
|
|
|
} |
66
|
|
|
|
67
|
|
|
/** |
68
|
|
|
* @param string $key |
69
|
|
|
* @return SqliteAttributeRepository|void |
70
|
|
|
*/ |
71
|
|
|
public function register(string $key) { |
72
|
|
|
$this->registerRow->execute(['key' => $key]); |
73
|
|
|
} |
74
|
|
|
|
75
|
|
|
/** |
76
|
|
|
* @param string $key |
77
|
|
|
* @return object |
78
|
|
|
*/ |
79
|
|
|
public function getRowByKey(string $key) { |
80
|
|
|
try { |
81
|
|
|
$this->getData->execute(['key' => $key]); |
82
|
|
|
$result = $this->getData->fetchObject(); |
83
|
|
|
if(!is_object($result)) { |
84
|
|
|
throw new RuntimeException('Row not found'); |
85
|
|
|
} |
86
|
|
|
return $result; |
87
|
|
|
} finally { |
88
|
|
|
$this->getData->closeCursor(); |
89
|
|
|
} |
90
|
|
|
} |
91
|
|
|
|
92
|
|
|
/** |
93
|
|
|
* @param DateTimeInterface $now |
94
|
|
|
* @param callable $fn |
95
|
|
|
* @return int |
96
|
|
|
*/ |
97
|
|
|
public function lockAndIterateServices(DateTimeInterface $now, callable $fn): int { |
98
|
|
|
$count = 0; |
99
|
|
|
$services = $this->fetchServices($now); |
100
|
|
|
foreach($services as $service) { |
101
|
|
|
$fn($service); |
102
|
|
|
} |
103
|
|
|
return $count; |
104
|
|
|
} |
105
|
|
|
|
106
|
|
|
/** |
107
|
|
|
* @param DateTimeInterface $now |
108
|
|
|
* @return Service[]|Generator |
109
|
|
|
*/ |
110
|
|
|
public function fetchServices(DateTimeInterface $now): Generator { |
111
|
|
|
$this->selectServices->execute(['dt' => $now->format(self::SQLITE_DATETIME_FORMAT)]); |
112
|
|
|
try { |
113
|
|
|
$services = $this->selectServices->fetchAll(PDO::FETCH_ASSOC); |
114
|
|
|
foreach($services as $service) { |
115
|
|
|
yield new Service($service['service_key']); |
116
|
|
|
} |
117
|
|
|
} finally { |
118
|
|
|
$this->selectServices->closeCursor(); |
119
|
|
|
} |
120
|
|
|
} |
121
|
|
|
|
122
|
|
|
/** |
123
|
|
|
* @param int $version |
124
|
|
|
* @param string $statement |
125
|
|
|
*/ |
126
|
|
|
private function migrate(int $version, string $statement): void { |
127
|
|
|
$currentVersion = (int) $this->pdo->query('PRAGMA user_version')->fetchColumn(0); |
128
|
|
|
if($currentVersion < $version) { |
129
|
|
|
$this->pdo->exec($statement); |
130
|
|
|
$this->pdo->exec("PRAGMA user_version={$version}"); |
131
|
|
|
} |
132
|
|
|
} |
133
|
|
|
|
134
|
|
|
/** |
135
|
|
|
* @param string $key |
136
|
|
|
* @param DateTimeInterface $datetime |
137
|
|
|
* @return void |
138
|
|
|
*/ |
139
|
|
|
public function setLastTryDate(string $key, DateTimeInterface $datetime): void { |
140
|
|
|
$this->setTryDate->execute(['key' => $key, 'dt' => $datetime->format(self::SQLITE_DATETIME_FORMAT)]); |
141
|
|
|
} |
142
|
|
|
|
143
|
|
|
/** |
144
|
|
|
* @param string $key |
145
|
|
|
* @param DateTimeInterface $datetime |
146
|
|
|
* @return void |
147
|
|
|
*/ |
148
|
|
|
public function setLastRunDate(string $key, DateTimeInterface $datetime): void { |
149
|
|
|
$this->setLastRun->execute(['key' => $key, 'dt' => $datetime->format(self::SQLITE_DATETIME_FORMAT)]); |
150
|
|
|
} |
151
|
|
|
|
152
|
|
|
/** |
153
|
|
|
* @param string $key |
154
|
|
|
* @param DateTimeInterface $datetime |
155
|
|
|
* @return void |
156
|
|
|
*/ |
157
|
|
|
public function setNextRunDate(string $key, DateTimeInterface $datetime): void { |
158
|
|
|
$this->setNextRun->execute(['key' => $key, 'dt' => $datetime->format(self::SQLITE_DATETIME_FORMAT)]); |
159
|
|
|
} |
160
|
|
|
} |