Completed
Push — master ( 22977c...90431d )
by Nicolas
02:16
created

CpeSqsWriter::activity_canceled()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 1
nc 1
nop 1
1
<?php
2
3
/**
4
 * This class is used by the CPE Core ActivityPoller
5
 * AND can also be used by the Custom Activities
6
 *
7
 * Use it to send updates to the Clients.
8
 **/
9
10
namespace SA\CpeSdk\Sqs;
11
12
// Amazon libraries
13
use Aws\Common\Aws;
14
use Aws\Sqs;
15
16
// SA Cpe SDK
17
use SA\CpeSdk;
18
19
/**
 * Publishes workflow and activity status updates to a client's SQS output queue.
 *
 * Every public notifier builds a small JSON message (timestamp, event type,
 * payload) and sends it to the queue URL found in the job input under
 * `client.queues.output`.
 */
class CpeSqsWriter
{
    private $debug;
    private $sqs;

    // Exceptions
    const INVALID_JSON       = "INVALID_JSON";

    // Statuses
    const WORKFLOW_SCHEDULED = "WORKFLOW_SCHEDULED";
    const JOB_STARTED        = "JOB_STARTED";
    const JOB_COMPLETED      = "JOB_COMPLETED";
    const JOB_FAILED         = "JOB_FAILED";
    const ACTIVITY_STARTED   = "ACTIVITY_STARTED";
    const ACTIVITY_FAILED    = "ACTIVITY_FAILED";
    const ACTIVITY_TIMEOUT   = "ACTIVITY_TIMEOUT";
    const ACTIVITY_COMPLETED = "ACTIVITY_COMPLETED";
    const ACTIVITY_PROGRESS  = "ACTIVITY_PROGRESS";
    const ACTIVITY_PREPARING = "ACTIVITY_PREPARING";
    const ACTIVITY_FINISHING = "ACTIVITY_FINISHING";

    /**
     * Build the writer and its SQS client.
     *
     * The AWS region is read from the AWS_DEFAULT_REGION environment variable.
     *
     * @param mixed $debug Debug flag; stored on the instance as-is.
     */
    public function __construct($debug)
    {
        $this->debug = $debug;

        // Create AWS SDK instance
        $aws = Aws::factory([
            'region' => getenv("AWS_DEFAULT_REGION")
        ]);
        $this->sqs = $aws->get('Sqs');
    }


    /**
     * SEND messages to OUTPUT SQS queue
     *
     * Send messages back to the clients.
     * Clients listening to the SQS queue will receive them.
     * Send updates and notifications:
     * Activity Started, Failed, Succeeded, etc
     */

    /**
     * Notify the client that a workflow has been scheduled.
     *
     * @param mixed  $workflowType Workflow type, forwarded unchanged in the message.
     * @param mixed  $runId        Run identifier, forwarded unchanged.
     * @param mixed  $workflowId   Workflow identifier, forwarded unchanged.
     * @param object $message      Decoded job message; must expose `jobId` and
     *                             `data`, with `data->client->queues->output`
     *                             holding the destination queue URL.
     */
    public function workflow_scheduled($workflowType, $runId, $workflowId, $message)
    {
        $jobData = $message->{'data'};

        $msg = [
            'time'         => microtime(true),
            'type'         => self::WORKFLOW_SCHEDULED,
            "jobId"        => $message->{"jobId"},
            "runId"        => $runId,
            "workflowId"   => $workflowId,
            "workflowType" => $workflowType,
            "input"        => $jobData
        ];

        $client = $jobData->{"client"};
        $this->sqs->sendMessage([
            'QueueUrl'    => $client->{'queues'}->{'output'},
            'MessageBody' => json_encode($msg),
        ]);
    }

    /**
     * Notify the client that an activity has started.
     *
     * @param object $task Activity task (see send_activity_msg()).
     */
    public function activity_started($task)
    {
        // Third argument 'true' forces sending the 'input' info back to the client
        $this->send_activity_msg($task, self::ACTIVITY_STARTED, true);
    }

    /**
     * Notify the client that an activity has completed.
     *
     * @param object $task   Activity task (see send_activity_msg()).
     * @param mixed  $result Optional result data attached to the message.
     */
    public function activity_completed($task, $result = null)
    {
        // The last argument is appended to the message as the activity result
        $this->send_activity_msg($task, self::ACTIVITY_COMPLETED, false, $result);
    }

