Test Failed
Push — master ( be437e...866f6c )
by Julien
07:25
created

Replication   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 132
Duplicated Lines 0 %

Test Coverage

Coverage 48.78%

Importance

Changes 0
Metric Value
eloc 33
c 0
b 0
f 0
dl 0
loc 132
ccs 20
cts 41
cp 0.4878
rs 10
wmc 12

8 Methods

Rating   Name   Duplication   Size   Complexity  
A isReplicationReady() 0 9 3
A addReadWriteConnectionBehavior() 0 14 1
A setReplicationLag() 0 3 1
A getReplicationLag() 0 3 1
A initializeReplication() 0 14 2
A setReplicationReadyAt() 0 3 1
A selectReadConnection() 0 13 2
A getReplicationReadyAt() 0 3 1
1
<?php
2
3
/**
4
 * This file is part of the Zemit Framework.
5
 *
6
 * (c) Zemit Team <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE.txt
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Zemit\Mvc\Model\Traits;
13
14
use Phalcon\Config\ConfigInterface;
15
use Phalcon\Db\Adapter\AdapterInterface;
16
use Phalcon\Events\ManagerInterface;
17
use Zemit\Mvc\Model\Traits\Abstracts\AbstractEventsManager;
18
use Zemit\Mvc\Model\Traits\Abstracts\AbstractInjectable;
19
20
/**
21
 * Replica Lag Workaround
22
 * Prevents Phalcon to use read connection while
23
 * it might be lagging behind the master db
24
 */
25
trait Replication
26
{
27
    abstract public function setConnectionService(string $connectionService): void;
28
    abstract public function setReadConnectionService(string $connectionService): void;
29
    abstract public function setWriteConnectionService(string $connectionService): void;
30
    abstract public function getWriteConnectionService(): string;
31
    abstract public function getReadConnectionService(): string;
32
    
33
    use AbstractEventsManager;
34
    use AbstractInjectable;
35
    use Options;
36
    
37
    /**
38
     * Replica Lag in milliseconds
39
     */
40
    protected static ?int $replicationLag = null;
41
    
42
    /**
43
     * Timestamp until we can use replication
44
     */
45
    protected static ?int $replicationReadyAt = null;
46
    
47
    /**
48
     * Get the replication lag value in milliseconds
49
     */
50
    public static function getReplicationLag(): ?int
51
    {
52
        return self::$replicationLag;
53
    }
54
    
55
    /**
56
     * Set the replication lag value in milliseconds
57
     */
58
    public static function setReplicationLag(?int $replicationLag = null): void
59
    {
60
        self::$replicationLag = $replicationLag;
61
    }
62
    
63
    /**
64
     * Get the replication lag value in milliseconds
65
     */
66 2
    public static function getReplicationReadyAt(): ?int
67
    {
68 2
        return self::$replicationReadyAt;
69
    }
70
    
71
    /**
72
     * Set the replication lag value in milliseconds
73
     */
74 2
    public static function setReplicationReadyAt(?int $replicationReadyAt = null): void
75
    {
76 2
        self::$replicationReadyAt = $replicationReadyAt;
77
    }
78
    
79
    /**
80
     * Replication Trait Initialization
81
     * - Set Read & Write Connection Service
82
     * - Add Replication Behavior
83
     */
84 53
    public function initializeReplication(?array $options = null): void
85
    {
86 53
        $options ??= $this->getOptionsManager()->get('replication') ?? [];
87
        
88 53
        $config = $this->getDI()->get('config');
89 53
        assert($config instanceof ConfigInterface);
90
        
91 53
        $enabled = $config->path('database.drivers.mysql.readonly.enable', false);
92 53
        if ($enabled) {
93
            self::setReplicationLag($options['lag'] ?? 1000);
94
            $this->setConnectionService($options['connectionService'] ?? 'db');
95
            $this->setReadConnectionService($options['readConnectionService'] ?? 'dbr');
96
            $this->setWriteConnectionService($options['writeConnectionService'] ?? 'db');
97
            $this->addReadWriteConnectionBehavior();
98
        }
99
    }
100
    
101
    /**
102
     * Dynamically selects a shard
103
     * - Prefer to read on the write master during the replica delay
104
     * 
105
     * Possible parameters which can be added if required
106
     * ?array $intermediate = null, array $bindParams = [], array $bindTypes = []
107
     * 
108
     * @return AdapterInterface
109
     */
110 2
    public function selectReadConnection(): AdapterInterface
111
    {
112 2
        $di = $this->getDI();
113
        
114
        // Check if the replication is ready
115 2
        if ($this->isReplicationReady()) {
116
            
117
            // Use the read connection service
118 2
            $di->get($this->getReadConnectionService());
119
        }
120
        
121
        // Use write connection service
122 2
        return $this->getDI()->get($this->getWriteConnectionService());
123
    }
124
    
125
    /**
126
     * Force write connection service to master if the model was previously saved
127
     */
128
    public function addReadWriteConnectionBehavior(): void
129
    {
130
        $forceMasterConnectionService = function () {
131
            self::setReplicationReadyAt((int)round(microtime(true) * 1000) + self::getReplicationLag());
132
        };
133
        
134
        // @todo change to behavior or check if this is added multiple times
135
        $eventsManager = $this->getEventsManager();
136
        assert($eventsManager instanceof ManagerInterface);
137
        $eventsManager->attach('model:afterSave', $forceMasterConnectionService);
138
        $eventsManager->attach('model:afterCreate', $forceMasterConnectionService);
139
        $eventsManager->attach('model:afterUpdate', $forceMasterConnectionService);
140
        $eventsManager->attach('model:afterDelete', $forceMasterConnectionService);
141
        $eventsManager->attach('model:afterRestore', $forceMasterConnectionService);
142
    }
143
    
144
    /**
145
     * Check whether the replica should be ready or not
146
     * @return bool true if replica should be ready
147
     */
148 2
    public function isReplicationReady(): bool
149
    {
150 2
        $replicationReadyAt = self::getReplicationReadyAt();
151 2
        if (empty($replicationReadyAt) || $replicationReadyAt < microtime(true) * 1000) {
152 2
            self::setReplicationReadyAt(null);
153 2
            return true;
154
        }
155
        
156
        return false;
157
    }
158
}
159