QueueTask::eventsAction()   A
last analyzed

Complexity

Conditions 6
Paths 1

Size

Total Lines 31
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
cc 6
eloc 13
nc 1
nop 0
dl 0
loc 31
ccs 0
cts 17
cp 0
crap 42
rs 9.2222
c 0
b 0
f 0
1
<?php
2
3
namespace Canvas\Cli\Tasks;
4
5
use Canvas\Contracts\Queue\QueueableJobInterface;
6
use Phalcon\Cli\Task as PhTask;
0 ignored issues
show
Bug introduced by
The type Phalcon\Cli\Task was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
7
use Canvas\Models\Users;
8
use Canvas\Queue\Queue;
9
use Phalcon\Mvc\Model;
0 ignored issues
show
Bug introduced by
The type Phalcon\Mvc\Model was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
10
use Throwable;
11
12
/**
 * CLI task exposing the Canvas queue consumers: events, notifications and jobs.
 *
 * Each action builds a message callback and hands it to Queue::process() together
 * with the name of the queue to consume.
 *
 * @package Canvas\Cli\Tasks
 *
 * @property Config $config
 * @property \Pusher\Pusher $pusher
 * @property \Monolog\Logger $log
 * @property Channel $channel
 * @property Queue $queue
 *
 */
class QueueTask extends PhTask
{
    /**
     * Print the list of queue actions available in this task.
     *
     * @param array $params CLI arguments; not used by this action
     *
     * @return void
     */
    public function mainAction(array $params): void
    {
        echo 'Canvas Ecosystem Queue Jobs: events | notifications | jobs' . PHP_EOL;
    }

    /**
     * Queue to process internal Canvas Events.
     *
     * Every message body is unserialized and fired through the events manager
     * using its 'event', 'source' and 'data' entries.
     *
     * @return void
     */
    public function eventsAction()
    {
        $callback = function ($msg) {
            //check the db before running anything
            if (!$this->areDatabasesConnected()) {
                return;
            }

            //we get the data from our event trigger and unserialize
            $event = $this->unserializeBody($msg);
            if ($event === null) {
                return;
            }

            //overwrite the user who is running this process
            if (isset($event['userData']) && $event['userData'] instanceof Users) {
                $this->di->setShared('userData', $event['userData']);
            }

            //a message without an event name cannot be fired, skip it instead of failing inside fire()
            if (!isset($event['event'])) {
                $this->log->error(
                    'Event message without an event name - Process ID ' . $msg->delivery_info['consumer_tag']
                );
                return;
            }

            //lets fire the event
            $this->events->fire($event['event'], $event['source'] ?? null, $event['data'] ?? null);

            $this->log->info(
                "Notification ({$event['event']}) - Process ID " . $msg->delivery_info['consumer_tag']
            );
        };

        Queue::process(Queue::EVENTS, $callback);
    }

    /**
     * Queue to process Canvas Notifications.
     *
     * Every message body is unserialized into 'from', 'to', 'notification' (class name)
     * and 'entity'; the notification class is instantiated with the entity and sent
     * to the 'to' user with its queue disabled so it runs immediately.
     *
     * @return void
     */
    public function notificationsAction()
    {
        $callback = function ($msg) {
            //check the db before running anything
            if (!$this->areDatabasesConnected()) {
                return;
            }

            //we get the data from our event trigger and unserialize
            $payload = $this->unserializeBody($msg);
            if ($payload === null) {
                return;
            }

            //overwrite the user who is running this process
            if (isset($payload['from']) && $payload['from'] instanceof Users) {
                $this->di->setShared('userData', $payload['from']);
            }

            $user = $payload['to'] ?? null;
            if (!$user instanceof Users) {
                echo 'Attribute TO has to be a User' . PHP_EOL;
                return;
            }

            $notificationClass = $payload['notification'] ?? null;
            if (!is_string($notificationClass) || !class_exists($notificationClass)) {
                echo 'Attribute notification has to be a Notificatoin' . PHP_EOL;
                return;
            }

            $entity = $payload['entity'] ?? null;
            if (!$entity instanceof Model) {
                echo 'Attribute entity has to be a Model' . PHP_EOL;
                return;
            }

            //instance notification and pass the entity
            $notification = new $notificationClass($entity);
            //disable the queue so we process it now
            $notification->disableQueue();

            //run notify for the specific user
            $user->notify($notification);

            $this->log->info(
                "Notification ({$notificationClass}) sent to {$user->email} - Process ID " . $msg->delivery_info['consumer_tag']
            );
        };

        Queue::process(Queue::NOTIFICATIONS, $callback);
    }

