InputPoller   A
last analyzed

Complexity

Total Complexity 28

Size/Duplication

Total Lines 199
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Importance

Changes 0
Metric Value
dl 0
loc 199
rs 10
c 0
b 0
f 0
wmc 28
lcom 1
cbo 1

5 Methods

Rating   Name   Duplication   Size   Complexity  
B __construct() 0 29 1
C poll_SQS_queues() 0 43 8
B handle_message() 0 30 3
B start_job() 0 52 5
B validate_message() 0 14 11
1
#!/usr/bin/php
2
3
<?php
4
5
/* Copyright (C) 2015, Sport Archive Inc. */
6
7
/* This program is free software; you can redistribute it and/or modify */
8
/* it under the terms of the GNU General Public License as published by */
9
/* the Free Software Foundation; either version 2 of the License, or */
10
/* (at your option) any later version. */
11
12
/* This program is distributed in the hope that it will be useful, */
13
/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
14
/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the */
15
/* GNU General Public License for more details. */
16
17
/* You should have received a copy of the GNU General Public License along */
18
/* with this program; if not, write to the Free Software Foundation, Inc., */
19
/* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
20
21
/* Cloud Processing Engine, Copyright (C) 2015, Sport Archive Inc */
22
/* Cloud Processing Engine comes with ABSOLUTELY NO WARRANTY; */
23
/* This is free software, and you are welcome to redistribute it */
24
/* under certain conditions; */
25
26
/* June 29th 2015 */
27
/* Sport Archive Inc. */
28
/* [email protected] */
29
30
31
32
/**
33
 * This script listens to AWS SQS queues for incoming input commands.
34
 * It decodes the JSON input and executes the callback corresponding to the command.
35
 */
36
37
require_once __DIR__ . "/../vendor/autoload.php";
38
39
use Aws\Swf\Exception;
0 ignored issues
show
Bug introduced by
This use statement conflicts with another class in this namespace, Exception.

Let’s assume that you have a directory layout like this:

