@@ -34,7 +34,7 @@ discard block |
||
| 34 | 34 | * It opens the JSON input and starts a execute a callback corresponding to the command |
| 35 | 35 | */ |
| 36 | 36 | |
| 37 | -require_once __DIR__ . "/../vendor/autoload.php"; |
|
| 37 | +require_once __DIR__."/../vendor/autoload.php"; |
|
| 38 | 38 | |
| 39 | 39 | use Aws\Swf\Exception; |
| 40 | 40 | use SA\CpeSdk; |
@@ -95,7 +95,7 @@ discard block |
||
| 95 | 95 | foreach ($this->config->{'clients'} as $client) |
| 96 | 96 | { |
| 97 | 97 | $msg = null; |
| 98 | - $this->cpeLogger->log_out("DEBUG", __DIR__, "Polling from client: " . print_r($client, true)); |
|
| 98 | + $this->cpeLogger->log_out("DEBUG", __DIR__, "Polling from client: ".print_r($client, true)); |
|
| 99 | 99 | |
| 100 | 100 | // Long Polling messages from client input queue |
| 101 | 101 | $queue = $client->{'queues'}->{'input'}; |
@@ -139,7 +139,7 @@ discard block |
||
| 139 | 139 | $this->cpeLogger->log_out( |
| 140 | 140 | "ERROR", |
| 141 | 141 | basename(__FILE__), |
| 142 | - "Command '" . $message->{"type"} . "' is unknown! Ignoring ..." |
|
| 142 | + "Command '".$message->{"type"}."' is unknown! Ignoring ..." |
|
| 143 | 143 | ); |
| 144 | 144 | return; |
| 145 | 145 | } |
@@ -147,13 +147,13 @@ discard block |
||
| 147 | 147 | $this->cpeLogger->log_out( |
| 148 | 148 | "INFO", |
| 149 | 149 | basename(__FILE__), |
| 150 | - "Received message '" . $message->{"type"} . "'" |
|
| 150 | + "Received message '".$message->{"type"}."'" |
|
| 151 | 151 | ); |
| 152 | 152 | if ($this->debug) |
| 153 | 153 | $this->cpeLogger->log_out( |
| 154 | 154 | "DEBUG", |
| 155 | 155 | basename(__FILE__), |
| 156 | - "Details:\n" . json_encode($message, JSON_PRETTY_PRINT) |
|
| 156 | + "Details:\n".json_encode($message, JSON_PRETTY_PRINT) |
|
| 157 | 157 | ); |
| 158 | 158 | |
| 159 | 159 | // We call the callback function that handles this message |
@@ -226,10 +226,10 @@ discard block |
||
| 226 | 226 | private function validate_message($message) |
| 227 | 227 | { |
| 228 | 228 | if (!isset($message) || |
| 229 | - !isset($message->{"time"}) || $message->{"time"} == "" || |
|
| 230 | - !isset($message->{"jobId"}) || $message->{"jobId"} == "" || |
|
| 231 | - !isset($message->{"type"}) || $message->{"type"} == "" || |
|
| 232 | - !isset($message->{"data"}) || $message->{"data"} == "") |
|
| 229 | + !isset($message->{"time"}) || $message->{"time"} == "" || |
|
| 230 | + !isset($message->{"jobId"}) || $message->{"jobId"} == "" || |
|
| 231 | + !isset($message->{"type"}) || $message->{"type"} == "" || |
|
| 232 | + !isset($message->{"data"}) || $message->{"data"} == "") |
|
| 233 | 233 | throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!", |
| 234 | 234 | self::INVALID_JSON); |
| 235 | 235 | |
@@ -250,7 +250,7 @@ discard block |
||
| 250 | 250 | function usage($defaultConfigFile) |
| 251 | 251 | { |
| 252 | 252 | echo("# Description\nThe InputPoller connects your client applications to your stack. It handles communication over the two SQS channels you created for your client application.\n\n"); |
| 253 | - echo("Usage: php ". basename(__FILE__) . " [-h] [-d] -n <client_name> [-l <log path>]\n"); |
|
| 253 | + echo("Usage: php ".basename(__FILE__)." [-h] [-d] -n <client_name> [-l <log path>]\n"); |
|
| 254 | 254 | echo("-h: Print this help\n"); |
| 255 | 255 | echo("-d: Debug mode\n"); |
| 256 | 256 | echo("-l <log_path>: Location where logs will be dumped in (folder).\n"); |
@@ -287,9 +287,9 @@ discard block |
||
| 287 | 287 | $cpeLogger = new CpeSdk\CpeLogger($logPath, $options['n'], $debug); |
| 288 | 288 | |
| 289 | 289 | $config = new \stdClass; |
| 290 | - $config->clients = [(object)[ |
|
| 290 | + $config->clients = [(object) [ |
|
| 291 | 291 | 'name' => $options['n'], |
| 292 | - 'queues' => (object)[ |
|
| 292 | + 'queues' => (object) [ |
|
| 293 | 293 | 'input' => getenv('INPUT_QUEUE'), |
| 294 | 294 | 'output' => getenv('OUTPUT_QUEUE'), |
| 295 | 295 | ], |
@@ -86,9 +86,10 @@ discard block |
||
| 86 | 86 | // If a msg is received, we pass it to 'handle_input' for processing |
| 87 | 87 | public function poll_SQS_queues() |
| 88 | 88 | { |
| 89 | - if (!isset($this->config->{'clients'})) |
|
| 90 | - throw new CpeSdk\CpeException("Clients configuration invalid. Check the config file or your parameters.", |
|
| 89 | + if (!isset($this->config->{'clients'})) { |
|
| 90 | + throw new CpeSdk\CpeException("Clients configuration invalid. Check the config file or your parameters.", |
|
| 91 | 91 | self::INVALID_CONFIG); |
| 92 | + } |
|
| 92 | 93 | |
| 93 | 94 | // For all clients in config files |
| 94 | 95 | // We poll from queues |
@@ -102,13 +103,14 @@ discard block |
||
| 102 | 103 | try { |
| 103 | 104 | if ($msg = $this->cpeSqsListener->receive_message($queue, 10)) |
| 104 | 105 | { |
| 105 | - if (!($decoded = json_decode($msg['Body']))) |
|
| 106 | - $this->cpeLogger->log_out( |
|
| 106 | + if (!($decoded = json_decode($msg['Body']))) { |
|
| 107 | + $this->cpeLogger->log_out( |
|
| 107 | 108 | "ERROR", |
| 108 | 109 | basename(__FILE__), |
| 109 | 110 | "JSON data invalid in queue: '$queue'"); |
| 110 | - else |
|
| 111 | - $this->handle_message($decoded); |
|
| 111 | + } else { |
|
| 112 | + $this->handle_message($decoded); |
|
| 113 | + } |
|
| 112 | 114 | } |
| 113 | 115 | } catch (CpeSdk\CpeException $e) { |
| 114 | 116 | $this->cpeLogger->log_out( |
@@ -123,8 +125,9 @@ discard block |
||
| 123 | 125 | } |
| 124 | 126 | |
| 125 | 127 | // Message polled. Valid or not, we delete it from SQS |
| 126 | - if ($msg) |
|
| 127 | - $this->cpeSqsListener->delete_message($queue, $msg); |
|
| 128 | + if ($msg) { |
|
| 129 | + $this->cpeSqsListener->delete_message($queue, $msg); |
|
| 130 | + } |
|
| 128 | 131 | } |
| 129 | 132 | } |
| 130 | 133 | |
@@ -149,12 +152,13 @@ discard block |
||
| 149 | 152 | basename(__FILE__), |
| 150 | 153 | "Received message '" . $message->{"type"} . "'" |
| 151 | 154 | ); |
| 152 | - if ($this->debug) |
|
| 153 | - $this->cpeLogger->log_out( |
|
| 155 | + if ($this->debug) { |
|
| 156 | + $this->cpeLogger->log_out( |
|
| 154 | 157 | "DEBUG", |
| 155 | 158 | basename(__FILE__), |
| 156 | 159 | "Details:\n" . json_encode($message, JSON_PRETTY_PRINT) |
| 157 | 160 | ); |
| 161 | + } |
|
| 158 | 162 | |
| 159 | 163 | // We call the callback function that handles this message |
| 160 | 164 | $this->{$this->typesMap[$message->{"type"}]}($message); |
@@ -168,12 +172,13 @@ discard block |
||
| 168 | 172 | // Start a new workflow in SWF to initiate new transcoding job |
| 169 | 173 | private function start_job($message) |
| 170 | 174 | { |
| 171 | - if ($this->debug) |
|
| 172 | - $this->cpeLogger->log_out( |
|
| 175 | + if ($this->debug) { |
|
| 176 | + $this->cpeLogger->log_out( |
|
| 173 | 177 | "DEBUG", |
| 174 | 178 | basename(__FILE__), |
| 175 | 179 | "Starting new workflow!" |
| 176 | 180 | ); |
| 181 | + } |
|
| 177 | 182 | |
| 178 | 183 | // Workflow info |
| 179 | 184 | $workflowType = array( |
@@ -192,9 +197,10 @@ discard block |
||
| 192 | 197 | "taskList" => array("name" => $message->{'data'}->{'workflow'}->{'taskList'}), |
| 193 | 198 | "input" => json_encode($message->{'data'}) |
| 194 | 199 | ); |
| 195 | - if (isset($message->{'data'}->{'workflow'}->{'executionStartToCloseTimeout'})) |
|
| 196 | - $payload["executionStartToCloseTimeout"] = |
|
| 200 | + if (isset($message->{'data'}->{'workflow'}->{'executionStartToCloseTimeout'})) { |
|
| 201 | + $payload["executionStartToCloseTimeout"] = |
|
| 197 | 202 | $message->{'data'}->{'workflow'}->{'executionStartToCloseTimeout'}; |
| 203 | + } |
|
| 198 | 204 | $workflowRunId = $this->cpeSwfHandler->swf->startWorkflowExecution($payload); |
| 199 | 205 | |
| 200 | 206 | $this->cpeLogger->log_out( |
@@ -229,13 +235,15 @@ discard block |
||
| 229 | 235 | !isset($message->{"time"}) || $message->{"time"} == "" || |
| 230 | 236 | !isset($message->{"jobId"}) || $message->{"jobId"} == "" || |
| 231 | 237 | !isset($message->{"type"}) || $message->{"type"} == "" || |
| 232 | - !isset($message->{"data"}) || $message->{"data"} == "") |
|
| 233 | - throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!", |
|
| 238 | + !isset($message->{"data"}) || $message->{"data"} == "") { |
|
| 239 | + throw new CpeSdk\CpeException("'time', 'type', 'jobId' or 'data' fields missing in JSON message file!", |
|
| 234 | 240 | self::INVALID_JSON); |
| 241 | + } |
|
| 235 | 242 | |
| 236 | - if (!isset($message->{'data'}->{'workflow'})) |
|
| 237 | - throw new CpeSdk\CpeException("Input doesn't contain any workflow information. You must provide the workflow you want to send this job to!", |
|
| 243 | + if (!isset($message->{'data'}->{'workflow'})) { |
|
| 244 | + throw new CpeSdk\CpeException("Input doesn't contain any workflow information. You must provide the workflow you want to send this job to!", |
|
| 238 | 245 | self::INVALID_JSON); |
| 246 | + } |
|
| 239 | 247 | } |
| 240 | 248 | } |
| 241 | 249 | |
@@ -266,11 +274,13 @@ discard block |
||
| 266 | 274 | // Handle input parameters |
| 267 | 275 | $options = getopt("l:hdn:"); |
| 268 | 276 | |
| 269 | - if (isset($options['h'])) |
|
| 270 | - usage(); |
|
| 277 | + if (isset($options['h'])) { |
|
| 278 | + usage(); |
|
| 279 | + } |
|
| 271 | 280 | |
| 272 | - if (isset($options['d'])) |
|
| 273 | - $debug = true; |
|
| 281 | + if (isset($options['d'])) { |
|
| 282 | + $debug = true; |
|
| 283 | + } |
|
| 274 | 284 | |
| 275 | 285 | $logPath = null; |
| 276 | 286 | if (isset($options['l'])) |
@@ -308,8 +318,7 @@ discard block |
||
| 308 | 318 | // Create InputPoller object |
| 309 | 319 | try { |
| 310 | 320 | $inputPoller = new InputPoller($config); |
| 311 | -} |
|
| 312 | -catch (CpeSdk\CpeException $e) { |
|
| 321 | +} catch (CpeSdk\CpeException $e) { |
|
| 313 | 322 | echo $e->getMessage(); |
| 314 | 323 | $cpeLogger->log_out( |
| 315 | 324 | "FATAL", |
@@ -323,5 +332,6 @@ discard block |
||
| 323 | 332 | print "Start polling ...\n"; |
| 324 | 333 | |
| 325 | 334 | // Start polling loop to get incoming commands from SQS input queues |
| 326 | -while (42) |
|
| 335 | +while (42) { |
|
| 327 | 336 | $inputPoller->poll_SQS_queues(); |
| 337 | +} |
|