Completed
Push — master ( 84470e...c291b9 )
by Ron
03:30
created

SqliteAttributeRepository::fetchServices()   A

Complexity

Conditions 2
Paths 4

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 11
rs 9.9
c 0
b 0
f 0
cc 2
nc 4
nop 1
1
<?php
2
namespace Kir\Services\Cmd\Dispatcher\AttributeRepositories;
3
4
use DateTimeInterface;
5
use Generator;
6
use PDO;
7
use PDOStatement;
8
use Kir\Services\Cmd\Dispatcher\AttributeRepository;
9
use Kir\Services\Cmd\Dispatcher\Dispatchers\DefaultDispatcher\Service;
10
use RuntimeException;
11
12
/**
 * Attribute repository that persists per-service scheduling timestamps
 * (last try, last run, next run) in a SQLite `services` table through PDO.
 *
 * All statements are prepared once in the constructor and re-used for every call.
 */
class SqliteAttributeRepository implements AttributeRepository {
	/** Format used for every timestamp that is written to, or compared inside, SQLite. */
	private const SQLITE_DATETIME_FORMAT = 'Y-m-d\\TH:i:s';

	/** @var PDO */
	private $pdo;
	/** @var PDOStatement Inserts a bare row for a key unless one already exists */
	private $registerRow;
	/** @var PDOStatement Selects the services whose next run is unset or not after a given instant */
	private $selectServices;
	/** @var PDOStatement Counts the rows stored for a key */
	private $hasService;
	/** @var PDOStatement Loads the complete row of a key */
	private $getData;
	/** @var PDOStatement Prepared but not used by any method of this class */
	private $insertService;
	/** @var PDOStatement Upserts service_next_run */
	private $setNextRun;
	/** @var PDOStatement Upserts service_last_try */
	private $setTryDate;
	/** @var PDOStatement Upserts service_last_run */
	private $setLastRun;

	/**
	 * Switches the connection to exception error mode, creates the schema if the
	 * database is older than schema version 1 and prepares all statements.
	 *
	 * @param PDO $pdo Connection to the SQLite database; its error mode is changed to ERRMODE_EXCEPTION
	 */
	public function __construct(PDO $pdo) {
		$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
		$this->pdo = $pdo;

		// https://stackoverflow.com/a/8442173
		// NOTE(review): SQLite assigns NUMERIC affinity to a column declared as STRING, so
		// numeric-looking keys may be stored as numbers. Changing the type needs a new migration.
		$this->migrate(1, 'CREATE TABLE IF NOT EXISTS services (service_key STRING PRIMARY KEY, service_last_try DATETIME, service_last_run DATETIME, service_next_run DATETIME);');

		$this->selectServices = $pdo->prepare("SELECT service_key, service_last_try, service_last_run FROM services WHERE service_next_run IS NULL OR DATETIME(service_next_run) <= DATETIME(:dt) ORDER BY MAX(service_last_try, service_last_run);");
		$this->registerRow = $pdo->prepare('INSERT OR IGNORE INTO services (service_key) VALUES (:key);');
		$this->hasService = $pdo->prepare('SELECT COUNT(*) FROM services WHERE service_key=:key;');
		$this->getData = $pdo->prepare('SELECT service_key, service_last_try, service_last_run, service_next_run FROM services WHERE service_key=:key;');
		$this->setTryDate = $pdo->prepare('INSERT INTO services (service_key, service_last_try) VALUES (:key, :dt) ON CONFLICT(service_key) DO UPDATE SET service_last_try=:dt');
		$this->setLastRun = $pdo->prepare('INSERT INTO services (service_key, service_last_run) VALUES (:key, :dt) ON CONFLICT(service_key) DO UPDATE SET service_last_run=:dt');
		$this->setNextRun = $pdo->prepare('INSERT INTO services (service_key, service_next_run) VALUES (:key, :dt) ON CONFLICT(service_key) DO UPDATE SET service_next_run=:dt');
	}

	/**
	 * Tells whether a row exists for the given service key.
	 *
	 * @param string $key
	 * @return bool
	 */
	public function has(string $key) {
		try {
			$this->hasService->execute(['key' => $key]);
			$count = $this->hasService->fetchColumn(0);
			return $count > 0;
		} finally {
			// Release the cursor so the shared statement can be executed again.
			$this->hasService->closeCursor();
		}
	}

