1
|
|
|
#!/usr/bin/php |
2
|
|
|
|
3
|
|
|
<?php |
4
|
|
|
|
5
|
|
|
/* Copyright (C) 2015, Sport Archive Inc. */ |
6
|
|
|
|
7
|
|
|
/* This program is free software; you can redistribute it and/or modify */ |
8
|
|
|
/* it under the terms of the GNU General Public License as published by */ |
9
|
|
|
/* the Free Software Foundation; either version 2 of the License, or */ |
10
|
|
|
/* (at your option) any later version. */ |
11
|
|
|
|
12
|
|
|
/* This program is distributed in the hope that it will be useful, */ |
13
|
|
|
/* but WITHOUT ANY WARRANTY; without even the implied warranty of */ |
14
|
|
|
/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ |
15
|
|
|
/* GNU General Public License for more details. */ |
16
|
|
|
|
17
|
|
|
/* You should have received a copy of the GNU General Public License along */ |
18
|
|
|
/* with this program; if not, write to the Free Software Foundation, Inc., */ |
19
|
|
|
/* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ |
20
|
|
|
|
21
|
|
|
/* Cloud Processing Engine, Copyright (C) 2015, Sport Archive Inc */ |
22
|
|
|
/* Cloud Processing Engine comes with ABSOLUTELY NO WARRANTY; */ |
23
|
|
|
/* This is free software, and you are welcome to redistribute it */ |
24
|
|
|
/* under certain conditions; */ |
25
|
|
|
|
26
|
|
|
/* June 29th 2015 */ |
27
|
|
|
/* Sport Archive Inc. */ |
28
|
|
|
/* [email protected] */ |
29
|
|
|
|
30
|
|
|
|
31
|
|
|
|
32
|
|
|
/** |
33
|
|
|
* This script listen to AWS SQS queues for incoming input commands |
34
|
|
|
* It opens the JSON input and starts a execute a callback corresponding to the command |
35
|
|
|
*/ |
36
|
|
|
|
37
|
|
|
require_once __DIR__ . "/../vendor/autoload.php"; |
38
|
|
|
|
39
|
|
|
use Aws\Swf\Exception; |
|
|
|
|
40
|
|
|
use SA\CpeSdk; |
41
|
|
|
|
42
|
|
|
class InputPoller |
43
|
|
|
{ |
44
|
|
|
private $debug; |
45
|
|
|
private $config; |
46
|
|
|
private $cpeSqsListener; |
47
|
|
|
private $cpeSqsWriter; |
48
|
|
|
private $cpeSwfHandler; |
49
|
|
|
private $cpeLogger; |
50
|
|
|
private $typesMap; |
51
|
|
|
|
52
|
|
|
const INVALID_JSON = "INVALID_JSON"; |
53
|
|
|
const INVALID_CONFIG = "INVALID_CONFIG"; |
54
|
|
|
|
55
|
|
|
public function __construct($config) |
56
|
|
|
{ |
57
|
|
|
global $debug; |
58
|
|
|
global $cpeLogger; |
59
|
|
|
|
60
|
|
|
$this->config = $config; |
61
|
|
|
$this->debug = $debug; |
62
|
|
|
$this->cpeLogger = $cpeLogger; |
63
|
|
|
|
64
|
|
|
// Mapping event/methods |
65
|
|
|
// Events come from clients using the CpeClientSDK |
66
|
|
|
$this->typesMap = [ |
67
|
|
|
'START_JOB' => 'start_job', |
68
|
|
|
'CANCEL_JOB' => 'cancel_job', |
69
|
|
|
'CANCEL_ACTIVITY' => 'cancel_activity', |
70
|
|
|
'GET_JOB_LIST' => 'get_job_list', |
71
|
|
|
'GET_ACTIVITY_LIST' => 'get_activity_list', |
72
|
|
|
'GET_JOB_STATUS' => 'get_job_status', |
73
|
|
|
'GET_ACTIVITY_STATUS' => 'get_activity_status', |
74
|
|
|
]; |
75
|
|
|
|
76
|
|
|
// For listening to the Input SQS queue |
77
|
|
|
$this->cpeSqsListener = new CpeSdk\Sqs\CpeSqsListener($this->debug, $cpeLogger); |
78
|
|
|
// For writing to SQS queue |
79
|
|
|
$this->cpeSqsWriter = new CpeSdk\Sqs\CpeSqsWriter($this->debug, $cpeLogger); |
80
|
|
|
|
81
|
|
|
// For creating SWF object |
82
|
|
|
$this->cpeSwfHandler = new CpeSdk\Swf\CpeSwfHandler($this->debug, $cpeLogger); |
83
|
|
|
} |
84
|
|
|
|
85
|
|
|
// Poll from the 'input' SQS queue of all clients |
86
|
|
|
// If a msg is received, we pass it to 'handle_input' for processing |
87
|
|
|
public function poll_SQS_queues() |
88
|
|
|
{ |
89
|
|
|
if (!isset($this->config->{'clients'})) |
90
|
|
|
throw new CpeSdk\CpeException("Clients configuration invalid. Check the config file or your parameters.", |
91
|
|
|
self::INVALID_CONFIG); |
92
|
|
|
|
93
|
|
|
// For all clients in config files |
94
|
|
|
// We poll from queues |
95
|
|
|
foreach ($this->config->{'clients'} as $client) |
96
|
|
|
{ |
97
|
|
|
$msg = null; |
98
|
|
|
$this->cpeLogger->log_out("DEBUG", __DIR__, "Polling from client: " . print_r($client, true)); |
99
|
|
|
|
100
|
|
|
// Long Polling messages from client input queue |
101
|
|
|
$queue = $client->{'queues'}->{'input'}; |
102
|
|
|
try { |
103
|
|
|
if ($msg = $this->cpeSqsListener->receive_message($queue, 10)) |
104
|
|
|
{ |
105
|
|
|
if (!($decoded = json_decode($msg['Body']))) |
106
|
|
|
$this->cpeLogger->log_out( |
107
|
|
|
"ERROR", |
108
|
|
|
basename(__FILE__), |
109
|
|
|
"JSON data invalid in queue: '$queue'"); |
110
|
|
|
else |
111
|
|
|
$this->handle_message($decoded); |
112
|
|
|
} |
113
|
|
|
} catch (CpeSdk\CpeException $e) { |
|
|
|
|
114
|
|
|
$this->cpeLogger->log_out( |
115
|
|
|
"ERROR", |
116
|
|
|
basename(__FILE__), |
117
|
|
|
$e->getMessage().print_r($msg, true)); |
118
|
|
|
} catch (\Exception $e) { |
119
|
|
|
$this->cpeLogger->log_out( |
120
|
|
|
"ERROR", |
121
|
|
|
basename(__FILE__), |
122
|
|
|
$e->getMessage().print_r($msg, true)); |
123
|
|
|
} |
124
|
|
|
|
125
|
|
|
// Message polled. Valid or not, we delete it from SQS |
126
|
|
|
if ($msg) |
127
|
|
|
$this->cpeSqsListener->delete_message($queue, $msg); |
128
|
|
|
} |
129
|
|
|
} |
130
|
|
|
|
131
|
|
|
// Receive an input, check if we know the command and exec the callback |
132
|
|
|
public function handle_message($message) |
133
|
|
|
{ |
134
|
|
|
$this->validate_message($message); |
135
|
|
|
|
136
|
|
|
// Do we know this input ? |
137
|
|
|
if (!isset($this->typesMap[$message->{"type"}])) |
138
|
|
|
{ |
139
|
|
|
$this->cpeLogger->log_out( |
140
|
|
|
"ERROR", |
141
|
|
|
basename(__FILE__), |
142
|
|
|
"Command '" . $message->{"type"} . "' is unknown! Ignoring ..." |
143
|
|
|
); |
144
|
|
|
return; |
145
|
|
|
} |
146
|
|
|
|
147
|
|
|
$this->cpeLogger->log_out( |
148
|
|
|
"INFO", |
149
|
|
|
basename(__FILE__), |
150
|
|
|
"Received message '" . $message->{"type"} . "'" |
151
|
|
|
); |
152
|
|
|
if ($this->debug) |
153
|
|
|
$this->cpeLogger->log_out( |
154
|
|
|
"DEBUG", |
155
|
|
|
basename(__FILE__), |
156
|
|
|
"Details:\n" . json_encode($message, JSON_PRETTY_PRINT) |
157
|
|
|
); |
158
|
|
|
|
159
|
|
|
// We call the callback function that handles this message |
160
|
|
|
$this->{$this->typesMap[$message->{"type"}]}($message); |
161
|
|
|
} |
162
|
|
|
|
163
|
|
|
|
164
|
|
|
/** |
165
|
|
|
* CALLBACKS |
166
|
|
|
*/ |
167
|
|
|
|
168
|
|
|
// Start a new workflow in SWF to initiate new transcoding job |
169
|
|
|
private function start_job($message) |
170
|
|
|
{ |
171
|
|
|
if ($this->debug) |
172
|
|
|
$this->cpeLogger->log_out( |
173
|
|
|
"DEBUG", |
174
|
|
|
basename(__FILE__), |
175
|
|
|
"Starting new workflow!" |
176
|
|
|
); |
177
|
|
|
|
178
|
|
|
// Workflow info |
179
|
|
|
$workflowType = array( |
180
|
|
|
"name" => $message->{'data'}->{'workflow'}->{'name'}, |
181
|
|
|
"version" => $message->{'data'}->{'workflow'}->{'version'}); |
182
|
|
|
|
183
|
|
|
// Request start SWF workflow |
184
|
|
|
// We only pass $message->{'data'} as input for the WF |
185
|
|
|
// $message->{'data'}->{'workflow'}->{'domain'} MUST be contained in the JSON input |
|
|
|
|
186
|
|
|
try { |
187
|
|
|
$workflowId = uniqid('', true); |
188
|
|
|
$payload = array( |
189
|
|
|
"domain" => $message->{'data'}->{'workflow'}->{'domain'}, |
190
|
|
|
"workflowId" => $workflowId, |
191
|
|
|
"workflowType" => $workflowType, |
192
|
|
|
"taskList" => array("name" => $message->{'data'}->{'workflow'}->{'taskList'}), |
193
|
|
|
"input" => json_encode($message->{'data'}) |
194
|
|
|
); |
195
|
|
|
if (isset($message->{'data'}->{'workflow'}->{'executionStartToCloseTimeout'})) |
196
|
|
|
$payload["executionStartToCloseTimeout"] = |
197
|
|
|
$message->{'data'}->{'workflow'}->{'executionStartToCloseTimeout'}; |
198
|
|
|
$workflowRunId = $this->cpeSwfHandler->swf->startWorkflowExecution($payload); |
199
|
|
|
|
200
|
|
|
$this->cpeLogger->log_out( |
201
|
|
|
"INFO", |
202
|
|
|
basename(__FILE__), |
203
|
|
|
"New workflow submitted to SWF: ".$workflowRunId->get('runId')); |
204
|
|
|
|
205
|
|
|
// Send WORKFLOW_SCHEDULED message back to client |
206
|
|
|
$this->cpeSqsWriter->workflow_scheduled($workflowType, $workflowRunId->get('runId'), $workflowId, $message); |
207
|
|
|
|
208
|
|
|
} catch (\Aws\Swf\Exception\SwfException $e) { |
209
|
|
|
$this->cpeLogger->log_out( |
210
|
|
|
"ERROR", |
211
|
|
|
basename(__FILE__), |
212
|
|
|
"Unable to start workflow!" |
213
|
|
|
. $e->getMessage()); |
214
|
|
|
} catch (\Exception $e) { |
215
|
|
|
$this->cpeLogger->log_out( |
216
|
|
|
"ERROR", |
217
|
|
|
basename(__FILE__), |
218
|
|
|
$e->getMessage()); |
219
|
|
|
} |
220
|
|
|
} |
221
|
|
|
|
222
|
|
|
/** |
223
|
|
|
* UTILS |
224
|
|
|
*/ |
225
|
|
|
|
226
|
|
|
private function validate_message($message) |
227
|
|
|
{ |
228
|
|
|
if (!isset($message) || |
229
|
|
|
!isset($message->{"time"}) || $message->{"time"} == "" || |
230
|
|
|
!isset($message->{"jobId"}) || $message->{"jobId"} == "" || |
231
|
|
|
!isset($message->{"type"}) || $message->{"type"} == "" || |
232
|
|
|
!isset($message->{"data"}) || $message->{"data"} == "") |
233
|
|
|
throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!", |
234
|
|
|
self::INVALID_JSON); |
235
|
|
|
|
236
|
|
|
if (!isset($message->{'data'}->{'workflow'})) |
237
|
|
|
throw new CpeSdk\CpeException("Input doesn't contain any workflow information. You must provide the workflow you want to send this job to!", |
238
|
|
|
self::INVALID_JSON); |
239
|
|
|
} |
240
|
|
|
} |
241
|
|
|
|
242
|
|
|
|
243
|
|
|
/** |
244
|
|
|
* INPUT POLLER START |
245
|
|
|
*/ |
246
|
|
|
|
247
|
|
|
$debug = false; |
248
|
|
|
$cpeLogger; |
249
|
|
|
|
250
|
|
View Code Duplication |
function usage($defaultConfigFile) |
|
|
|
|
251
|
|
|
{ |
252
|
|
|
echo("# Description\nThe InputPoller connects your client applications to your stack. It handles communication over the two SQS channels you created for your client application.\n\n"); |
253
|
|
|
echo("Usage: php ". basename(__FILE__) . " [-h] [-d] -n <client_name> [-l <log path>]\n"); |
254
|
|
|
echo("-h: Print this help\n"); |
255
|
|
|
echo("-d: Debug mode\n"); |
256
|
|
|
echo("-l <log_path>: Location where logs will be dumped in (folder).\n"); |
257
|
|
|
echo("-n <client_name> [mandatory]: The name of the client application that will use this Poller to communicate with the stack. The client will be sending and listenening to the Stack through SQS. We expect the INPUT_QUEUE and OUTPUT_QUEUE environment variables set to the URL of the SQS queues.\n"); |
258
|
|
|
exit(0); |
259
|
|
|
} |
260
|
|
|
|
261
|
|
|
function check_input_parameters() |
|
|
|
|
262
|
|
|
{ |
263
|
|
|
global $debug; |
264
|
|
|
global $cpeLogger; |
265
|
|
|
|
266
|
|
|
// Handle input parameters |
267
|
|
|
$options = getopt("l:hdn:"); |
268
|
|
|
|
269
|
|
|
if (isset($options['h'])) |
270
|
|
|
usage(); |
271
|
|
|
|
272
|
|
|
if (isset($options['d'])) |
273
|
|
|
$debug = true; |
274
|
|
|
|
275
|
|
|
$logPath = null; |
276
|
|
|
if (isset($options['l'])) |
277
|
|
|
{ |
278
|
|
|
$logPath = $options['l']; |
279
|
|
|
} |
280
|
|
|
|
281
|
|
|
if (!isset($options['n'])) |
282
|
|
|
{ |
283
|
|
|
print "[ERROR] You must provide a [-n client_name] parameter to provide your client application name.\n"; |
284
|
|
|
exit(1); |
285
|
|
|
} |
286
|
|
|
|
287
|
|
|
$cpeLogger = new CpeSdk\CpeLogger($logPath, $options['n'], $debug); |
288
|
|
|
|
289
|
|
|
$config = new \stdClass; |
290
|
|
|
$config->clients = [(object)[ |
291
|
|
|
'name' => $options['n'], |
292
|
|
|
'queues' => (object)[ |
293
|
|
|
'input' => getenv('INPUT_QUEUE'), |
294
|
|
|
'output' => getenv('OUTPUT_QUEUE'), |
295
|
|
|
], |
296
|
|
|
]]; |
297
|
|
|
|
298
|
|
|
# Validate against JSON Schemas |
299
|
|
|
# if (($err = validate_json($config, "config/mainConfig.json"))) |
|
|
|
|
300
|
|
|
# exit("JSON main configuration file invalid! Details:\n".$err); |
|
|
|
|
301
|
|
|
|
302
|
|
|
return $config; |
303
|
|
|
} |
304
|
|
|
|
305
|
|
|
$config = check_input_parameters(); |
|
|
|
|
306
|
|
|
$cpeLogger->log_out("INFO", basename(__FILE__), $config); |
307
|
|
|
|
308
|
|
|
// Create InputPoller object |
309
|
|
|
try { |
310
|
|
|
$inputPoller = new InputPoller($config); |
311
|
|
|
} |
312
|
|
|
catch (CpeSdk\CpeException $e) { |
|
|
|
|
313
|
|
|
echo $e->getMessage(); |
314
|
|
|
$cpeLogger->log_out( |
315
|
|
|
"FATAL", |
316
|
|
|
basename(__FILE__), |
317
|
|
|
$e->getMessage() |
318
|
|
|
); |
319
|
|
|
exit(1); |
320
|
|
|
} |
321
|
|
|
|
322
|
|
|
$cpeLogger->log_out("INFO", __DIR__, "Start Listening."); |
323
|
|
|
print "Start polling ...\n"; |
324
|
|
|
|
325
|
|
|
// Start polling loop to get incoming commands from SQS input queues |
326
|
|
|
while (42) |
327
|
|
|
$inputPoller->poll_SQS_queues(); |
328
|
|
|
|
Let’s assume that you have a directory layout like this:
and let’s assume the following content of
Bar.php
:If both files
OtherDir/Foo.php
andSomeDir/Foo.php
are loaded in the same runtime, you will see a PHP error such as the following:PHP Fatal error: Cannot use SomeDir\Foo as Foo because the name is already in use in OtherDir/Foo.php
However, as
OtherDir/Foo.php
does not necessarily have to be loaded and the error is only triggered if it is loaded beforeOtherDir/Bar.php
, this problem might go unnoticed for a while. In order to prevent this error from surfacing, you must import the namespace with a different alias: