Completed
Push — master ( b0e1f8...ce2c58 )
by Dan
01:40
created

Yabot::startConnectionMonitor()   B

Complexity

Conditions 3
Paths 2

Size

Total Lines 27
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 27
rs 8.8571
c 0
b 0
f 0
cc 3
eloc 14
nc 2
nop 0
1
<?php
2
3
namespace Nopolabs\Yabot;
4
5
use DateTime;
6
use Exception;
7
use Nopolabs\Yabot\Helpers\ConfigTrait;
8
use Nopolabs\Yabot\Helpers\LogTrait;
9
use Nopolabs\Yabot\Helpers\LoopTrait;
10
use Nopolabs\Yabot\Helpers\SlackTrait;
11
use Nopolabs\Yabot\Message\MessageFactory;
12
use Nopolabs\Yabot\Plugin\PluginInterface;
13
use Nopolabs\Yabot\Plugin\PluginManager;
14
use Nopolabs\Yabot\Slack\Client;
15
use Psr\Log\LoggerInterface;
16
use React\EventLoop\LoopInterface;
17
use React\EventLoop\Timer\TimerInterface;
18
use Slack\Payload;
19
use Slack\User;
20
use Throwable;
21
22
/**
 * Bot runtime: loads plugins, connects the Slack client, turns incoming
 * 'message' events into message objects and hands them to the plugin manager.
 *
 * Also schedules hourly memory-usage logging and, when configured, a periodic
 * ping/pong check that triggers a reconnect if a ping goes unanswered.
 */
class Yabot
{
    use LogTrait;
    use LoopTrait;
    use SlackTrait;
    use ConfigTrait;

    /** @var MessageFactory */
    private $messageFactory;

    /** @var PluginManager */
    private $pluginManager;

    /** @var string|null File that raw message data is appended to; null disables message logging. */
    private $messageLog;

    /** @var TimerInterface|null Handle of the active connection-monitor timer, if any. */
    private $monitor;

    /**
     * @param LoggerInterface $logger
     * @param LoopInterface   $eventLoop
     * @param Client          $slackClient
     * @param MessageFactory  $messageFactory
     * @param PluginManager   $pluginManager
     * @param array           $config         Settings read through get(), e.g. 'connection_monitor.interval'.
     */
    public function __construct(
        LoggerInterface $logger,
        LoopInterface $eventLoop,
        Client $slackClient,
        MessageFactory $messageFactory,
        PluginManager $pluginManager,
        array $config = []
    ) {
        $this->setLog($logger);
        $this->setLoop($eventLoop);
        $this->setSlack($slackClient);
        $this->setConfig($config);
        $this->messageFactory = $messageFactory;
        $this->pluginManager = $pluginManager;
        $this->messageLog = null;
    }

    /**
     * @return string|null The message log file, or null when message logging is off.
     */
    public function getMessageLog()
    {
        return $this->messageLog;
    }

    /**
     * Sets the file that raw message data is appended to.
     *
     * @param string|null $messageLog Pass null (or omit) to turn message logging off.
     */
    public function setMessageLog(string $messageLog = null)
    {
        $this->messageLog = $messageLog;
    }

    /**
     * Loads each plugin into the plugin manager.
     *
     * A plugin that fails to load is logged as a warning and skipped so the
     * remaining plugins still load.
     *
     * @param array $plugins Plugins keyed by plugin id.
     */
    public function init(array $plugins)
    {
        foreach ($plugins as $pluginId => $plugin) {
            /** @var PluginInterface $plugin */

            $this->info("loading $pluginId");

            try {
                $this->pluginManager->loadPlugin($pluginId, $plugin);
            } catch (Throwable $e) {
                // Throwable rather than Exception: an Error (e.g. a TypeError)
                // in one plugin should not abort start-up either. This matches
                // the handling in onMessage().
                $this->warning("Unhandled Exception while loading $pluginId: ".$e->getMessage());
                $this->warning($e->getTraceAsString());
            }
        }
    }

    /**
     * Initialises and connects the Slack client, schedules memory reporting
     * and runs the event loop.
     */
    public function run()
    {
        $slack = $this->getSlack();

        $slack->init();

        $slack->connect()->then([$this, 'connected']);

        $this->addMemoryReporting();

        $this->getLoop()->run();
    }

    /**
     * Disconnects the Slack client and stops the event loop.
     */
    public function shutDown()
    {
        $this->getSlack()->disconnect();
        $this->getLoop()->stop();
    }

    /**
     * Callback for a successful connect: passes the authenticated user to the
     * plugin manager, subscribes to 'message' events and starts the
     * connection monitor.
     */
    public function connected()
    {
        $slack = $this->getSlack();

        $slack->update(function (User $authedUser) {
            $this->pluginManager->setAuthedUser($authedUser);
        });

        $slack->onEvent('message', [$this, 'onMessage']);

        $this->monitor = $this->startConnectionMonitor();
    }

