CpeSqsWriter::send_activity_msg()   B
last analyzed

Complexity

Conditions 4
Paths 5

Size

Total Lines 41
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 41
rs 8.5806
c 1
b 0
f 0
cc 4
eloc 25
nc 5
nop 4
1
<?php
2
3
/**
4
 * This class is used by the CPE Core ActivityPoller
5
 * and can also be used by Custom Activities.
6
 *
7
 * Use it to send updates to the Clients.
8
 **/
9
10
namespace SA\CpeSdk\Sqs;
11
12
// Amazon libraries
13
use Aws\Common\Aws;
14
use Aws\Sqs;
15
16
// SA Cpe SDK
17
use SA\CpeSdk;
18
19
/**
 * Publishes workflow and activity status updates to a client's SQS output queue.
 *
 * Every message is a JSON document carrying a 'time' (microtime float) and a
 * 'type' (one of the status constants below). The destination queue URL is
 * read from the `client->queues->output` field of the job/task input.
 */
class CpeSqsWriter
{
    /** @var mixed Debug flag handed to the constructor; stored but not read by this class. */
    private $debug;

    /** @var object SQS client obtained from the AWS SDK service builder. */
    private $sqs;

    // Exceptions
    const INVALID_JSON       = "INVALID_JSON";

    // Statuses
    const WORKFLOW_SCHEDULED = "WORKFLOW_SCHEDULED";
    const JOB_STARTED        = "JOB_STARTED";
    const JOB_COMPLETED      = "JOB_COMPLETED";
    const JOB_FAILED         = "JOB_FAILED";
    const ACTIVITY_STARTED   = "ACTIVITY_STARTED";
    const ACTIVITY_FAILED    = "ACTIVITY_FAILED";
    const ACTIVITY_TIMEOUT   = "ACTIVITY_TIMEOUT";
    const ACTIVITY_COMPLETED = "ACTIVITY_COMPLETED";
    const ACTIVITY_PROGRESS  = "ACTIVITY_PROGRESS";
    const ACTIVITY_PREPARING = "ACTIVITY_PREPARING";
    const ACTIVITY_FINISHING = "ACTIVITY_FINISHING";

    /**
     * Build the writer and its SQS client.
     *
     * The region is taken from the AWS_DEFAULT_REGION environment variable.
     * We expect an AWS role or the proper AWS env variables to be set for
     * KEY/SECRET.
     *
     * @param mixed $debug Debug flag, kept on the instance.
     */
    public function __construct($debug)
    {
        $this->debug = $debug;

        // Create AWS SDK instance.
        $aws = Aws::factory(array(
                'region' => getenv("AWS_DEFAULT_REGION")
            ));
        $this->sqs = $aws->get('Sqs');
    }


    /**
     * SEND messages to OUTPUT SQS queue
     *
     * Send messages back to the clients.
     * Clients listening to the SQS queue will receive them.
     * Send updates and notifications:
     * Activity Started, Failed, Succeeded, etc
     */

    /**
     * Notify the client that a workflow has been scheduled.
     *
     * @param mixed  $workflowType Echoed back as 'workflowType'.
     * @param mixed  $runId        Echoed back as 'runId'.
     * @param mixed  $workflowId   Echoed back as 'workflowId'.
     * @param object $message      Decoded job message; `jobId`, `data` and
     *                             `data->client->queues->output` are read from it.
     */
    public function workflow_scheduled($workflowType, $runId, $workflowId, $message)
    {
        $msg = [
            'time'         => microtime(true),
            'type'         => self::WORKFLOW_SCHEDULED,
            "jobId"        => $message->{"jobId"},
            "runId"        => $runId,
            "workflowId"   => $workflowId,
            "workflowType" => $workflowType,
            "input"        => $message->{'data'}
        ];

        $this->send_to_output_queue($message->{'data'}->{"client"}, $msg);
    }

    /**
     * Notify the client that an activity has started.
     * The decoded task input is included in the message.
     *
     * @param object $task Activity task (see send_activity_msg()).
     * @throws CpeSdk\CpeException When the task input is not usable JSON.
     */
    public function activity_started($task)
    {
        // Third argument 'true' forces sending the 'input' back to the client
        $this->send_activity_msg($task, self::ACTIVITY_STARTED, true);
    }

    /**
     * Notify the client that an activity has completed.
     *
     * @param object $task   Activity task (see send_activity_msg()).
     * @param mixed  $result Optional result data attached as 'result' when truthy.
     * @throws CpeSdk\CpeException When the task input is not usable JSON.
     */
    public function activity_completed($task, $result = null)
    {
        $this->send_activity_msg($task, self::ACTIVITY_COMPLETED, false, $result);
    }

