Completed
Push — master ( c0f953...e85667 )
by Alexey
37:08
created

CommandTrait::actionProcess()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 2
1
<?php
2
3
namespace yiicod\socketio\commands;
4
5
use Symfony\Component\Process\Process;
6
use Yii;
7
use yii\helpers\ArrayHelper;
8
use yii\helpers\Console;
9
use yii\helpers\Json;
10
use yiicod\socketio\Broadcast;
11
12
trait CommandTrait
{
    /**
     * Address handed to the node server through the `-server` argument
     * (the default has the form host:port).
     *
     * @var string
     */
    public $server = 'localhost:1212';

    /**
     * Optional SSL settings; JSON-encoded into the `-ssl` argument when not empty.
     * [
     *     key => 'path to key',
     *     cert => 'path to cert',
     * ]
     *
     * @var array
     */
    public $ssl = [];

    /**
     * Unserializes the payload and hands it, together with the handler, to Broadcast::process().
     *
     * @param mixed $handler Forwarded to Broadcast::process() unchanged
     * @param string $data PHP-serialized payload
     */
    public function actionProcess($handler, $data)
    {
        // NOTE(review): unserialize() can instantiate arbitrary classes; confirm that $data
        // only ever comes from a trusted producer.
        $payload = @unserialize($data) ?? [];

        // unserialize() reports failure with false (not null), so "??" alone never falls back.
        // A payload that really is a serialized false is still passed through untouched.
        if (false === $payload && serialize(false) !== $data) {
            $payload = [];
        }

        Broadcast::process($handler, $payload);
    }

    /**
     * Builds (but does not start) the process that runs `server/index.js` with node.
     *
     * Every non-empty setting is appended to the command line as `-name=value`.
     *
     * @return Process
     */
    public function nodejs()
    {
        // Automatically send every new message to available log routes
        Yii::getLogger()->flushInterval = 1;

        $script = sprintf('%s/%s', realpath(dirname(__FILE__) . '/../server'), 'index.js');
        $cmd = 'node ' . escapeshellarg($script);

        // The publisher and subscriber connections share the same driver settings.
        $driver = Broadcast::getDriver();
        $connection = json_encode(array_filter([
            'host' => $driver->hostname,
            'port' => $driver->port,
            'password' => $driver->password,
        ]));

        // Drop settings that render as an empty string. The explicit callback replaces
        // 'strlen', which is deprecated for null values since PHP 8.1.
        $args = array_filter([
            'server' => $this->server,
            'pub' => $connection,
            'sub' => $connection,
            'channels' => implode(',', Broadcast::channels()),
            'nsp' => Broadcast::getManager()->nsp,
            'ssl' => empty($this->ssl) ? null : json_encode($this->ssl),
            'runtime' => Yii::getAlias('@runtime/logs'),
        ], static function ($value) {
            return '' !== (string) $value;
        });

        foreach ($args as $key => $value) {
            // escapeshellarg() keeps quotes inside a value (e.g. in the password) from
            // terminating the argument early.
            $cmd .= ' -' . $key . '=' . escapeshellarg((string) $value);
        }

        return new Process($cmd);
    }

    /**
     * Predis process: subscribes to the broadcast channels and dispatches each received
     * message to Broadcast::on() until `quit_loop` arrives on `control_channel`.
     *
     * @return bool Always true once the loop has ended
     */
    public function predis()
    {
        $pubSubLoop = function () {
            $client = Broadcast::getDriver()->getConnection(true);

            // Initialize a new pubsub consumer.
            $pubsub = $client->pubSubLoop();

            $channels = [];
            foreach (Broadcast::channels() as $key => $channel) {
                $channels[$key] = $channel . '.io';
            }

            // Subscribe to the control channel and to every "<channel>.io" channel
            $pubsub->subscribe(ArrayHelper::merge(['control_channel'], $channels));

            // Start processing the pubsub messages. Open a terminal and use redis-cli
            // to push messages to the channels. Examples:
            //   ./redis-cli PUBLISH notifications "this is a test"
            //   ./redis-cli PUBLISH control_channel quit_loop
            foreach ($pubsub as $message) {
                switch ($message->kind) {
                    case 'subscribe':
                        $this->output("Subscribed to {$message->channel}\n");
                        break;
                    case 'message':
                        if ('control_channel' === $message->channel) {
                            if ('quit_loop' === $message->payload) {
                                $this->output("Aborting pubsub loop...\n", Console::FG_RED);
                                $pubsub->unsubscribe();
                            } else {
                                $this->output("Received an unrecognized command: {$message->payload}\n", Console::FG_RED);
                            }
                        } else {
                            $payload = Json::decode($message->payload);
                            $data = $payload['data'] ?? [];

                            Broadcast::on($payload['name'], $data);
                        }
                        break;
                }
            }

            // Always unset the pubsub consumer instance when you are done! The
            // class destructor will take care of cleanups and prevent protocol
            // desynchronizations between the client and the server.
            unset($pubsub);
        };

        // Auto reconnect on redis timeout. The loop is restarted exactly once; a second
        // ConnectionException propagates to the caller.
        try {
            $pubSubLoop();
        } catch (\Predis\Connection\ConnectionException $e) {
            $pubSubLoop();
        }

        return true;
    }
}
145