1
|
|
|
<?php |
2
|
|
|
namespace Kir\Services\Cmd\Dispatcher\AttributeRepositories; |
3
|
|
|
|
4
|
|
|
use Generator; |
5
|
|
|
use PDO; |
6
|
|
|
use PDOStatement; |
7
|
|
|
use Kir\Services\Cmd\Dispatcher\AttributeRepository; |
8
|
|
|
use Kir\Services\Cmd\Dispatcher\Dispatchers\DefaultDispatcher\Service; |
9
|
|
|
use RuntimeException; |
10
|
|
|
use Throwable; |
11
|
|
|
|
12
|
|
|
/**
 * Attribute repository that keeps service scheduling state in a MySQL table.
 *
 * Every row stores a service key, the time of the last attempt
 * (service_last_try), the time of the last completed run (service_last_run)
 * and a timeout in seconds (service_timeout). A service is due when
 * service_last_run + service_timeout lies in the past.
 */
class MySQLAttributeRepository implements AttributeRepository {
	/** @var PDOStatement Selects (and row-locks) the keys of all due services */
	private $selectServices;
	/** @var PDOStatement Counts rows for a given service key */
	private $hasService;
	/** @var PDOStatement Inserts a new service row */
	private $insertService;
	/** @var PDOStatement Updates the timeout of an existing service row */
	private $updateService;
	/** @var PDOStatement Sets service_last_try to NOW() for a service */
	private $updateTryDate;
	/** @var PDOStatement Sets service_last_run to NOW() for a service */
	private $updateRunDate;
	/** @var string[] Normalized keys already registered through store() on this instance */
	private $services = [];
	/** @var PDO */
	private $pdo;

	/**
	 * Switches the connection to exception error mode, creates the table if it
	 * does not exist yet and prepares all statements used by this repository.
	 *
	 * @param PDO $pdo
	 * @param string $tableName
	 */
	public function __construct(PDO $pdo, $tableName = 'services') {
		$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

		// Identifiers cannot be bound as parameters, so quote the table name once
		// and double any embedded backtick to keep it inside the identifier.
		$table = '`' . str_replace('`', '``', $tableName) . '`';

		$pdo->exec("CREATE TABLE IF NOT EXISTS {$table} (`service_key` VARCHAR(255) NOT NULL DEFAULT '', `service_last_try` DATETIME NULL DEFAULT '2000-01-01 00:00:00', `service_last_run` DATETIME NULL DEFAULT '2000-01-01 00:00:00', `service_timeout` INT UNSIGNED NULL DEFAULT '0', PRIMARY KEY (`service_key`));");

		// The locking clause must be the last part of the SELECT statement;
		// "SELECT FOR UPDATE <columns>" is a syntax error in MySQL.
		$this->selectServices = $pdo->prepare("SELECT service_key FROM {$table} WHERE DATE_ADD(service_last_run, INTERVAL service_timeout SECOND) <= NOW() ORDER BY GREATEST(service_last_try, service_last_run) ASC FOR UPDATE;");
		$this->hasService = $pdo->prepare("SELECT COUNT(*) FROM {$table} WHERE service_key=:key;");
		$this->insertService = $pdo->prepare("INSERT INTO {$table} (service_key, service_last_try, service_last_run, service_timeout) VALUES (:key, :try, :run, :timeout);");
		$this->updateService = $pdo->prepare("UPDATE {$table} SET service_timeout=:timeout WHERE service_key=:key;");
		$this->updateTryDate = $pdo->prepare("UPDATE {$table} SET service_last_try=NOW() WHERE service_key=:key;");
		$this->updateRunDate = $pdo->prepare("UPDATE {$table} SET service_last_run=NOW() WHERE service_key=:key;");
		$this->pdo = $pdo;
	}

	/**
	 * Tells whether a row for the given service key exists in the table.
	 *
	 * @param string $key
	 * @return bool
	 */
	public function has($key) {
		$this->hasService->execute(['key' => $key]);
		$count = $this->hasService->fetchColumn(0);
		// The statement is reused on every call, so release its result set.
		$this->hasService->closeCursor();
		return $count > 0;
	}

	/**
	 * Registers a service: updates the timeout of an existing row or inserts a
	 * new row whose try/run dates are set to '2000-01-01 00:00:00'.
	 *
	 * The key is lower-cased and trimmed before use. $data is accepted but not
	 * persisted by this implementation.
	 *
	 * @param string $key
	 * @param int $timeout Timeout in seconds
	 * @param array $data
	 * @return $this
	 * @throws RuntimeException If the same key was already stored through this instance
	 */
	public function store($key, $timeout, array $data = []) {
		$key = trim(strtolower($key));

		// Strict comparison: numeric-looking keys such as "1e3" and "1000" must
		// not be treated as the same service.
		if(in_array($key, $this->services, true)) {
			throw new RuntimeException("Duplicate service: {$key}");
		}
		$this->services[] = $key;

		if($this->has($key)) {
			$this->updateService->execute(['key' => $key, 'timeout' => $timeout]);
		} else {
			$this->insertService->execute(['key' => $key, 'timeout' => $timeout, 'try' => '2000-01-01 00:00:00', 'run' => '2000-01-01 00:00:00']);
		}

		return $this;
	}

	/**
	 * Runs $fn for every due service inside a single transaction.
	 *
	 * For each service the try date is set before $fn is called and the run
	 * date afterwards. The transaction is committed once all services were
	 * processed; any Throwable triggers a rollback and is rethrown.
	 *
	 * @param callable $fn Receives the Service object of each due service
	 * @return int Number of services processed
	 * @throws Throwable
	 */
	public function lockAndIterateServices($fn) {
		$count = 0;
		$this->pdo->exec('START TRANSACTION');
		try {
			// fetchServices() yields "service key => Service", so the key is
			// available here without reaching into the Service object.
			foreach($this->fetchServices() as $key => $service) {
				$this->updateTryDate->execute(['key' => $key]);
				$fn($service);
				$this->updateRunDate->execute(['key' => $key]);
				$count++;
			}
			$this->pdo->exec('COMMIT');
			return $count;
		} catch(Throwable $e) {
			$this->pdo->exec('ROLLBACK');
			throw $e;
		}
	}

	/**
	 * Yields a Service object for every due service, keyed by its service key.
	 *
	 * This is a generator: the query is executed on first iteration, all rows
	 * are fetched at once, and the cursor is closed when iteration finishes or
	 * is aborted.
	 *
	 * @return Service[]|Generator
	 */
	private function fetchServices() {
		$this->selectServices->execute();
		try {
			$rows = $this->selectServices->fetchAll(PDO::FETCH_ASSOC);
			foreach($rows as $row) {
				yield $row['service_key'] => new Service($row['service_key']);
			}
		} finally {
			$this->selectServices->closeCursor();
		}
	}
}
119
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.