Test Failed
Push — master ( ce60e5...378563 )
by Julien
12:41 queued 07:49
created

Replication::setReplicationLag()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
eloc 1
c 2
b 0
f 0
nc 1
nop 1
dl 0
loc 3
ccs 0
cts 2
cp 0
crap 2
rs 10
1
<?php
2
3
/**
4
 * This file is part of the Zemit Framework.
5
 *
6
 * (c) Zemit Team <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE.txt
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Zemit\Mvc\Model;
13
14
use Phalcon\Config\ConfigInterface;
15
use Phalcon\Db\Adapter\AdapterInterface;
16
use Zemit\Mvc\Model\AbstractTrait\AbstractEventsManager;
17
use Zemit\Mvc\Model\AbstractTrait\AbstractInjectable;
18
19
/**
20
 * Replica Lag Workaround
21
 * Prevents Phalcon to use read connection while
22
 * it might be lagging behind the master db
23
 */
24
trait Replication
25
{
26
    abstract public function setConnectionService(string $connectionService): void;
27
    abstract public function setReadConnectionService(string $connectionService): void;
28
    abstract public function setWriteConnectionService(string $connectionService): void;
29
    abstract public function getWriteConnectionService(): string;
30
    abstract public function getReadConnectionService(): string;
31
    
32
    use AbstractEventsManager;
33
    use AbstractInjectable;
34
    use Options;
35
    
36
    /**
37
     * Replica Lag in milliseconds
38
     */
39
    protected static ?int $replicationLag = null;
40
    
41
    /**
42
     * Timestamp until we can use replication
43
     */
44
    protected static ?int $replicationReadyAt = null;
45
    
46
    /**
47
     * Get the replication lag value in milliseconds
48
     */
49
    public static function getReplicationLag(): ?int
50
    {
51
        return self::$replicationLag;
52
    }
53
    
54
    /**
55
     * Set the replication lag value in milliseconds
56
     */
57
    public static function setReplicationLag(?int $replicationLag = null): void
58
    {
59
        self::$replicationLag = $replicationLag;
60
    }
61
    
62
    /**
63
     * Get the replication lag value in milliseconds
64
     */
65
    public static function getReplicationReadyAt(): ?int
66
    {
67
        return self::$replicationReadyAt;
68
    }
69
    
70
    /**
71
     * Set the replication lag value in milliseconds
72
     */
73
    public static function setReplicationReadyAt(?int $replicationReadyAt = null): void
74
    {
75
        self::$replicationReadyAt = $replicationReadyAt;
76
    }
77
    
78
    /**
79
     * Replication Trait Initialization
80
     * - Set Read & Write Connection Service
81
     * - Add Replication Behavior
82
     */
83 2
    public function initializeReplication(?array $options = null): void
84
    {
85 2
        $options ??= $this->getOptionsManager()->get('replication') ?? [];
86
        
87 2
        $config = $this->getDI()->get('config');
88 2
        assert($config instanceof ConfigInterface);
89
        
90 2
        $enabled = $config->path('database.drivers.mysql.readonly.enable', false);
91 2
        if ($enabled) {
92
            self::setReplicationLag($options['lag'] ?? 1000);
93
            $this->setConnectionService($options['connectionService'] ?? 'db');
94
            $this->setReadConnectionService($options['readConnectionService'] ?? 'dbr');
95
            $this->setWriteConnectionService($options['writeConnectionService'] ?? 'db');
96
            $this->addReadWriteConnectionBehavior();
97
        }
98
    }
99
    
100
    /**
101
     * Dynamically selects a shard
102
     * - Prefer to read on the write master during the replica delay
103
     * 
104
     * Possible parameters which can be added if required
105
     * ?array $intermediate = null, array $bindParams = [], array $bindTypes = []
106
     * 
107
     * @return AdapterInterface
108
     */
109
    public function selectReadConnection(): AdapterInterface
110
    {
111
        $di = $this->getDI();
112
        
113
        // Check if the replication is ready
114
        if ($this->isReplicationReady()) {
115
            
116
            // Use the read connection service
117
            $di->get($this->getReadConnectionService());
118
        }
119
        
120
        // Use write connection service
121
        return $this->getDI()->get($this->getWriteConnectionService());
122
    }
123
    
124
    /**
125
     * Force write connection service to master if the model was previously saved
126
     */
127
    public function addReadWriteConnectionBehavior(): void
128
    {
129
        $forceMasterConnectionService = function () {
130
            self::setReplicationReadyAt((int)round(microtime(true) * 1000) + self::getReplicationLag());
131
        };
132
        
133
        // @todo change to behavior or check if this is added multiple times
134
        $eventsManager = $this->getEventsManager();
135
        $eventsManager->attach('model:afterSave', $forceMasterConnectionService);
136
        $eventsManager->attach('model:afterCreate', $forceMasterConnectionService);
137
        $eventsManager->attach('model:afterUpdate', $forceMasterConnectionService);
138
        $eventsManager->attach('model:afterDelete', $forceMasterConnectionService);
139
        $eventsManager->attach('model:afterRestore', $forceMasterConnectionService);
140
    }
141
    
142
    /**
143
     * Check whether the replica should be ready or not
144
     * @return bool true if replica should be ready
145
     */
146
    public function isReplicationReady(): bool
147
    {
148
        $replicationReadyAt = self::getReplicationReadyAt();
149
        if (empty($replicationReadyAt) || $replicationReadyAt < microtime(true) * 1000) {
150
            self::setReplicationReadyAt(null);
151
            return true;
152
        }
153
        
154
        return false;
155
    }
156
}
157