Completed
Push — master ( 23286c...3055fa )
by Ron
02:51
created

MySQLAttributeRepository::markTry()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 1
1
<?php
2
namespace Kir\Services\Cmd\Dispatcher\AttributeRepositories;
3
4
use Generator;
5
use PDO;
6
use PDOStatement;
7
use Kir\Services\Cmd\Dispatcher\AttributeRepository;
8
use Kir\Services\Cmd\Dispatcher\Dispatchers\DefaultDispatcher\Service;
9
use RuntimeException;
10
use Throwable;
11
12
/**
 * AttributeRepository implementation that keeps per-service scheduling state
 * (last try, last run, timeout) in a MySQL table accessed through PDO.
 *
 * All statements are prepared once in the constructor and reused afterwards.
 */
class MySQLAttributeRepository implements AttributeRepository {
	/** @var PDOStatement Selects (and row-locks) the keys of all services that are due */
	private $selectServices;
	/** @var PDOStatement Counts the rows matching a given service key */
	private $hasService;
	/** @var PDOStatement Inserts a new service row */
	private $insertService;
	/** @var PDOStatement Updates the timeout of an existing service row */
	private $updateService;
	/** @var PDOStatement Sets service_last_try to NOW() for a service key */
	private $updateTryDate;
	/** @var PDOStatement Sets service_last_run to NOW() for a service key */
	private $updateRunDate;
	/** @var array Normalized keys already passed to store() on this instance */
	private $services = [];
	/** @var PDO */
	private $pdo;

	/**
	 * Switches the connection to exception error mode, creates the table if it
	 * does not exist yet and prepares every statement used by this repository.
	 *
	 * The table name is interpolated into the SQL text (identifiers cannot be
	 * bound as parameters), so it must come from trusted configuration.
	 *
	 * @param PDO $pdo
	 * @param string $tableName
	 */
	public function __construct(PDO $pdo, $tableName = 'services') {
		$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
		$pdo->exec("CREATE TABLE IF NOT EXISTS `{$tableName}` (`service_key` VARCHAR(255) NOT NULL DEFAULT '', `service_last_try` DATETIME NULL DEFAULT '2000-01-01 00:00:00', `service_last_run` DATETIME NULL DEFAULT '2000-01-01 00:00:00', `service_timeout` INT UNSIGNED NULL DEFAULT '0', PRIMARY KEY (`service_key`));");

		// The locking clause has to be the last clause of the SELECT; "SELECT FOR UPDATE col ..." is a syntax error.
		$this->selectServices = $pdo->prepare("SELECT service_key FROM `{$tableName}` WHERE DATE_ADD(service_last_run, INTERVAL service_timeout SECOND) <= NOW() ORDER BY GREATEST(service_last_try, service_last_run) ASC FOR UPDATE;");
		$this->hasService = $pdo->prepare("SELECT COUNT(*) FROM `{$tableName}` WHERE service_key=:key;");
		$this->insertService = $pdo->prepare("INSERT INTO `{$tableName}` (service_key, service_last_try, service_last_run, service_timeout) VALUES (:key, :try, :run, :timeout);");
		$this->updateService = $pdo->prepare("UPDATE `{$tableName}` SET service_timeout=:timeout WHERE service_key=:key;");
		$this->updateTryDate = $pdo->prepare("UPDATE `{$tableName}` SET service_last_try=NOW() WHERE service_key=:key;");
		$this->updateRunDate = $pdo->prepare("UPDATE `{$tableName}` SET service_last_run=NOW() WHERE service_key=:key;");
		$this->pdo = $pdo;
	}

	/**
	 * Tells whether a row with the given service key exists in the table.
	 *
	 * The key is used as given; it is not normalized here.
	 *
	 * @param string $key
	 * @return bool
	 */
	public function has($key) {
		$this->hasService->execute(['key' => $key]);
		try {
			$count = $this->hasService->fetchColumn(0);
		} finally {
			// Release the result set so the connection is free for the next statement.
			$this->hasService->closeCursor();
		}
		return $count > 0;
	}

	/**
	 * Registers a service: updates the timeout of an existing row or inserts a
	 * new row whose try/run dates start at '2000-01-01 00:00:00'.
	 *
	 * The key is lower-cased and trimmed before use. Each normalized key may be
	 * stored only once per repository instance.
	 *
	 * @param string $key
	 * @param int $timeout
	 * @param array $data Not used by this implementation
	 * @return $this
	 * @throws RuntimeException If the same key was already stored on this instance
	 */
	public function store($key, $timeout, array $data = []) {
		$key = trim(strtolower($key));
		// Strict comparison: a loose in_array() treats numeric-looking keys such as "1e3" and "1000" as equal.
		if(in_array($key, $this->services, true)) {
			throw new RuntimeException("Duplicate service: {$key}");
		}
		$this->services[] = $key;

		if($this->has($key)) {
			$this->updateService->execute(['key' => $key, 'timeout' => $timeout]);
		} else {
			$this->insertService->execute(['key' => $key, 'timeout' => $timeout, 'try' => '2000-01-01 00:00:00', 'run' => '2000-01-01 00:00:00']);
		}

		return $this;
	}

	/**
	 * Runs the callback for every due service inside a single transaction.
	 *
	 * For each service the try date is updated, the callback is invoked with the
	 * Service object and afterwards the run date is updated. The transaction is
	 * committed after the last service; on any Throwable it is rolled back and
	 * the Throwable is rethrown.
	 *
	 * @param callable $fn Receives the Service object as its only argument
	 * @return int Number of services the callback completed for
	 * @throws Throwable
	 */
	public function lockAndIterateServices($fn) {
		$count = 0;
		$this->pdo->exec('START TRANSACTION');
		try {
			// The service key travels as the generator key. Service is declared outside this
			// file, so no assumption is made about it supporting array access or accessors.
			foreach($this->fetchServices() as $key => $service) {
				$this->updateTryDate->execute(['key' => $key]);
				$fn($service);
				$this->updateRunDate->execute(['key' => $key]);
				$count++;
			}
			$this->pdo->exec('COMMIT');
			return $count;
		} catch(Throwable $e) {
			$this->pdo->exec('ROLLBACK');
			throw $e;
		}
	}

	/**
	 * Lazily yields one Service per due row, keyed by its service key.
	 *
	 * The statement is executed when iteration starts. All rows are fetched up
	 * front, so the caller may run other statements while iterating.
	 *
	 * @return Service[]|Generator
	 */
	private function fetchServices() {
		$this->selectServices->execute();
		try {
			$rows = $this->selectServices->fetchAll(PDO::FETCH_ASSOC);
			foreach($rows as $row) {
				yield $row['service_key'] => new Service($row['service_key']);
			}
		} finally {
			$this->selectServices->closeCursor();
		}
	}
}
119