@@ -100,7 +100,7 @@ discard block |
||
100 | 100 | // Poll from all the taskList registered for each activities |
101 | 101 | if ($this->debug) |
102 | 102 | $this->cpeLogger->log_out("DEBUG", basename(__FILE__), |
103 | - "Polling activity taskList '" . $this->taskList . "' ... "); |
|
103 | + "Polling activity taskList '" . $this->taskList . "' ... "); |
|
104 | 104 | |
105 | 105 | try { |
106 | 106 | // Call SWF and poll for incoming tasks |
@@ -201,9 +201,9 @@ discard block |
||
201 | 201 | if (!file_exists($activityToHandle->{"file"})) |
202 | 202 | { |
203 | 203 | $this->cpeLogger->log_out("ERROR", basename(__FILE__), |
204 | - "The code file '".$activityToHandle->{"file"}."' for activity: name=" |
|
204 | + "The code file '" . $activityToHandle->{"file"} . "' for activity: name=" |
|
205 | 205 | . $activityToHandle->{"name"} . ",version=" |
206 | - . $activityToHandle->{"version"}." doesn't exists! Check if the file is accessible and if the path is correct in your config file."); |
|
206 | + . $activityToHandle->{"version"} . " doesn't exists! Check if the file is accessible and if the path is correct in your config file."); |
|
207 | 207 | return false; |
208 | 208 | } |
209 | 209 | |
@@ -242,7 +242,7 @@ discard block |
||
242 | 242 | $this->cpeLogger->log_out("ERROR", basename(__FILE__), |
243 | 243 | "No Activity handler was found for: name=" |
244 | 244 | . $this->activityName . ",version=" |
245 | - . $this->activityVersion.". Check your config file and ensure your 'activity' name AND 'version' is there."); |
|
245 | + . $this->activityVersion . ". Check your config file and ensure your 'activity' name AND 'version' is there."); |
|
246 | 246 | return false; |
247 | 247 | } |
248 | 248 | } |
@@ -262,7 +262,7 @@ discard block |
||
262 | 262 | // Usage |
263 | 263 | function usage($defaultConfigFile) |
264 | 264 | { |
265 | - echo("Usage: php ". basename(__FILE__) . " -D <domain> -A <activity_name> -V <activity_version> [-T <task_list>] [-h] [-d] [-c <config_file path>] [-l <log path>]\n"); |
|
265 | + echo("Usage: php " . basename(__FILE__) . " -D <domain> -A <activity_name> -V <activity_version> [-T <task_list>] [-h] [-d] [-c <config_file path>] [-l <log path>]\n"); |
|
266 | 266 | echo("-h: Print this help\n"); |
267 | 267 | echo("-d: Debug mode\n"); |
268 | 268 | echo("-c <config_file path>: Optional parameter to override the default configuration file: '$defaultConfigFile'.\n"); |
@@ -331,7 +331,7 @@ discard block |
||
331 | 331 | // Tasklist |
332 | 332 | if (!isset($options['T'])) |
333 | 333 | { |
334 | - $taskList = $options['A'].'-'.$options['V']; |
|
334 | + $taskList = $options['A'] . '-' . $options['V']; |
|
335 | 335 | } else { |
336 | 336 | $taskList = $options['T']; |
337 | 337 | } |
@@ -98,9 +98,10 @@ discard block |
||
98 | 98 | public function poll_for_activities() |
99 | 99 | { |
100 | 100 | // Poll from all the taskList registered for each activities |
101 | - if ($this->debug) |
|
102 | - $this->cpeLogger->log_out("DEBUG", basename(__FILE__), |
|
101 | + if ($this->debug) { |
|
102 | + $this->cpeLogger->log_out("DEBUG", basename(__FILE__), |
|
103 | 103 | "Polling activity taskList '" . $this->taskList . "' ... "); |
104 | + } |
|
104 | 105 | |
105 | 106 | try { |
106 | 107 | // Call SWF and poll for incoming tasks |
@@ -124,8 +125,9 @@ discard block |
||
124 | 125 | { |
125 | 126 | // Get activityType and WorkflowExecution info |
126 | 127 | if (!($activityType = $activityTask->get("activityType")) || |
127 | - !($workflowExecution = $activityTask->get("workflowExecution"))) |
|
128 | - return false; |
|
128 | + !($workflowExecution = $activityTask->get("workflowExecution"))) { |
|
129 | + return false; |
|
130 | + } |
|
129 | 131 | |
130 | 132 | $this->cpeLogger->log_out("INFO", |
131 | 133 | basename(__FILE__), |
@@ -152,16 +154,18 @@ discard block |
||
152 | 154 | // Perform input validation |
153 | 155 | $this->activityHandler->do_input_validation(); |
154 | 156 | |
155 | - if ($this->debug) |
|
156 | - $this->cpeLogger->log_out("DEBUG", basename(__FILE__), |
|
157 | + if ($this->debug) { |
|
158 | + $this->cpeLogger->log_out("DEBUG", basename(__FILE__), |
|
157 | 159 | "Activity input:\n" . print_r($this->activityHandler->input, true)); |
160 | + } |
|
158 | 161 | |
159 | 162 | // Run activity task |
160 | 163 | $result = $this->activityHandler->do_activity($activityTask); |
161 | 164 | |
162 | - if ($this->debug && $result) |
|
163 | - $this->cpeLogger->log_out("DEBUG", basename(__FILE__), |
|
165 | + if ($this->debug && $result) { |
|
166 | + $this->cpeLogger->log_out("DEBUG", basename(__FILE__), |
|
164 | 167 | "Activity output:\n" . print_r($result, true)); |
168 | + } |
|
165 | 169 | |
166 | 170 | } catch (CpeSdk\CpeException $e) { |
167 | 171 | $reason = $e->ref; |
@@ -216,8 +220,9 @@ discard block |
||
216 | 220 | // Instantiate the Activity class that will process Tasks |
217 | 221 | |
218 | 222 | if (!isset($this->cpeLogger) || |
219 | - !$this->cpeLogger) |
|
220 | - print "EMPTY !!!\n"; |
|
223 | + !$this->cpeLogger) { |
|
224 | + print "EMPTY !!!\n"; |
|
225 | + } |
|
221 | 226 | |
222 | 227 | $this->activityHandler = |
223 | 228 | new $activityToHandle->{"class"}( |
@@ -287,15 +292,18 @@ discard block |
||
287 | 292 | global $activityVersion; |
288 | 293 | |
289 | 294 | // Handle input parameters |
290 | - if (!($options = getopt("D:T:A:V:c:l:hd"))) |
|
291 | - usage($defaultConfigFile); |
|
295 | + if (!($options = getopt("D:T:A:V:c:l:hd"))) { |
|
296 | + usage($defaultConfigFile); |
|
297 | + } |
|
292 | 298 | |
293 | - if (!count($options) || isset($options['h'])) |
|
294 | - usage($defaultConfigFile); |
|
299 | + if (!count($options) || isset($options['h'])) { |
|
300 | + usage($defaultConfigFile); |
|
301 | + } |
|
295 | 302 | |
296 | 303 | // Debug |
297 | - if (isset($options['d'])) |
|
298 | - $debug = true; |
|
304 | + if (isset($options['d'])) { |
|
305 | + $debug = true; |
|
306 | + } |
|
299 | 307 | |
300 | 308 | // Domain |
301 | 309 | if (!isset($options['D'])) |
@@ -353,8 +361,7 @@ discard block |
||
353 | 361 | print "\n[CONFIG ISSUE]\nConfiguration file '$defaultConfigFile' invalid or non-existant.\n\n[EASY FIX]\nGo to the directory mentioned in the error above and rename the template file 'cpeConfigTemplate.json' to 'cpeConfig.json'. Configure your Activities. As example you have Activities for CloudTranscode already setup in the template. You can declare your Activities and start executing tasks in an SWF workflow.\n"; |
354 | 362 | exit(1); |
355 | 363 | } |
356 | - } |
|
357 | - catch (Exception $e) { |
|
364 | + } catch (Exception $e) { |
|
358 | 365 | print $e; |
359 | 366 | } |
360 | 367 | |
@@ -374,8 +381,7 @@ discard block |
||
374 | 381 | // Instantiate ActivityPoller |
375 | 382 | try { |
376 | 383 | $activityPoller = new ActivityPoller($config); |
377 | -} |
|
378 | -catch (CpeSdk\CpeException $e) { |
|
384 | +} catch (CpeSdk\CpeException $e) { |
|
379 | 385 | $cpeLogger->log_out("FATAL", |
380 | 386 | basename(__FILE__), $e->getMessage()); |
381 | 387 | exit(1); |
@@ -384,6 +390,7 @@ discard block |
||
384 | 390 | $cpeLogger->log_out("INFO", basename(__FILE__), "Starting activity tasks polling"); |
385 | 391 | |
386 | 392 | // Start polling loop |
387 | -while (42) |
|
393 | +while (42) { |
|
388 | 394 | $activityPoller->poll_for_activities(); |
395 | +} |
|
389 | 396 |
@@ -114,12 +114,12 @@ discard block |
||
114 | 114 | $this->cpeLogger->log_out( |
115 | 115 | "ERROR", |
116 | 116 | basename(__FILE__), |
117 | - $e->getMessage().print_r($msg, true)); |
|
117 | + $e->getMessage() . print_r($msg, true)); |
|
118 | 118 | } catch (\Exception $e) { |
119 | 119 | $this->cpeLogger->log_out( |
120 | 120 | "ERROR", |
121 | 121 | basename(__FILE__), |
122 | - $e->getMessage().print_r($msg, true)); |
|
122 | + $e->getMessage() . print_r($msg, true)); |
|
123 | 123 | } |
124 | 124 | |
125 | 125 | // Message polled. Valid or not, we delete it from SQS |
@@ -147,7 +147,7 @@ discard block |
||
147 | 147 | $this->cpeLogger->log_out( |
148 | 148 | "INFO", |
149 | 149 | basename(__FILE__), |
150 | - "Received message '" . $message->{"type"} . "'" |
|
150 | + "Received message '" . $message->{"type"} . "'" |
|
151 | 151 | ); |
152 | 152 | if ($this->debug) |
153 | 153 | $this->cpeLogger->log_out( |
@@ -196,7 +196,7 @@ discard block |
||
196 | 196 | $this->cpeLogger->log_out( |
197 | 197 | "INFO", |
198 | 198 | basename(__FILE__), |
199 | - "New workflow submitted to SWF: ".$workflowRunId->get('runId')); |
|
199 | + "New workflow submitted to SWF: " . $workflowRunId->get('runId')); |
|
200 | 200 | |
201 | 201 | // Send WORKFLOW_SCHEDULED message back to client |
202 | 202 | $this->cpeSqsWriter->workflow_scheduled($workflowType, $workflowRunId->get('runId'), $workflowId, $message); |
@@ -222,10 +222,10 @@ discard block |
||
222 | 222 | private function validate_message($message) |
223 | 223 | { |
224 | 224 | if (!isset($message) || |
225 | - !isset($message->{"time"}) || $message->{"time"} == "" || |
|
226 | - !isset($message->{"jobId"}) || $message->{"jobId"} == "" || |
|
227 | - !isset($message->{"type"}) || $message->{"type"} == "" || |
|
228 | - !isset($message->{"data"}) || $message->{"data"} == "") |
|
225 | + !isset($message->{"time"}) || $message->{"time"} == "" || |
|
226 | + !isset($message->{"jobId"}) || $message->{"jobId"} == "" || |
|
227 | + !isset($message->{"type"}) || $message->{"type"} == "" || |
|
228 | + !isset($message->{"data"}) || $message->{"data"} == "") |
|
229 | 229 | throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!", |
230 | 230 | self::INVALID_JSON); |
231 | 231 | |
@@ -246,7 +246,7 @@ discard block |
||
246 | 246 | function usage($defaultConfigFile) |
247 | 247 | { |
248 | 248 | echo("# Description\nThe InputPoller connects your client applications to your stack. It handles communication over the two SQS channels you created for your client application.\n\n"); |
249 | - echo("Usage: php ". basename(__FILE__) . " [-h] [-d] -n <client_name> [-l <log path>]\n"); |
|
249 | + echo("Usage: php " . basename(__FILE__) . " [-h] [-d] -n <client_name> [-l <log path>]\n"); |
|
250 | 250 | echo("-h: Print this help\n"); |
251 | 251 | echo("-d: Debug mode\n"); |
252 | 252 | echo("-l <log_path>: Location where logs will be dumped in (folder).\n"); |
@@ -283,9 +283,9 @@ discard block |
||
283 | 283 | $cpeLogger = new CpeSdk\CpeLogger($logPath, $options['n'], $debug); |
284 | 284 | |
285 | 285 | $config = new \stdClass; |
286 | - $config->clients = [(object)[ |
|
286 | + $config->clients = [(object) [ |
|
287 | 287 | 'name' => $options['n'], |
288 | - 'queues' => (object)[ |
|
288 | + 'queues' => (object) [ |
|
289 | 289 | 'input' => getenv('INPUT_QUEUE'), |
290 | 290 | 'output' => getenv('OUTPUT_QUEUE'), |
291 | 291 | ], |
@@ -86,9 +86,10 @@ discard block |
||
86 | 86 | // If a msg is received, we pass it to 'handle_input' for processing |
87 | 87 | public function poll_SQS_queues() |
88 | 88 | { |
89 | - if (!isset($this->config->{'clients'})) |
|
90 | - throw new CpeSdk\CpeException("Clients configuration invalid. Check the config file or your parameters.", |
|
89 | + if (!isset($this->config->{'clients'})) { |
|
90 | + throw new CpeSdk\CpeException("Clients configuration invalid. Check the config file or your parameters.", |
|
91 | 91 | self::INVALID_CONFIG); |
92 | + } |
|
92 | 93 | |
93 | 94 | // For all clients in config files |
94 | 95 | // We poll from queues |
@@ -102,13 +103,14 @@ discard block |
||
102 | 103 | try { |
103 | 104 | if ($msg = $this->cpeSqsListener->receive_message($queue, 10)) |
104 | 105 | { |
105 | - if (!($decoded = json_decode($msg['Body']))) |
|
106 | - $this->cpeLogger->log_out( |
|
106 | + if (!($decoded = json_decode($msg['Body']))) { |
|
107 | + $this->cpeLogger->log_out( |
|
107 | 108 | "ERROR", |
108 | 109 | basename(__FILE__), |
109 | 110 | "JSON data invalid in queue: '$queue'"); |
110 | - else |
|
111 | - $this->handle_message($decoded); |
|
111 | + } else { |
|
112 | + $this->handle_message($decoded); |
|
113 | + } |
|
112 | 114 | } |
113 | 115 | } catch (CpeSdk\CpeException $e) { |
114 | 116 | $this->cpeLogger->log_out( |
@@ -123,8 +125,9 @@ discard block |
||
123 | 125 | } |
124 | 126 | |
125 | 127 | // Message polled. Valid or not, we delete it from SQS |
126 | - if ($msg) |
|
127 | - $this->cpeSqsListener->delete_message($queue, $msg); |
|
128 | + if ($msg) { |
|
129 | + $this->cpeSqsListener->delete_message($queue, $msg); |
|
130 | + } |
|
128 | 131 | } |
129 | 132 | } |
130 | 133 | |
@@ -149,12 +152,13 @@ discard block |
||
149 | 152 | basename(__FILE__), |
150 | 153 | "Received message '" . $message->{"type"} . "'" |
151 | 154 | ); |
152 | - if ($this->debug) |
|
153 | - $this->cpeLogger->log_out( |
|
155 | + if ($this->debug) { |
|
156 | + $this->cpeLogger->log_out( |
|
154 | 157 | "DEBUG", |
155 | 158 | basename(__FILE__), |
156 | 159 | "Details:\n" . json_encode($message, JSON_PRETTY_PRINT) |
157 | 160 | ); |
161 | + } |
|
158 | 162 | |
159 | 163 | // We call the callback function that handles this message |
160 | 164 | $this->{$this->typesMap[$message->{"type"}]}($message); |
@@ -168,12 +172,13 @@ discard block |
||
168 | 172 | // Start a new workflow in SWF to initiate new transcoding job |
169 | 173 | private function start_job($message) |
170 | 174 | { |
171 | - if ($this->debug) |
|
172 | - $this->cpeLogger->log_out( |
|
175 | + if ($this->debug) { |
|
176 | + $this->cpeLogger->log_out( |
|
173 | 177 | "DEBUG", |
174 | 178 | basename(__FILE__), |
175 | 179 | "Starting new workflow!" |
176 | 180 | ); |
181 | + } |
|
177 | 182 | |
178 | 183 | // Workflow info |
179 | 184 | $workflowType = array( |
@@ -225,13 +230,15 @@ discard block |
||
225 | 230 | !isset($message->{"time"}) || $message->{"time"} == "" || |
226 | 231 | !isset($message->{"jobId"}) || $message->{"jobId"} == "" || |
227 | 232 | !isset($message->{"type"}) || $message->{"type"} == "" || |
228 | - !isset($message->{"data"}) || $message->{"data"} == "") |
|
229 | - throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!", |
|
233 | + !isset($message->{"data"}) || $message->{"data"} == "") { |
|
234 | + throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!", |
|
230 | 235 | self::INVALID_JSON); |
236 | + } |
|
231 | 237 | |
232 | - if (!isset($message->{'data'}->{'workflow'})) |
|
233 | - throw new CpeSdk\CpeException("Input doesn't contain any workflow information. You must provide the workflow you want to send this job to!", |
|
238 | + if (!isset($message->{'data'}->{'workflow'})) { |
|
239 | + throw new CpeSdk\CpeException("Input doesn't contain any workflow information. You must provide the workflow you want to send this job to!", |
|
234 | 240 | self::INVALID_JSON); |
241 | + } |
|
235 | 242 | } |
236 | 243 | } |
237 | 244 | |
@@ -262,11 +269,13 @@ discard block |
||
262 | 269 | // Handle input parameters |
263 | 270 | $options = getopt("l:hdn:"); |
264 | 271 | |
265 | - if (isset($options['h'])) |
|
266 | - usage(); |
|
272 | + if (isset($options['h'])) { |
|
273 | + usage(); |
|
274 | + } |
|
267 | 275 | |
268 | - if (isset($options['d'])) |
|
269 | - $debug = true; |
|
276 | + if (isset($options['d'])) { |
|
277 | + $debug = true; |
|
278 | + } |
|
270 | 279 | |
271 | 280 | $logPath = null; |
272 | 281 | if (isset($options['l'])) |
@@ -304,8 +313,7 @@ discard block |
||
304 | 313 | // Create InputPoller object |
305 | 314 | try { |
306 | 315 | $inputPoller = new InputPoller($config); |
307 | -} |
|
308 | -catch (CpeSdk\CpeException $e) { |
|
316 | +} catch (CpeSdk\CpeException $e) { |
|
309 | 317 | echo $e->getMessage(); |
310 | 318 | $cpeLogger->log_out( |
311 | 319 | "FATAL", |
@@ -319,5 +327,6 @@ discard block |
||
319 | 327 | print "Start polling ...\n"; |
320 | 328 | |
321 | 329 | // Start polling loop to get incoming commands from SQS input queues |
322 | -while (42) |
|
330 | +while (42) { |
|
323 | 331 | $inputPoller->poll_SQS_queues(); |
332 | +} |