    /**
     * Notify the client that an activity has failed.
     *
     * @param object $task    Activity task (see send_activity_msg()).
     * @param mixed  $reason  Failure reason, sent under result.reason.
     * @param mixed  $details Failure details, sent under result.details.
     */
    public function activity_failed($task, $reason, $details)
    {
        $failure = [
            "reason"  => $reason,
            "details" => $details
        ];

        $this->send_activity_msg($task, self::ACTIVITY_FAILED, false, $failure);
    }

    /**
     * Notify the client that an activity has timed out.
     *
     * The task input is echoed back in the message.
     *
     * @param object $task Activity task (see send_activity_msg()).
     */
    public function activity_timeout($task)
    {
        $this->send_activity_msg($task, self::ACTIVITY_TIMEOUT, true);
    }

    /**
     * Placeholder for the "activity canceled" notification.
     *
     * Currently sends nothing. Static analysis reports $task as unused; it is
     * kept so the signature matches the other activity_* notifiers.
     *
     * @param object $task Activity task (unused for now).
     */
    public function activity_canceled($task)
    {
        // FIXME: To implement
    }

    /**
     * Send a progress update for a running activity.
     *
     * @param object $task     Activity task (see send_activity_msg()).
     * @param mixed  $progress Progress data, sent as the activity result.
     */
    public function activity_progress($task, $progress)
    {
        $this->send_activity_msg($task, self::ACTIVITY_PROGRESS, false, $progress);
    }

    /**
     * Notify the client that an activity is preparing.
     *
     * @param object $task Activity task (see send_activity_msg()).
     */
    public function activity_preparing($task)
    {
        $this->send_activity_msg($task, self::ACTIVITY_PREPARING);
    }

    /**
     * Notify the client that an activity is finishing.
     *
     * @param object $task Activity task (see send_activity_msg()).
     */
    public function activity_finishing($task)
    {
        $this->send_activity_msg($task, self::ACTIVITY_FINISHING);
    }


    /**
     * UTILS
     */

    /**
     * Craft a new message array.
     *
     * @param string $type Event type (one of the status constants).
     * @param mixed  $data Payload placed under the 'data' key.
     *
     * @return array Message with 'time' (microtime float), 'type' and 'data'.
     */
    private function craft_new_msg($type, $data)
    {
        return [
            'time' => microtime(true),
            'type' => $type,
            'data' => $data
        ];
    }

    /**
     * Send an activity event to the SQS output queue.
     *
     * @param object $activityTask Object exposing get($key) for 'input' (JSON
     *                             string), 'activityId', 'activityType' and
     *                             'workflowExecution'.
     * @param string $eventType    One of the ACTIVITY_* status constants.
     * @param mixed  $sendInput    When truthy, the decoded input is echoed back
     *                             under activity.input.
     * @param mixed  $result       When truthy, attached under activity.result
     *                             (falsy values are omitted).
     *
     * @throws CpeSdk\CpeException With reference self::INVALID_JSON when the
     *                             task input cannot be decoded.
     */
    private function send_activity_msg(
        $activityTask,
        $eventType,
        $sendInput = null,
        $result = null)
    {
        $rawInput = $activityTask->get('input');
        $input    = json_decode($rawInput);

        if (!$input) {
            // The exception lives in SA\CpeSdk (imported as CpeSdk), not in this
            // namespace, and the reference is a class constant, hence self::.
            throw new CpeSdk\CpeException(
                "Task input JSON is invalid!\n" . $rawInput,
                self::INVALID_JSON
            );
        }

        $activity = [
            'activityId'   => $activityTask->get('activityId'),
            'activityType' => $activityTask->get('activityType')
        ];

        // Want to send back the input data ?
        if ($sendInput) {
            $activity['input'] = $input;
        }

        // Extra data? Attach it to the activity.
        if ($result) {
            $activity['result'] = $result;
        }

        $msg = $this->craft_new_msg(
            $eventType,
            [
                'workflow' => $activityTask->get('workflowExecution'),
                'activity' => $activity
            ]
        );

        // Send message to SQS output queue.
        $client = $input->{"client"};
        $this->sqs->sendMessage([
            'QueueUrl'    => $client->{'queues'}->{'output'},
            'MessageBody' => json_encode($msg),
        ]);
    }
}