    /**
     * Queue to process Canvas Jobs.
     *
     * @param array $params CLI arguments; $params[0], when present, is the name of the
     *                      queue to consume, otherwise Queue::JOBS is used
     *
     * @return void
     */
    public function jobsAction(array $params)
    {
        $queue = $params[0] ?? Queue::JOBS;

        $callback = function ($msg) {
            //check the db before running anything
            if (!$this->areDatabasesConnected()) {
                return;
            }

            //we get the data from our event trigger and unserialize
            $job = $this->unserializeBody($msg);
            if ($job === null) {
                return;
            }

            //overwrite the user who is running this process
            $jobUser = $job['userData'] ?? null;
            if ($jobUser instanceof Users) {
                $this->di->setShared('userData', $jobUser);
            }

            $jobClass = $job['class'] ?? null;
            if (!is_string($jobClass) || !class_exists($jobClass)) {
                echo 'No Job class found' . PHP_EOL;
                $this->log->error('No Job class found ' . (is_string($jobClass) ? $jobClass : ''));
                return;
            }

            $handler = $job['job'] ?? null;
            if (!$handler instanceof QueueableJobInterface) {
                echo 'This Job is not queable ' . $msg->delivery_info['consumer_tag'] . PHP_EOL;
                $this->log->error('This Job is not queable ' . $msg->delivery_info['consumer_tag']);
                return;
            }

            /**
             * swoole coroutine.
             */
            /** @scrutinizer ignore-call */
            go(function () use ($handler, $jobClass, $jobUser, $msg) {
                try {
                    $result = $handler->handle();
                } catch (Throwable $e) {
                    //nothing outside the coroutine catches this, so log the failure here
                    $this->log->error(
                        "Job ({$jobClass}) failed - Process ID " . $msg->delivery_info['consumer_tag'],
                        [$e->getMessage()]
                    );
                    return;
                }

                //userData is optional in the payload, so only ask for the email when there is an object
                $userEmail = is_object($jobUser) ? $jobUser->getEmail() : 'unknown user';

                $this->log->info(
                    "Job ({$jobClass}) ran for {$userEmail} - Process ID " . $msg->delivery_info['consumer_tag'],
                    [$result]
                );
            });
        };

        Queue::process($queue, $callback);
    }

    /**
     * Confirm if the db is connected.
     *
     * Runs 'SELECT 1' on the given DI service. When that fails with a
     * 'MySQL server has gone away' message a reconnect is attempted.
     *
     * @param string $dbProvider name of the DI service holding the connection
     *
     * @return bool true when the query succeeded or the reconnect went through, false otherwise
     */
    protected function isDbConnected(string $dbProvider): bool
    {
        try {
            $this->di->get($dbProvider)->fetchAll('SELECT 1');
            return true;
        } catch (Throwable $e) {
            if (strpos($e->getMessage(), 'MySQL server has gone away') === false) {
                return false;
            }
        }

        //the server dropped the connection, try to open it again
        try {
            $this->di->get($dbProvider)->connect();
        } catch (Throwable $e) {
            //a failed reconnect means "not connected", it must not escape as an exception
            return false;
        }

        return true;
    }

    /**
     * Check every connection the consumers rely on: 'db' always, 'dblocal' only when registered in the DI.
     *
     * @return bool
     */
    private function areDatabasesConnected(): bool
    {
        if (!$this->isDbConnected('db')) {
            return false;
        }

        return !$this->di->has('dblocal') || $this->isDbConnected('dblocal');
    }

    /**
     * Unserialize the body of a queue message.
     *
     * NOTE(review): unserialize() can instantiate arbitrary classes, so the queue
     * has to be a trusted source; the payloads carry model objects, which is why
     * it is not restricted with 'allowed_classes' here.
     *
     * @param object $msg queue message exposing 'body' and 'delivery_info'
     *
     * @return array|\ArrayAccess|null the payload, or null when the body could not be decoded
     */
    private function unserializeBody($msg)
    {
        $payload = unserialize($msg->body);

        if (is_array($payload) || $payload instanceof \ArrayAccess) {
            return $payload;
        }

        $this->log->error(
            'Queue message could not be unserialized - Process ID ' . $msg->delivery_info['consumer_tag']
        );

        return null;
    }
}
211