    /**
     * Notify the client that an activity has failed.
     *
     * @param object $task    Activity task (see send_activity_msg()).
     * @param mixed  $reason  Sent as result 'reason'.
     * @param mixed  $details Sent as result 'details'.
     * @throws CpeSdk\CpeException When the task input is not usable JSON.
     */
    public function activity_failed($task, $reason, $details)
    {
        $failure = [
            "reason"  => $reason,
            "details" => $details
        ];

        $this->send_activity_msg($task, self::ACTIVITY_FAILED, false, $failure);
    }

    /**
     * Notify the client that an activity has timed out.
     * The decoded task input is included in the message.
     *
     * @param object $task Activity task (see send_activity_msg()).
     * @throws CpeSdk\CpeException When the task input is not usable JSON.
     */
    public function activity_timeout($task)
    {
        $this->send_activity_msg($task, self::ACTIVITY_TIMEOUT, true);
    }

    /**
     * Placeholder for the "activity canceled" notification.
     *
     * Currently a no-op: nothing is sent and $task is not used yet.
     *
     * @param object $task Activity task (unused for now).
     */
    public function activity_canceled($task)
    {
        // FIXME: To implement
    }

    /**
     * Send a progress update for a running activity.
     *
     * @param object $task     Activity task (see send_activity_msg()).
     * @param mixed  $progress Progress data attached as 'result' when truthy.
     * @throws CpeSdk\CpeException When the task input is not usable JSON.
     */
    public function activity_progress($task, $progress)
    {
        $this->send_activity_msg($task, self::ACTIVITY_PROGRESS, false, $progress);
    }

    /**
     * Notify the client that an activity is preparing.
     *
     * @param object $task Activity task (see send_activity_msg()).
     * @throws CpeSdk\CpeException When the task input is not usable JSON.
     */
    public function activity_preparing($task)
    {
        $this->send_activity_msg($task, self::ACTIVITY_PREPARING);
    }

    /**
     * Notify the client that an activity is finishing.
     *
     * @param object $task Activity task (see send_activity_msg()).
     * @throws CpeSdk\CpeException When the task input is not usable JSON.
     */
    public function activity_finishing($task)
    {
        $this->send_activity_msg($task, self::ACTIVITY_FINISHING);
    }


    /**
     * UTILS
     */

    /**
     * Craft a new message array.
     *
     * @param string $type One of the status constants.
     * @param mixed  $data Payload stored under 'data'.
     * @return array Message with 'time', 'type' and 'data' keys.
     */
    private function craft_new_msg($type, $data)
    {
        return array(
            'time'   => microtime(true),
            'type'   => $type,
            'data'   => $data
        );
    }

    /**
     * JSON-encode a message and push it to the client's output queue.
     *
     * @param object $client Client descriptor; `queues->output` is used as the queue URL.
     * @param array  $msg    Message to encode as the SQS message body.
     */
    private function send_to_output_queue($client, array $msg)
    {
        $this->sqs->sendMessage(array(
                'QueueUrl'    => $client->{'queues'}->{'output'},
                'MessageBody' => json_encode($msg),
            ));
    }

    /**
     * Build an activity status message and send it to the SQS output queue.
     *
     * @param object $activityTask Object exposing get($key); the keys 'input',
     *                             'activityId', 'activityType' and
     *                             'workflowExecution' are read. 'input' must be
     *                             a JSON string holding `client.queues.output`.
     * @param string $eventType    One of the ACTIVITY_* constants.
     * @param mixed  $sendInput    When truthy, the decoded input is sent back
     *                             under activity 'input'.
     * @param mixed  $result       When truthy, attached under activity 'result'.
     *                             Falsy values (null, 0, '', []) are omitted.
     * @throws CpeSdk\CpeException With ref INVALID_JSON when the input does not
     *                             decode to a truthy value.
     */
    private function send_activity_msg(
        $activityTask,
        $eventType,
        $sendInput = null,
        $result = null)
    {
        $rawInput = $activityTask->get('input');
        $input    = json_decode($rawInput);
        if (!$input) {
            // The exception class is reached through the `use SA\CpeSdk;` import
            // (an unqualified name would resolve inside this Sqs sub-namespace),
            // and the constant has to be addressed through the class.
            throw new CpeSdk\CpeException(
                "Task input JSON is invalid!\n".$rawInput,
                self::INVALID_JSON
            );
        }

        $activity = [
            'activityId'   => $activityTask->get('activityId'),
            'activityType' => $activityTask->get('activityType')
        ];

        // Want to send back the input data ?
        if ($sendInput) {
            $activity['input'] = $input;
        }

        // Extra data? Attach it to the activity description.
        if ($result) {
            $activity['result'] = $result;
        }

        $msg = $this->craft_new_msg(
            $eventType,
            array(
                'workflow' => $activityTask->get('workflowExecution'),
                'activity' => $activity
            )
        );

        // Send message to SQS output queue.
        $this->send_to_output_queue($input->{"client"}, $msg);
    }
}