.
|-- OtherDir
|   |-- Bar.php
|   `-- Foo.php
`-- SomeDir
    `-- Foo.php

and let’s assume the following content of Bar.php:

// Bar.php
namespace OtherDir;

use SomeDir\Foo; // This now conflicts the class OtherDir\Foo

If both files OtherDir/Foo.php and SomeDir/Foo.php are loaded in the same runtime, you will see a PHP error such as the following:

PHP Fatal error:  Cannot use SomeDir\Foo as Foo because the name is already in use in OtherDir/Foo.php

However, as OtherDir/Foo.php does not necessarily have to be loaded and the error is only triggered if it is loaded before OtherDir/Bar.php, this problem might go unnoticed for a while. In order to prevent this error from surfacing, you must import the namespace with a different alias:

// Bar.php
namespace OtherDir;

use SomeDir\Foo as SomeDirFoo; // There is no conflict anymore.
Loading history...
40
use SA\CpeSdk;
41
42
class InputPoller
43
{
44
    private $debug;
45
    private $config;
46
    private $cpeSqsListener;
47
    private $cpeSqsWriter;
48
    private $cpeSwfHandler;
49
    private $cpeLogger;
50
    private $typesMap;
51
    
52
    const INVALID_JSON   = "INVALID_JSON"; 
53
    const INVALID_CONFIG = "INVALID_CONFIG";
54
    
55
    /**
     * Store the configuration, pick up the shared debug flag and logger
     * from the global scope, and create the SQS/SWF helper objects.
     *
     * @param object $config Poller configuration; poll_SQS_queues() reads its 'clients' property
     */
    public function __construct($config)
    {
        global $debug, $cpeLogger;

        $this->config    = $config;
        $this->debug     = $debug;
        $this->cpeLogger = $cpeLogger;

        // Helper for listening to the input SQS queue
        $this->cpeSqsListener = new CpeSdk\Sqs\CpeSqsListener($debug, $cpeLogger);
        // Helper for writing to SQS queues
        $this->cpeSqsWriter = new CpeSdk\Sqs\CpeSqsWriter($debug, $cpeLogger);
        // Helper giving access to the SWF client
        $this->cpeSwfHandler = new CpeSdk\Swf\CpeSwfHandler($debug, $cpeLogger);

        // Command type => name of the callback method handling it.
        // Commands are sent by clients using the CpeClientSDK.
        $this->typesMap = [
            'START_JOB'           => 'start_job',
            'CANCEL_JOB'          => 'cancel_job',
            'CANCEL_ACTIVITY'     => 'cancel_activity',
            'GET_JOB_LIST'        => 'get_job_list',
            'GET_ACTIVITY_LIST'   => 'get_activity_list',
            'GET_JOB_STATUS'      => 'get_job_status',
            'GET_ACTIVITY_STATUS' => 'get_activity_status',
        ];
    }
84
    
85
    // Poll from the 'input' SQS queue of all clients
86
    // If a msg is received, we pass it to 'handle_message' for processing
87
    /**
     * Poll the 'input' SQS queue of every configured client once.
     *
     * A received message is JSON-decoded and passed to handle_message().
     * Exceptions raised while receiving, handling or deleting a message are
     * logged and do not interrupt the polling of the remaining clients.
     * A received message is always deleted from the queue, valid or not.
     *
     * @throws CpeSdk\CpeException with code INVALID_CONFIG when the
     *                             configuration has no 'clients' entry
     */
    public function poll_SQS_queues()
    {
        if (!isset($this->config->{'clients'})) {
            throw new CpeSdk\CpeException(
                "Clients configuration invalid. Check the config file or your parameters.",
                self::INVALID_CONFIG);
        }

        // For all clients in the configuration, poll their input queue
        foreach ($this->config->{'clients'} as $client) {
            $msg = null;
            $this->cpeLogger->log_out(
                "DEBUG",
                basename(__FILE__),
                "Polling from client: " . print_r($client, true));

            $queue = $client->{'queues'}->{'input'};
            try {
                // Long polling; the second argument is forwarded as-is to
                // receive_message() (presumably the wait time in seconds)
                if ($msg = $this->cpeSqsListener->receive_message($queue, 10)) {
                    if (!($decoded = json_decode($msg['Body']))) {
                        $this->cpeLogger->log_out(
                            "ERROR",
                            basename(__FILE__),
                            "JSON data invalid in queue: '$queue'");
                    } else {
                        $this->handle_message($decoded);
                    }
                }
            } catch (CpeSdk\CpeException $e) {
                $this->cpeLogger->log_out(
                    "ERROR",
                    basename(__FILE__),
                    $e->getMessage().print_r($msg, true));
            } catch (\Exception $e) {
                $this->cpeLogger->log_out(
                    "ERROR",
                    basename(__FILE__),
                    $e->getMessage().print_r($msg, true));
            }

            // Message polled. Valid or not, we delete it from SQS.
            // A failing delete is logged rather than propagated: the caller's
            // polling loop has no exception handler and would terminate.
            if ($msg) {
                try {
                    $this->cpeSqsListener->delete_message($queue, $msg);
                } catch (\Exception $e) {
                    $this->cpeLogger->log_out(
                        "ERROR",
                        basename(__FILE__),
                        "Unable to delete message from queue '$queue': " . $e->getMessage());
                }
            }
        }
    }
130
131
    // Receive an input, check if we know the command and exec the callback
132
    /**
     * Validate an incoming message and dispatch it to the callback
     * registered for its 'type' in $this->typesMap.
     *
     * Unknown commands, and known commands whose callback method does not
     * exist on this object, are logged and ignored.
     *
     * @param object $message Decoded JSON message
     *
     * @throws CpeSdk\CpeException with code INVALID_JSON, raised by
     *                             validate_message() on a malformed message
     */
    public function handle_message($message)
    {
        $this->validate_message($message);

        $type = $message->{"type"};

        // Do we know this input ?
        if (!isset($this->typesMap[$type])) {
            $this->cpeLogger->log_out(
                "ERROR",
                basename(__FILE__),
                "Command '" . $type . "' is unknown! Ignoring ..."
            );
            return;
        }

        // The map may name callbacks that are not defined; calling one would
        // be a fatal error, so check before dispatching.
        $callback = $this->typesMap[$type];
        if (!method_exists($this, $callback)) {
            $this->cpeLogger->log_out(
                "ERROR",
                basename(__FILE__),
                "Command '" . $type . "' is not implemented! Ignoring ..."
            );
            return;
        }

        $this->cpeLogger->log_out(
            "INFO",
            basename(__FILE__),
            "Received message '" . $type . "'"
        );
        if ($this->debug) {
            $this->cpeLogger->log_out(
                "DEBUG",
                basename(__FILE__),
                "Details:\n" . json_encode($message, JSON_PRETTY_PRINT)
            );
        }

        // We call the callback method that handles this message
        $this->{$callback}($message);
    }
162
163
    
164
    /** 
165
     * CALLBACKS
166
     */
167
168
    // Start a new workflow in SWF to initiate new transcoding job
169
    /**
     * Start a new SWF workflow execution for a START_JOB message and send
     * a WORKFLOW_SCHEDULED notification through the SQS writer.
     *
     * The workflow name, version, domain and task list are read from
     * $message->data->workflow; the whole $message->data object is passed,
     * JSON-encoded, as the workflow input. Any exception is logged and
     * swallowed.
     *
     * @param object $message Validated message (see validate_message())
     */
    private function start_job($message)
    {
        if ($this->debug) {
            $this->cpeLogger->log_out(
                "DEBUG",
                basename(__FILE__),
                "Starting new workflow!"
            );
        }

        $workflow = $message->{'data'}->{'workflow'};

        // Workflow info
        $workflowType = array(
            "name"    => $workflow->{'name'},
            "version" => $workflow->{'version'});

        // Request start SWF workflow.
        // Only the 'data' part of the message is passed as input for the workflow.
        // The workflow 'domain' MUST be contained in the JSON input.
        try {
            $workflowId = uniqid('', true);
            $payload = array(
                "domain"       => $workflow->{'domain'},
                "workflowId"   => $workflowId,
                "workflowType" => $workflowType,
                "taskList"     => array("name" => $workflow->{'taskList'}),
                "input"        => json_encode($message->{'data'})
            );
            // Optional timeout, forwarded only when the client supplied it
            if (isset($workflow->{'executionStartToCloseTimeout'})) {
                $payload["executionStartToCloseTimeout"] =
                    $workflow->{'executionStartToCloseTimeout'};
            }
            $result = $this->cpeSwfHandler->swf->startWorkflowExecution($payload);
            $runId  = $result->get('runId');

            $this->cpeLogger->log_out(
                "INFO",
                basename(__FILE__),
                "New workflow submitted to SWF: ".$runId);

            // Send WORKFLOW_SCHEDULED message back to client
            $this->cpeSqsWriter->workflow_scheduled($workflowType, $runId, $workflowId, $message);
        } catch (\Aws\Swf\Exception\SwfException $e) {
            // Separator added so the exception text does not run into the prefix
            $this->cpeLogger->log_out(
                "ERROR",
                basename(__FILE__),
                "Unable to start workflow! "
                . $e->getMessage());
        } catch (\Exception $e) {
            $this->cpeLogger->log_out(
                "ERROR",
                basename(__FILE__),
                $e->getMessage());
        }
    }
221
222
    /**
223
     * UTILS
224
     */ 
225
226
    /**
     * Check that a decoded message carries the mandatory fields.
     *
     * 'time', 'jobId', 'type' and 'data' must be set and must not compare
     * equal to the empty string; 'data' must contain a 'workflow' entry.
     *
     * @param object $message Decoded JSON message
     *
     * @throws CpeSdk\CpeException with code INVALID_JSON when a check fails
     */
    private function validate_message($message)
    {
        $required = array("time", "jobId", "type", "data");

        foreach ($required as $field) {
            // isset() is also false when $message itself is null
            if (!isset($message->{$field}) || $message->{$field} == "") {
                throw new CpeSdk\CpeException(
                    "'time', 'type', 'jobId' or 'data' fields missing in JSON message file!",
                    self::INVALID_JSON);
            }
        }

        if (!isset($message->{'data'}->{'workflow'})) {
            throw new CpeSdk\CpeException(
                "Input doesn't contain any workflow information. You must provide the workflow you want to send this job to!",
                self::INVALID_JSON);
        }
    }
240
}
241
242
243
/**
244
 * INPUT POLLER START
245
 */
246
247
// Shared state read through `global` by InputPoller::__construct() and
// assigned by check_input_parameters().
$debug = false;
// Explicit initialisation: a bare `$cpeLogger;` statement has no effect.
$cpeLogger = null;
249
250 View Code Duplication
/**
 * Print the command-line help and terminate the script with status 0.
 *
 * @param mixed $defaultConfigFile Unused; kept for backward compatibility.
 *                                 Optional because this file calls usage()
 *                                 without any argument.
 */
function usage($defaultConfigFile = null)
{
    echo("# Description\nThe InputPoller connects your client applications to your stack. It handles communication over the two SQS channels you created for your client application.\n\n");
    echo("Usage: php ". basename(__FILE__) . " [-h] [-d] -n <client_name> [-l <log path>]\n");
    echo("-h: Print this help\n");
    echo("-d: Debug mode\n");
    echo("-l <log_path>: Location where logs will be dumped in (folder).\n");
    echo("-n <client_name> [mandatory]: The name of the client application that will use this Poller to communicate with the stack. The client will be sending and listenening to the Stack through SQS. We expect the INPUT_QUEUE and OUTPUT_QUEUE environment variables set to the URL of the SQS queues.\n");
    exit(0);
}
260
261
/**
 * Parse the command-line options, create the global logger and build the
 * poller configuration.
 *
 * Options: -h (help), -d (debug), -l <log path>, -n <client name> (mandatory).
 * Assigns the globals $debug and $cpeLogger. Exits with status 1 when the
 * client name or the INPUT_QUEUE environment variable is missing.
 *
 * @return \stdClass Configuration whose 'clients' property holds a single
 *                   client with its 'name' and its 'input'/'output' queues,
 *                   read from the INPUT_QUEUE and OUTPUT_QUEUE environment
 *                   variables
 */
function check_input_parameters()
{
    global $debug;
    global $cpeLogger;

    // Handle input parameters
    $options = getopt("l:hdn:");

    if (isset($options['h'])) {
        // usage() declares one (unused) parameter; pass it explicitly
        usage(null);
    }

    if (isset($options['d'])) {
        $debug = true;
    }

    $logPath = null;
    if (isset($options['l'])) {
        $logPath = $options['l'];
    }

    if (!isset($options['n'])) {
        print "[ERROR] You must provide a [-n client_name] parameter to provide your client application name.\n";
        exit(1);
    }

    // getenv() returns false for an unset variable; without an input queue
    // every poll would fail immediately, so stop here with a clear message.
    $inputQueue = getenv('INPUT_QUEUE');
    if ($inputQueue === false || $inputQueue === '') {
        print "[ERROR] The INPUT_QUEUE environment variable must be set to the URL of the SQS input queue.\n";
        exit(1);
    }

    $cpeLogger = new CpeSdk\CpeLogger($logPath, $options['n'], $debug);

    $config = new \stdClass;
    $config->clients = [(object)[
            'name' => $options['n'],
            'queues' => (object)[
                'input' => $inputQueue,
                'output' => getenv('OUTPUT_QUEUE'),
            ],
        ]];

    // TODO: validate $config against a JSON schema (a check against
    // "config/mainConfig.json" was previously sketched here but disabled).

    return $config;
}
304
305
// Parse the command line; this also creates the global $cpeLogger
$config = check_input_parameters();
// Every other log_out() call in this file passes a string message, so the
// configuration object is serialised before being logged.
$cpeLogger->log_out("INFO", basename(__FILE__), json_encode($config));

// Create InputPoller object
try {
    $inputPoller = new InputPoller($config);
} catch (CpeSdk\CpeException $e) {
    echo $e->getMessage();
    $cpeLogger->log_out(
        "FATAL",
        basename(__FILE__),
        $e->getMessage()
    );
    exit(1);
}

$cpeLogger->log_out("INFO", basename(__FILE__), "Start Listening.");
print "Start polling ...\n";

// Poll the SQS input queues forever to get incoming commands
while (true) {
    $inputPoller->poll_SQS_queues();
}
328