	/**
	 * Makes sure a row exists for the given service key. An already existing row
	 * is left untouched (INSERT OR IGNORE).
	 *
	 * @param string $key
	 * @return void
	 */
	public function register(string $key) {
		$this->registerRow->execute(['key' => $key]);
	}

	/**
	 * Loads the stored row of a service.
	 *
	 * @param string $key
	 * @return object Object exposing service_key, service_last_try, service_last_run and service_next_run
	 * @throws RuntimeException If no row exists for the key
	 */
	public function getRowByKey(string $key) {
		try {
			$this->getData->execute(['key' => $key]);
			$result = $this->getData->fetchObject();
			if(!is_object($result)) {
				// fetchObject() yields a non-object when the result set is empty.
				throw new RuntimeException('Row not found');
			}
			return $result;
		} finally {
			$this->getData->closeCursor();
		}
	}

	/**
	 * Invokes the callback once for every service that is due at the given instant.
	 *
	 * No explicit lock is acquired by this implementation; the services are
	 * simply fetched and handed to the callback one after another.
	 *
	 * @param DateTimeInterface $now Instant the stored next-run dates are compared against
	 * @param callable $fn Receives each due Service as its only argument
	 * @return int Number of services for which the callback completed
	 */
	public function lockAndIterateServices(DateTimeInterface $now, callable $fn): int {
		$count = 0;
		foreach($this->fetchServices($now) as $service) {
			$fn($service);
			// Previously never incremented, so the method always reported 0.
			$count++;
		}
		return $count;
	}

	/**
	 * Yields a Service for every row whose next run is unset or not later than
	 * the given instant, in the order defined by the prepared query.
	 *
	 * @param DateTimeInterface $now
	 * @return Service[]|Generator
	 */
	public function fetchServices(DateTimeInterface $now): Generator {
		$this->selectServices->execute(['dt' => $now->format(self::SQLITE_DATETIME_FORMAT)]);
		try {
			$rows = $this->selectServices->fetchAll(PDO::FETCH_ASSOC);
		} finally {
			// All rows are buffered at this point, so the cursor is released before
			// any consumer code runs between two yields.
			$this->selectServices->closeCursor();
		}
		foreach($rows as $row) {
			yield new Service($row['service_key']);
		}
	}

	/**
	 * Runs a schema statement when the database's `user_version` is lower than
	 * the given version and stores the new version afterwards.
	 *
	 * @param int $version Schema version that the statement establishes
	 * @param string $statement SQL executed to reach that version
	 */
	private function migrate(int $version, string $statement): void {
		$currentVersion = (int) $this->pdo->query('PRAGMA user_version')->fetchColumn(0);
		if($currentVersion < $version) {
			$this->pdo->exec($statement);
			// $version is type-hinted as int, so interpolating it into the PRAGMA is safe.
			$this->pdo->exec("PRAGMA user_version={$version}");
		}
	}

	/**
	 * Stores the date of the last attempt to run a service, creating the row if needed.
	 *
	 * @param string $key
	 * @param DateTimeInterface $datetime
	 * @return void
	 */
	public function setLastTryDate(string $key, DateTimeInterface $datetime): void {
		$this->setTryDate->execute(['key' => $key, 'dt' => $datetime->format(self::SQLITE_DATETIME_FORMAT)]);
	}

	/**
	 * Stores the date of the last run of a service, creating the row if needed.
	 *
	 * @param string $key
	 * @param DateTimeInterface $datetime
	 * @return void
	 */
	public function setLastRunDate(string $key, DateTimeInterface $datetime): void {
		$this->setLastRun->execute(['key' => $key, 'dt' => $datetime->format(self::SQLITE_DATETIME_FORMAT)]);
	}

	/**
	 * Stores the date from which a service is due again, creating the row if needed.
	 *
	 * @param string $key
	 * @param DateTimeInterface $datetime
	 * @return void
	 */
	public function setNextRunDate(string $key, DateTimeInterface $datetime): void {
		$this->setNextRun->execute(['key' => $key, 'dt' => $datetime->format(self::SQLITE_DATETIME_FORMAT)]);
	}
}