Completed
Push — master ( ce2c58...225145 )
by Dan
01:38
created

Yabot::notify()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 3
nc 2
nop 1
1
<?php
2
3
namespace Nopolabs\Yabot;
4
5
use DateTime;
6
use Exception;
7
use Nopolabs\Yabot\Helpers\ConfigTrait;
8
use Nopolabs\Yabot\Helpers\LogTrait;
9
use Nopolabs\Yabot\Helpers\LoopTrait;
10
use Nopolabs\Yabot\Helpers\SlackTrait;
11
use Nopolabs\Yabot\Message\MessageFactory;
12
use Nopolabs\Yabot\Plugin\PluginInterface;
13
use Nopolabs\Yabot\Plugin\PluginManager;
14
use Nopolabs\Yabot\Slack\Client;
15
use Psr\Log\LoggerInterface;
16
use React\EventLoop\LoopInterface;
17
use React\EventLoop\Timer\TimerInterface;
18
use Slack\Payload;
19
use Slack\User;
20
use Throwable;
21
22
class Yabot
23
{
24
    use LogTrait;
25
    use LoopTrait;
26
    use SlackTrait;
27
    use ConfigTrait;
28
29
    /** @var MessageFactory */
30
    private $messageFactory;
31
32
    /** @var PluginManager */
33
    private $pluginManager;
34
35
    /** @var string */
36
    private $messageLog;
37
38
    /** @var TimerInterface */
39
    private $monitor;
40
41
    /** @var bool */
42
    private $pong;
43
44
    public function __construct(
45
        LoggerInterface $logger,
46
        LoopInterface $eventLoop,
47
        Client $slackClient,
48
        MessageFactory $messageFactory,
49
        PluginManager $pluginManager,
50
        array $config = []
51
    ) {
52
        $this->setLog($logger);
53
        $this->setLoop($eventLoop);
54
        $this->setSlack($slackClient);
55
        $this->setConfig($config);
56
        $this->messageFactory = $messageFactory;
57
        $this->pluginManager = $pluginManager;
58
        $this->messageLog = null;
59
    }
60
61
    public function getMessageLog()
62
    {
63
        return $this->messageLog;
64
    }
65
66
    public function setMessageLog(string $messageLog = null)
67
    {
68
        $this->messageLog = $messageLog ?? null;
69
    }
70
71
    public function init(array $plugins)
72
    {
73
        foreach ($plugins as $pluginId => $plugin) {
74
            /** @var PluginInterface $plugin */
75
76
            $this->info("loading $pluginId");
77
78
            try {
79
                $this->pluginManager->loadPlugin($pluginId, $plugin);
80
            } catch (Exception $e) {
81
                $this->warning("Unhandled Exception while loading $pluginId: ".$e->getMessage());
82
                $this->warning($e->getTraceAsString());
83
            }
84
        }
85
    }
86
87
    public function run()
88
    {
89
        $slack = $this->getSlack();
90
91
        $slack->init();
92
93
        $slack->connect()->then([$this, 'connected']);
94
95
        $this->addMemoryReporting();
96
97
        $this->getLoop()->run();
98
    }
99
100
    public function shutDown()
101
    {
102
        $this->getSlack()->disconnect();
103
        $this->getLoop()->stop();
104
    }
105
106
    public function connected()
107
    {
108
        $slack = $this->getSlack();
109
110
        $slack->update(function(User $authedUser) {
111
            $this->pluginManager->setAuthedUser($authedUser);
112
        });
113
114
        $slack->onEvent('message', [$this, 'onMessage']);
115
116
        $this->monitor = $this->startConnectionMonitor();
117
    }
118
119
    public function onMessage(Payload $payload)
120
    {
121
        $data = $payload->getData();
122
123
        $this->debug('Received message', $data);
124
125
        try {
126
            $this->logMessage($data);
127
            $message = $this->messageFactory->create($data);
128
        } catch (Throwable $throwable) {
129
            $errmsg = $throwable->getMessage()."\n"
130
                .$throwable->getTraceAsString()."\n"
131
                ."Payload data: ".json_encode($data);
132
            $this->warning($errmsg);
133
            return;
134
        }
135
136
        if ($message->isSelf()) {
137
            return;
138
        }
139
140
        $this->pluginManager->dispatchMessage($message);
141
    }
142
143
    public function getHelp() : string
144
    {
145
        return implode("\n", $this->pluginManager->getHelp());
146
    }
147
148
    public function getStatus() : string
149
    {
150
        $statuses = $this->pluginManager->getStatuses();
151
152
        array_unshift($statuses, $this->getFormattedMemoryUsage());
153
154
        return implode("\n", $statuses);
155
    }
156
157
    protected function addMemoryReporting()
158
    {
159
        $now = new DateTime();
160
        $then = new DateTime('+1 hour');
161
        $then->setTime($then->format('H'), 0, 0);
162
        $delay = $then->getTimestamp() - $now->getTimestamp();
163
164
        $this->addTimer($delay, function() {
165
            $this->info($this->getFormattedMemoryUsage());
166
            $this->addPeriodicTimer(3600, function() {
167
                $this->info($this->getFormattedMemoryUsage());
168
            });
169
        });
170
    }
171
172
    protected function getFormattedMemoryUsage() : string
173
    {
174
        $memory = memory_get_usage() / 1024;
175
        $formatted = number_format($memory, 3).'K';
176
        return "Current memory usage: {$formatted}";
177
    }
178
179
    protected function logMessage($data)
180
    {
181
        if ($this->messageLog !== null) {
182
            file_put_contents($this->messageLog, json_encode($data) . "\n", FILE_APPEND);
183
        }
184
    }
185
186
    /**
187
     * @return TimerInterface|null
188
     */
189
    protected function startConnectionMonitor()
190
    {
191
        if ($interval = $this->get('connection_monitor.interval')) {
192
193
            $this->getLog()->info("Monitoring websocket connection every $interval seconds.");
194
            $this->notify("Monitoring websocket connection every $interval seconds.");
195
196
            $this->ping();
197
198
            return $this->loop->addPeriodicTimer($interval, function () {
199
200
                if (!$this->pong) {
201
                    $this->getLog()->error('No pong: reconnecting...');
202
                    $this->reconnect();
203
                }
204
205
                $this->ping();
206
            });
207
        }
208
    }
209
210
    protected function ping()
211
    {
212
        $this->pong = false;
213
214
        $this->getSlack()->ping()
215
            ->then(
216
                function (Payload $payload) {
217
                    $this->getLog()->info($payload->toJson());
218
                    $this->pong = true;
219
                }
220
            );
221
    }
222
223
    protected function reconnect()
224
    {
225
        if ($this->monitor) {
226
            $this->loop->cancelTimer($this->monitor);
227
        }
228
229
        $this->getSlack()->reconnect()->then(
230
            function () {
231
                $this->getLog()->info('Reconnected');
232
                $this->monitor = $this->startConnectionMonitor();
233
            },
234
            function () {
235
                $this->getLog()->error('Reconnect failed, shutting down.');
236
                $this->shutDown();
237
            }
238
        );
239
    }
240
241
    protected function notify(string $message)
242
    {
243
        if ($user = $this->get('notify.user')) {
244
            $this->getSlack()->directMessage($message, $user);
245
        }
246
    }
247
}
248