1
|
|
|
<?php |
2
|
|
|
namespace Kir\Services\Cmd\Dispatcher\AttributeRepositories; |
3
|
|
|
|
4
|
|
|
use Generator; |
5
|
|
|
use PDO; |
6
|
|
|
use PDOStatement; |
7
|
|
|
use Kir\Services\Cmd\Dispatcher\AttributeRepository; |
8
|
|
|
use Kir\Services\Cmd\Dispatcher\Dispatchers\DefaultDispatcher\Service; |
9
|
|
|
use RuntimeException; |
10
|
|
|
use Throwable; |
11
|
|
|
|
12
|
|
|
class SqliteAttributeRepository implements AttributeRepository { |
13
|
|
|
/** @var PDO */ |
14
|
|
|
private $pdo; |
15
|
|
|
/** @var PDOStatement */ |
16
|
|
|
private $selectServices; |
17
|
|
|
/** @var PDOStatement */ |
18
|
|
|
private $hasService; |
19
|
|
|
/** @var PDOStatement */ |
20
|
|
|
private $insertService; |
21
|
|
|
/** @var PDOStatement */ |
22
|
|
|
private $updateService; |
23
|
|
|
/** @var PDOStatement */ |
24
|
|
|
private $updateTryDate; |
25
|
|
|
/** @var PDOStatement */ |
26
|
|
|
private $updateRunDate; |
27
|
|
|
/** @var array */ |
28
|
|
|
private $services = []; |
29
|
|
|
|
30
|
|
|
/** |
31
|
|
|
* @param PDO $pdo |
32
|
|
|
*/ |
33
|
|
|
public function __construct(PDO $pdo) { |
34
|
|
|
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); |
35
|
|
|
$pdo->exec('CREATE TABLE IF NOT EXISTS services (service_key STRING PRIMARY KEY, service_last_try DATETIME, service_last_run DATETIME, service_timeout INTEGER);'); |
36
|
|
|
|
37
|
|
|
$this->selectServices = $pdo->prepare('SELECT service_key FROM services WHERE datetime(datetime(\'now\'), \'-\'||service_timeout||\' seconds\') > service_last_run ORDER BY MAX(service_last_try, service_last_run) ASC;'); |
38
|
|
|
$this->hasService = $pdo->prepare('SELECT COUNT(*) FROM services WHERE service_key=:key;'); |
39
|
|
|
$this->insertService = $pdo->prepare('INSERT INTO services (service_key, service_last_try, service_last_run, service_timeout) VALUES (:key, :try, :run, :timeout);'); |
40
|
|
|
$this->updateService = $pdo->prepare('UPDATE services SET service_timeout=:timeout WHERE service_key=:key;'); |
41
|
|
|
$this->updateTryDate = $pdo->prepare('UPDATE services SET service_last_try=datetime(\'now\') WHERE service_key=:key;'); |
42
|
|
|
$this->updateRunDate = $pdo->prepare('UPDATE services SET service_last_run=datetime(\'now\') WHERE service_key=:key;'); |
43
|
|
|
$this->pdo = $pdo; |
44
|
|
|
} |
45
|
|
|
|
46
|
|
|
/** |
47
|
|
|
* @param string $key |
48
|
|
|
* @return bool |
49
|
|
|
*/ |
50
|
|
|
public function has($key) { |
51
|
|
|
try { |
52
|
|
|
$this->hasService->execute(['key' => $key]); |
53
|
|
|
$count = $this->hasService->fetchColumn(0); |
54
|
|
|
return $count > 0; |
55
|
|
|
} finally { |
56
|
|
|
$this->hasService->closeCursor(); |
57
|
|
|
} |
58
|
|
|
} |
59
|
|
|
|
60
|
|
|
/** |
61
|
|
|
* @param string $key |
62
|
|
|
* @param int $timeout |
63
|
|
|
* @param array $data |
64
|
|
|
* @return $this |
65
|
|
|
*/ |
66
|
|
View Code Duplication |
public function store($key, $timeout, array $data = []) { |
|
|
|
|
67
|
|
|
$key = trim(strtolower($key)); |
68
|
|
|
if(!in_array($key, $this->services)) { |
69
|
|
|
$this->services[] = $key; |
70
|
|
|
} else { |
71
|
|
|
throw new RuntimeException("Duplicate service: {$key}"); |
72
|
|
|
} |
73
|
|
|
|
74
|
|
|
if($this->has($key)) { |
75
|
|
|
$this->updateService->execute(['key' => $key, 'timeout' => $timeout]); |
76
|
|
|
} else { |
77
|
|
|
$this->insertService->execute(['key' => $key, 'timeout' => $timeout, 'try' => '2000-01-01 00:00:00', 'run' => '2000-01-01 00:00:00']); |
78
|
|
|
} |
79
|
|
|
|
80
|
|
|
return $this; |
81
|
|
|
} |
82
|
|
|
|
83
|
|
|
/** |
84
|
|
|
* @param callable $fn |
85
|
|
|
* @return int |
86
|
|
|
* @throws Throwable |
87
|
|
|
*/ |
88
|
|
|
public function lockAndIterateServices($fn) { |
89
|
|
|
$count = 0; |
90
|
|
|
$this->pdo->exec('BEGIN EXCLUSIVE TRANSACTION'); |
91
|
|
|
try { |
92
|
|
|
$services = $this->fetchServices(); |
93
|
|
|
foreach($services as $service) { |
94
|
|
|
$this->updateTryDate->execute(['key' => $service->getKey()]); |
95
|
|
|
$fn($service); |
96
|
|
|
$this->updateRunDate->execute(['key' => $service->getKey()]); |
97
|
|
|
$count++; |
98
|
|
|
} |
99
|
|
|
$this->pdo->exec('COMMIT'); |
100
|
|
|
return $count; |
101
|
|
|
} catch(Throwable $e) { |
102
|
|
|
$this->pdo->exec('ROLLBACK'); |
103
|
|
|
throw $e; |
104
|
|
|
} |
105
|
|
|
} |
106
|
|
|
|
107
|
|
|
/** |
108
|
|
|
* @return Service[]|Generator |
109
|
|
|
*/ |
110
|
|
View Code Duplication |
public function fetchServices() { |
|
|
|
|
111
|
|
|
$this->selectServices->execute(); |
112
|
|
|
try { |
113
|
|
|
$services = $this->selectServices->fetchAll(PDO::FETCH_ASSOC); |
114
|
|
|
foreach($services as $service) { |
115
|
|
|
yield new Service($service['service_key']); |
116
|
|
|
} |
117
|
|
|
} finally { |
118
|
|
|
$this->selectServices->closeCursor(); |
119
|
|
|
} |
120
|
|
|
} |
121
|
|
|
} |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.