Completed
Push — master ( 0c490c...c0443b )
by Nicolas
02:10
created
pollers/src/ActivityPoller.php 2 patches
Spacing   +6 added lines, -6 removed lines patch added patch discarded remove patch
@@ -100,7 +100,7 @@  discard block
 block discarded – undo
100 100
         // Poll from all the taskList registered for each activities 
101 101
         if ($this->debug)
102 102
             $this->cpeLogger->log_out("DEBUG", basename(__FILE__),
103
-                "Polling activity taskList '" . $this->taskList  . "' ... ");
103
+                "Polling activity taskList '" . $this->taskList . "' ... ");
104 104
             
105 105
         try {
106 106
             // Call SWF and poll for incoming tasks
@@ -201,9 +201,9 @@  discard block
 block discarded – undo
201 201
                 if (!file_exists($activityToHandle->{"file"}))
202 202
                 {
203 203
                     $this->cpeLogger->log_out("ERROR", basename(__FILE__),
204
-                        "The code file '".$activityToHandle->{"file"}."' for activity: name=" 
204
+                        "The code file '" . $activityToHandle->{"file"} . "' for activity: name=" 
205 205
                         . $activityToHandle->{"name"} . ",version=" 
206
-                        . $activityToHandle->{"version"}." doesn't exists! Check if the file is accessible and if the path is correct in your config file.");
206
+                        . $activityToHandle->{"version"} . " doesn't exists! Check if the file is accessible and if the path is correct in your config file.");
207 207
                     return false;
208 208
                 }
209 209
                 
@@ -242,7 +242,7 @@  discard block
 block discarded – undo
242 242
         $this->cpeLogger->log_out("ERROR", basename(__FILE__),
243 243
             "No Activity handler was found for: name=" 
244 244
             . $this->activityName . ",version=" 
245
-            . $this->activityVersion.". Check your config file and ensure your 'activity' name AND 'version' is there.");    
245
+            . $this->activityVersion . ". Check your config file and ensure your 'activity' name AND 'version' is there.");    
246 246
         return false;
247 247
     }
248 248
 }
@@ -262,7 +262,7 @@  discard block
 block discarded – undo
262 262
 // Usage
263 263
 function usage($defaultConfigFile)