    /**
     * Handles one 'message' event.
     *
     * The payload data is logged, converted into a message object and
     * dispatched to the plugins. Payloads that cannot be logged or converted
     * are reported as a warning and dropped; messages for which isSelf() is
     * true are ignored.
     *
     * @param Payload $payload
     */
    public function onMessage(Payload $payload)
    {
        $data = $payload->getData();

        $this->debug('Received message', $data);

        try {
            $this->logMessage($data);
            $message = $this->messageFactory->create($data);
        } catch (Throwable $throwable) {
            $errmsg = $throwable->getMessage()."\n"
                .$throwable->getTraceAsString()."\n"
                ."Payload data: ".json_encode($data);
            $this->warning($errmsg);
            return;
        }

        if ($message->isSelf()) {
            return;
        }

        $this->pluginManager->dispatchMessage($message);
    }

    /**
     * @return string Help lines from the plugin manager, one per line.
     */
    public function getHelp() : string
    {
        return implode("\n", $this->pluginManager->getHelp());
    }

    /**
     * @return string Current memory usage followed by the plugin statuses, one per line.
     */
    public function getStatus() : string
    {
        $statuses = $this->pluginManager->getStatuses();

        array_unshift($statuses, $this->getFormattedMemoryUsage());

        return implode("\n", $statuses);
    }

    /**
     * Logs memory usage at the top of the next hour and every 3600 seconds
     * after that.
     */
    protected function addMemoryReporting()
    {
        $now = new DateTime();

        // Derive the target from the same instant so the delay is not skewed
        // by a second clock read.
        $then = clone $now;
        $then->modify('+1 hour');
        // setTime() takes integers; format() returns a string.
        $then->setTime((int) $then->format('H'), 0, 0);

        $delay = $then->getTimestamp() - $now->getTimestamp();

        $report = function () {
            $this->info($this->getFormattedMemoryUsage());
        };

        $this->addTimer($delay, function () use ($report) {
            $report();
            $this->addPeriodicTimer(3600, $report);
        });
    }

    /**
     * @return string Memory usage in kibibytes, e.g. "Current memory usage: 1,234.567K".
     */
    protected function getFormattedMemoryUsage() : string
    {
        $memory = memory_get_usage() / 1024;
        $formatted = number_format($memory, 3).'K';

        return "Current memory usage: {$formatted}";
    }

    /**
     * Appends the JSON-encoded message data as one line to the message log,
     * if a message log file has been set.
     *
     * @param mixed $data
     */
    protected function logMessage($data)
    {
        if ($this->messageLog !== null) {
            file_put_contents($this->messageLog, json_encode($data) . "\n", FILE_APPEND);
        }
    }

    /**
     * Cancels the current connection monitor and asks the Slack client to
     * reconnect. On success a new monitor is started; on failure the bot
     * shuts down.
     */
    protected function reconnect()
    {
        if ($this->monitor) {
            $this->getLoop()->cancelTimer($this->monitor);
            // Drop the stale handle; it is replaced once the reconnect succeeds.
            $this->monitor = null;
        }

        $this->getSlack()->reconnect()->then(
            function () {
                $this->getLog()->info('Reconnected');
                $this->monitor = $this->startConnectionMonitor();
            },
            function () {
                $this->getLog()->error('Reconnect failed, shutting down.');
                $this->shutDown();
            }
        );
    }

    /**
     * Starts a periodic ping of the Slack connection when
     * 'connection_monitor.interval' is configured with a non-empty value.
     *
     * Each tick checks whether the previous ping was answered. If it was not,
     * a reconnect is started and no further ping is sent from this monitor.
     *
     * @return TimerInterface|null The monitor timer, or null when monitoring is not configured.
     */
    protected function startConnectionMonitor()
    {
        $interval = $this->get('connection_monitor.interval');

        if (!$interval) {
            return null;
        }

        $this->getLog()->info("Monitoring websocket connection every $interval seconds.");

        // Shared by reference between the tick callback and the pong handler.
        // Starts true so the first tick only sends a ping.
        $pong = true;

        return $this->getLoop()->addPeriodicTimer($interval, function () use (&$pong) {
            if (!$pong) {
                $this->getLog()->error('No pong: reconnecting...');
                $this->reconnect();

                // reconnect() cancels this timer; do not ping the old connection.
                return;
            }

            // Clear the flag before pinging so a reply that resolves
            // immediately is not overwritten afterwards.
            $pong = false;

            $this->getSlack()->ping()->then(
                function (Payload $payload) use (&$pong) {
                    $this->getLog()->info($payload->toJson());
                    $pong = true;
                }
            );
        });
    }
}
231