264 264
 {
265
-    echo("Usage: php ". basename(__FILE__) . " -D <domain> -A <activity_name> -V <activity_version> [-T <task_list>] [-h] [-d] [-c <config_file path>] [-l <log path>]\n");
265
+    echo("Usage: php " . basename(__FILE__) . " -D <domain> -A <activity_name> -V <activity_version> [-T <task_list>] [-h] [-d] [-c <config_file path>] [-l <log path>]\n");
266 266
     echo("-h: Print this help\n");
267 267
     echo("-d: Debug mode\n");
268 268
     echo("-c <config_file path>: Optional parameter to override the default configuration file: '$defaultConfigFile'.\n");
@@ -331,7 +331,7 @@  discard block
 block discarded – undo
331 331
     // Tasklist
332 332
     if (!isset($options['T']))
333 333
     {
334
-        $taskList = $options['A'].'-'.$options['V'];
334
+        $taskList = $options['A'] . '-' . $options['V'];
335 335
     } else {
336 336
         $taskList = $options['T'];
337 337
     }
Please login to merge, or discard this patch.
Braces   +28 added lines, -21 removed lines patch added patch discarded remove patch
@@ -98,9 +98,10 @@  discard block
 block discarded – undo
98 98
     public function poll_for_activities()
99 99
     {
100 100
         // Poll from all the taskList registered for each activities 
101
-        if ($this->debug)
102
-            $this->cpeLogger->log_out("DEBUG", basename(__FILE__),
101
+        if ($this->debug) {
102
+                    $this->cpeLogger->log_out("DEBUG", basename(__FILE__),
103 103
                 "Polling activity taskList '" . $this->taskList  . "' ... ");
104
+        }
104 105
             
105 106
         try {
106 107
             // Call SWF and poll for incoming tasks
@@ -124,8 +125,9 @@  discard block
 block discarded – undo
124 125
     {
125 126
         // Get activityType and WorkflowExecution info
126 127
         if (!($activityType      = $activityTask->get("activityType")) ||
127
-            !($workflowExecution = $activityTask->get("workflowExecution")))
128
-            return false;
128
+            !($workflowExecution = $activityTask->get("workflowExecution"))) {
129
+                    return false;
130
+        }
129 131
         
130 132
         $this->cpeLogger->log_out("INFO",
131 133
             basename(__FILE__),
@@ -152,16 +154,18 @@  discard block
 block discarded – undo
152 154
             // Perform input validation
153 155
             $this->activityHandler->do_input_validation();
154 156
 
155
-            if ($this->debug)
156
-                $this->cpeLogger->log_out("DEBUG", basename(__FILE__), 
157
+            if ($this->debug) {
158
+                            $this->cpeLogger->log_out("DEBUG", basename(__FILE__), 
157 159
                     "Activity input:\n" . print_r($this->activityHandler->input, true));
160
+            }
158 161
         
159 162
             // Run activity task
160 163
             $result = $this->activityHandler->do_activity($activityTask);
161 164
 
162
-            if ($this->debug && $result)
163
-                $this->cpeLogger->log_out("DEBUG", basename(__FILE__), 
165
+            if ($this->debug && $result) {
166
+                            $this->cpeLogger->log_out("DEBUG", basename(__FILE__), 
164 167
                     "Activity output:\n" . print_r($result, true));
168
+            }
165 169
         
166 170
         } catch (CpeSdk\CpeException $e) {
167 171
             $reason  = $e->ref;
@@ -216,8 +220,9 @@  discard block
 block discarded – undo
216 220
                 // Instantiate the Activity class that will process Tasks
217 221
 
218 222
                 if (!isset($this->cpeLogger) ||
219
-                    !$this->cpeLogger)
220
-                    print "EMPTY !!!\n";
223
+                    !$this->cpeLogger) {
224
+                                    print "EMPTY !!!\n";
225
+                }
221 226
                 
222 227
                 $this->activityHandler = 
223 228
                     new $activityToHandle->{"class"}(
@@ -287,15 +292,18 @@  discard block
 block discarded – undo
287 292
     global $activityVersion;
288 293
     
289 294
     // Handle input parameters
290
-    if (!($options = getopt("D:T:A:V:c:l:hd")))
291
-        usage($defaultConfigFile);
295
+    if (!($options = getopt("D:T:A:V:c:l:hd"))) {
296
+            usage($defaultConfigFile);
297
+    }
292 298
     
293
-    if (!count($options) || isset($options['h']))
294
-        usage($defaultConfigFile);
299
+    if (!count($options) || isset($options['h'])) {
300
+            usage($defaultConfigFile);
301
+    }
295 302
 
296 303
     // Debug
297
-    if (isset($options['d']))
298
-        $debug = true;
304
+    if (isset($options['d'])) {
305
+            $debug = true;
306
+    }
299 307
 
300 308
     // Domain
301 309
     if (!isset($options['D']))
@@ -353,8 +361,7 @@  discard block
 block discarded – undo
353 361
             print "\n[CONFIG ISSUE]\nConfiguration file '$defaultConfigFile' invalid or non-existant.\n\n[EASY FIX]\nGo to the directory mentioned in the error above and rename the template file 'cpeConfigTemplate.json' to 'cpeConfig.json'. Configure your Activities. As example you have Activities for CloudTranscode already setup in the template. You can declare your Activities and start executing tasks in an SWF workflow.\n";
354 362
             exit(1);
355 363
         }
356
-    }
357
-    catch (Exception $e) {
364
+    } catch (Exception $e) {
358 365
         print $e;
359 366
     }
360 367
             
@@ -374,8 +381,7 @@  discard block
 block discarded – undo
374 381
 // Instantiate ActivityPoller
375 382
 try {
376 383
     $activityPoller = new ActivityPoller($config);
377
-} 
378
-catch (CpeSdk\CpeException $e) {
384
+} catch (CpeSdk\CpeException $e) {
379 385
     $cpeLogger->log_out("FATAL",
380 386
         basename(__FILE__), $e->getMessage());
381 387
     exit(1);
@@ -384,6 +390,7 @@  discard block
 block discarded – undo
384 390
 $cpeLogger->log_out("INFO", basename(__FILE__), "Starting activity tasks polling");
385 391
 
386 392
 // Start polling loop
387
-while (42)
393
+while (42) {
388 394
     $activityPoller->poll_for_activities();
395
+}
389 396
 
Please login to merge, or discard this patch.
pollers/src/InputPoller.php 2 patches
Spacing   +11 added lines, -11 removed lines patch added patch discarded remove patch
@@ -114,12 +114,12 @@  discard block
 block discarded – undo
114 114
                 $this->cpeLogger->log_out(
115 115
                     "ERROR", 
116 116
                     basename(__FILE__), 
117
-                    $e->getMessage().print_r($msg, true));
117
+                    $e->getMessage() . print_r($msg, true));
118 118
             } catch (\Exception $e) {
119 119
                 $this->cpeLogger->log_out(
120 120
                     "ERROR", 
121 121
                     basename(__FILE__), 
122
-                    $e->getMessage().print_r($msg, true));
122
+                    $e->getMessage() . print_r($msg, true));
123 123
             }
124 124
             
125 125
             // Message polled. Valid or not, we delete it from SQS
@@ -147,7 +147,7 @@  discard block
 block discarded – undo
147 147
         $this->cpeLogger->log_out(
148 148
             "INFO", 
149 149
             basename(__FILE__), 
150
-            "Received message '" . $message->{"type"}  . "'"
150
+            "Received message '" . $message->{"type"} . "'"
151 151
         );
152 152
         if ($this->debug)
153 153
             $this->cpeLogger->log_out(
@@ -196,7 +196,7 @@  discard block
 block discarded – undo
196 196
             $this->cpeLogger->log_out(
197 197
                 "INFO",
198 198
                 basename(__FILE__),
199
-                "New workflow submitted to SWF: ".$workflowRunId->get('runId'));
199
+                "New workflow submitted to SWF: " . $workflowRunId->get('runId'));
200 200
 
201 201
             // Send WORKFLOW_SCHEDULED message back to client
202 202
             $this->cpeSqsWriter->workflow_scheduled($workflowType, $workflowRunId->get('runId'), $workflowId, $message);
@@ -222,10 +222,10 @@  discard block
 block discarded – undo
222 222
     private function validate_message($message)
223 223
     {
224 224
         if (!isset($message) || 
225
-            !isset($message->{"time"})   || $message->{"time"} == "" || 
226
-            !isset($message->{"jobId"})  || $message->{"jobId"} == "" || 
227
-            !isset($message->{"type"})   || $message->{"type"} == "" || 
228
-            !isset($message->{"data"})   || $message->{"data"} == "")
225
+            !isset($message->{"time"}) || $message->{"time"} == "" || 
226
+            !isset($message->{"jobId"}) || $message->{"jobId"} == "" || 
227
+            !isset($message->{"type"}) || $message->{"type"} == "" || 
228
+            !isset($message->{"data"}) || $message->{"data"} == "")
229 229
             throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!",
230 230
                 self::INVALID_JSON);
231 231
         
@@ -246,7 +246,7 @@  discard block
 block discarded – undo
246 246
 function usage($defaultConfigFile)
247 247
 {
248 248
     echo("# Description\nThe InputPoller connects your client applications to your stack. It handles communication over the two SQS channels you created for your client application.\n\n");
249
-    echo("Usage: php ". basename(__FILE__) . " [-h] [-d] -n <client_name> [-l <log path>]\n");
249
+    echo("Usage: php " . basename(__FILE__) . " [-h] [-d] -n <client_name> [-l <log path>]\n");
250 250
     echo("-h: Print this help\n");
251 251
     echo("-d: Debug mode\n");
252 252
     echo("-l <log_path>: Location where logs will be dumped in (folder).\n");
@@ -283,9 +283,9 @@  discard block
 block discarded – undo
283 283
     $cpeLogger = new CpeSdk\CpeLogger($logPath, $options['n'], $debug);
284 284
         
285 285
     $config = new \stdClass;
286
-    $config->clients = [(object)[
286
+    $config->clients = [(object) [
287 287
             'name' => $options['n'],
288
-            'queues' => (object)[
288
+            'queues' => (object) [
289 289
                 'input' => getenv('INPUT_QUEUE'),
290 290
                 'output' => getenv('OUTPUT_QUEUE'),
291 291
             ],
Please login to merge, or discard this patch.
Braces   +32 added lines, -23 removed lines patch added patch discarded remove patch
@@ -86,9 +86,10 @@  discard block
 block discarded – undo
86 86
     // If a msg is received, we pass it to 'handle_input' for processing
87 87
     public function poll_SQS_queues()
88 88
     {
89
-        if (!isset($this->config->{'clients'}))
90
-            throw new CpeSdk\CpeException("Clients configuration invalid. Check the config file or your parameters.",
89
+        if (!isset($this->config->{'clients'})) {
90
+                    throw new CpeSdk\CpeException("Clients configuration invalid. Check the config file or your parameters.",
91 91
                 self::INVALID_CONFIG);
92
+        }
92 93
         
93 94
         // For all clients in config files
94 95
         // We poll from queues
@@ -102,13 +103,14 @@  discard block
 block discarded – undo
102 103
             try {
103 104
                 if ($msg = $this->cpeSqsListener->receive_message($queue, 10))
104 105
                 {
105
-                    if (!($decoded = json_decode($msg['Body'])))
106
-                        $this->cpeLogger->log_out(
106
+                    if (!($decoded = json_decode($msg['Body']))) {
107
+                                            $this->cpeLogger->log_out(
107 108
                             "ERROR", 
108 109
                             basename(__FILE__), 
109 110
                             "JSON data invalid in queue: '$queue'");
110
-                    else                    
111
-                        $this->handle_message($decoded);
111
+                    } else {
112
+                                            $this->handle_message($decoded);
113
+                    }
112 114
                 }
113 115
             } catch (CpeSdk\CpeException $e) {
114 116
                 $this->cpeLogger->log_out(
@@ -123,8 +125,9 @@  discard block
 block discarded – undo
123 125
             }
124 126
             
125 127
             // Message polled. Valid or not, we delete it from SQS
126
-            if ($msg)
127
-                $this->cpeSqsListener->delete_message($queue, $msg);
128
+            if ($msg) {
129
+                            $this->cpeSqsListener->delete_message($queue, $msg);
130
+            }
128 131
         }
129 132
     }
130 133
 
@@ -149,12 +152,13 @@  discard block
 block discarded – undo
149 152
             basename(__FILE__), 
150 153
             "Received message '" . $message->{"type"}  . "'"
151 154
         );
152
-        if ($this->debug)
153
-            $this->cpeLogger->log_out(
155
+        if ($this->debug) {
156
+                    $this->cpeLogger->log_out(
154 157
                 "DEBUG", 
155 158
                 basename(__FILE__), 
156 159
                 "Details:\n" . json_encode($message, JSON_PRETTY_PRINT)
157 160
             );
161
+        }
158 162
 
159 163
         // We call the callback function that handles this message  
160 164
         $this->{$this->typesMap[$message->{"type"}]}($message);
@@ -168,12 +172,13 @@  discard block
 block discarded – undo
168 172
     // Start a new workflow in SWF to initiate new transcoding job
169 173
     private function start_job($message)
170 174
     {
171
-        if ($this->debug)
172
-            $this->cpeLogger->log_out(
175
+        if ($this->debug) {
176
+                    $this->cpeLogger->log_out(
173 177
                 "DEBUG",
174 178
                 basename(__FILE__),
175 179
                 "Starting new workflow!"
176 180
             );
181
+        }
177 182
         
178 183
         // Workflow info
179 184
         $workflowType = array(
@@ -225,13 +230,15 @@  discard block
 block discarded – undo
225 230
             !isset($message->{"time"})   || $message->{"time"} == "" || 
226 231
             !isset($message->{"jobId"})  || $message->{"jobId"} == "" || 
227 232
             !isset($message->{"type"})   || $message->{"type"} == "" || 
228
-            !isset($message->{"data"})   || $message->{"data"} == "")
229
-            throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!",
233
+            !isset($message->{"data"})   || $message->{"data"} == "") {
234
+                    throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!",
230 235
                 self::INVALID_JSON);
236
+        }
231 237
         
232
-        if (!isset($message->{'data'}->{'workflow'}))
233
-            throw new CpeSdk\CpeException("Input doesn't contain any workflow information. You must provide the workflow you want to send this job to!",
238
+        if (!isset($message->{'data'}->{'workflow'})) {
239
+                    throw new CpeSdk\CpeException("Input doesn't contain any workflow information. You must provide the workflow you want to send this job to!",
234 240
                 self::INVALID_JSON);
241
+        }
235 242
     }
236 243
 }
237 244
 
@@ -262,11 +269,13 @@  discard block
 block discarded – undo
262 269
     // Handle input parameters
263 270
     $options = getopt("l:hdn:");
264 271
 
265
-    if (isset($options['h']))
266
-        usage();
272
+    if (isset($options['h'])) {
273
+            usage();
274
+    }
267 275
     
268
-    if (isset($options['d'])) 
269
-        $debug = true;
276
+    if (isset($options['d'])) {
277
+            $debug = true;
278
+    }
270 279
         
271 280
     $logPath = null;
272 281
     if (isset($options['l']))
@@ -304,8 +313,7 @@  discard block
 block discarded – undo
304 313
 // Create InputPoller object
305 314
 try {
306 315
     $inputPoller = new InputPoller($config);
307
-}
308
-catch (CpeSdk\CpeException $e) {
316
+} catch (CpeSdk\CpeException $e) {
309 317
     echo $e->getMessage();
310 318
     $cpeLogger->log_out(
311 319
         "FATAL", 
@@ -319,5 +327,6 @@  discard block
 block discarded – undo
319 327
 print "Start polling ...\n";
320 328
 
321 329
 // Start polling loop to get incoming commands from SQS input queues
322
-while (42)
330
+while (42) {
323 331
     $inputPoller->poll_SQS_queues();
332
+}
Please login to merge, or discard this patch.