CpeActivity::send_heartbeat()   B
last analyzed

Complexity

Conditions 3
Paths 7

Size

Total Lines 26
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 26
rs 8.8571
c 0
b 0
f 0
cc 3
eloc 18
nc 7
nop 2
1
<?php
2
3
/**
 * This class must be used to create your own activities.
 * Extend it in your own activities and implement the do_activity method.
 */
7
8
namespace SA\CpeSdk;
9
10
use SA\CpeSdk;
11
12
class CpeActivity
13
{
14
    public $params;          // Activity params coming from ActivityPoller
15
    public $debug;           // Debug flag
16
    
17
    public $activityId;      // ID of the activity
18
    public $activityType;    // Type of activity
19
    
20
    public $cpeLogger;       // Logger
21
    public $cpeSqsWriter;    // Used to write messages in SQS
22
    public $cpeSwfHandler;   // USed to control SWF
23
    public $cpeJsonValidator;// Run JSON schemas validation
24
    
25
    public $input_str;       // Complete activity input string
26
    public $input;           // Complete activity input JSON object
27
    public $activityLogKey;  // Create a key workflowId:activityId to put in logs
28
    
29
    const HEARTBEAT_FAILED     = "HEARTBEAT_FAILED";
30
    const NO_ACTIVITY_NAME     = "NO_ACTIVITY_NAME";
31
    const NO_ACTIVITY_VERSION  = "NO_ACTIVITY_VERSION";
32
    const ACTIVITY_TASK_EMPTY  = "ACTIVITY_TASK_EMPTY";
33
    const NO_INPUT             = "NO_INPUT";
34
    const INPUT_INVALID        = "INPUT_INVALID";
35
36
    /**
     * Build the activity: create the SDK helpers, validate the mandatory
     * 'name' and 'version' params, pick a logger, then make sure the
     * activity type is registered in SWF.
     *
     * @param array  $params    Activity params coming from ActivityPoller;
     *                          must hold non-empty 'name' and 'version' entries
     * @param mixed  $debug     Debug flag, handed to the SQS writer and to the
     *                          default logger
     * @param object $cpeLogger Logger to use; when falsy a CpeSdk\CpeLogger
     *                          named after the activity is created
     *
     * @throws CpeSdk\CpeException NO_ACTIVITY_NAME or NO_ACTIVITY_VERSION when
     *                             the matching param is missing or empty
     */
    public function __construct($params, $debug, $cpeLogger = null)
    {
        $this->debug = $debug;

        // Writer used to send messages through SQS
        $this->cpeSqsWriter = new CpeSdk\Sqs\CpeSqsWriter($this->debug);
        // Handler used to talk to SWF
        $this->cpeSwfHandler = new CpeSdk\Swf\CpeSwfHandler();
        // Validator used for JSON schema checks
        $this->cpeJsonValidator = new CpeSdk\CpeJsonValidator();

        // Keep the raw activity params around
        $this->params = $params;

        // An activity name is mandatory
        if (empty($params["name"])) {
            throw new CpeSdk\CpeException(
                "Can't instantiate BasicActivity: 'name' is not provided or empty !\n",
                self::NO_ACTIVITY_NAME
            );
        }

        // Reuse the provided logger, otherwise create one named after the activity
        $this->cpeLogger = $cpeLogger
            ? $cpeLogger
            : new CpeSdk\CpeLogger(null, $params["name"], $debug);

        // An activity version is mandatory
        if (empty($params["version"])) {
            throw new CpeSdk\CpeException(
                "Can't instantiate BasicActivity: 'version' is not provided or empty !\n",
                self::NO_ACTIVITY_VERSION
            );
        }

        // Register the activity type in SWF if it is not known yet
        $this->init_activity();
    }
69
    
70
    /**
     * Make sure the activity type is known to SWF.
     *
     * SWF is asked to describe the activity type. When it answers with an
     * UnknownResourceException, the type is registered using the activity
     * params.
     *
     * @return true|null true when the type was already registered, nothing
     *                   after registering it
     */
    private function init_activity()
    {
        // Remember the name/version pair identifying this activity type
        $this->activityType = array(
            "name"    => $this->params["name"],
            "version" => $this->params["version"],
        );

        $describeRequest = array(
            "domain"       => $this->params["domain"],
            "activityType" => $this->activityType,
        );

        try {
            // Throws when the activity type is unknown to SWF
            $this->cpeSwfHandler->swf->describeActivityType($describeRequest);
        } catch (\Aws\Swf\Exception\UnknownResourceException $e) {
            $this->cpeLogger->log_out(
                "ERROR",
                "[CPE SDK] ".basename(__FILE__),
                "Activity '" . $this->params["name"] . "' doesn't exists. Creating it ...\n"
            );

            // Not registered yet: register it with the activity params
            $this->cpeSwfHandler->swf->registerActivityType($this->params);

            return;
        }

        // No exception: the activity type already exists
        return true;
    }
99
100
    /**
     * High level sanity checks on the received task.
     *
     * Verifies the task is not empty and carries a non-empty input, then
     * stores the input string, the activity id/type and the
     * "workflowId:activityId" key used in logs.
     * This is the first method to be called when your activity starts.
     *
     * @param mixed $task Activity task; read both through array access
     *                    ("input") and through get()
     *
     * @throws CpeSdk\CpeException ACTIVITY_TASK_EMPTY when the task is empty,
     *                             NO_INPUT when it has no usable input
     */
    public function do_task_check($task)
    {
        if (!$task) {
            throw new CpeSdk\CpeException(
                "Activity Task empty !",
                self::ACTIVITY_TASK_EMPTY
            );
        }

        $inputPresent = isset($task["input"])
            && $task["input"]
            && $task["input"] != "";

        if (!$inputPresent) {
            throw new CpeSdk\CpeException(
                "No input provided to 'Activity'",
                self::NO_INPUT
            );
        }

        // Raw input string, decoded later by do_input_validation()
        $this->input_str = $task["input"];

        // Task identity
        $this->activityId   = $task->get("activityId");
        $this->activityType = $task->get("activityType");

        // Key "workflowId:activityId" used to tag log lines
        $workflowExecution    = $task->get("workflowExecution");
        $this->activityLogKey = $workflowExecution['workflowId'] . ":" . $this->activityId;
    }
128
129
    /**
     * Decode the JSON input string saved by do_task_check() into $this->input.
     *
     * Any decoded value that evaluates to false is rejected, which covers
     * undecodable JSON (json_decode() returns null) as well as falsy
     * documents such as an empty array.
     *
     * @throws CpeSdk\CpeException INPUT_INVALID when the decoded input is falsy
     */
    public function do_input_validation()
    {
        $this->input = json_decode($this->input_str);

        if (!$this->input) {
            throw new CpeSdk\CpeException(
                "JSON input is invalid !",
                self::INPUT_INVALID
            );
        }
    }
140
    
141
    /**
     * Entry point of the activity logic.
     *
     * The base implementation only tells the client, through SQS, that the
     * activity started. Implement this method in your own activity.
     *
     * @param mixed $task Activity task, forwarded as-is to the SQS writer
     */
    public function do_activity($task)
    {
        $sqsWriter = $this->cpeSqsWriter;

        // Notify the client that the activity started
        $sqsWriter->activity_started($task);
    }
151
152
    /**
     * Report a failed activity to the client (SQS) and to SWF.
     * Called by ActivityPoller if your activity throws an exception.
     *
     * @param mixed  $task    Activity task; its "taskToken" is read through
     *                        array access
     * @param string $reason  Failure reason sent to SQS and SWF
     * @param string $details Failure details sent to SQS and SWF
     *
     * @return bool true when every notification went through, false when one
     *              of them threw (the error is logged, not propagated)
     */
    public function activity_failed($task, $reason = "", $details = "")
    {
        try {
            // Notify client of failure
            $this->cpeSqsWriter->activity_failed($task, $reason, $details);

            $this->cpeLogger->log_out(
                "ERROR",
                "[CPE SDK] ".basename(__FILE__),
                "[$reason] $details",
                $this->activityLogKey
            );

            // Tell SWF the task failed
            $this->cpeSwfHandler->swf->respondActivityTaskFailed(array(
                "taskToken" => $task["taskToken"],
                "reason"    => $reason,
                "details"   => $details,
            ));
        } catch (\Exception $e) {
            $this->cpeLogger->log_out(
                "ERROR",
                "[CPE SDK] ".basename(__FILE__),
                "Unable to send 'Task Failed' response ! " . $e->getMessage(),
                $this->activityLogKey
            );

            return false;
        }

        // Explicit success value: falling off the end would return null, which
        // a loose check cannot tell apart from the false returned on error
        return true;
    }
178
179
    /**
     * Report a completed activity to the client (SQS) and to SWF.
     * Called by ActivityPoller once your activity completed.
     *
     * @param mixed $task   Activity task; its "taskToken" is read through
     *                      array access
     * @param mixed $result Activity result; sent as-is to SQS and
     *                      JSON-encoded for SWF
     *
     * @return bool true when every notification went through, false when one
     *              of them threw (the error is logged, not propagated)
     */
    public function activity_completed($task, $result = null)
    {
        try {
            // Notify client of completion
            $this->cpeSqsWriter->activity_completed($task, $result);

            $this->cpeLogger->log_out(
                "INFO",
                "[CPE SDK] ".basename(__FILE__),
                "Notify SWF activity is completed !",
                $this->activityLogKey
            );

            // Tell SWF the task completed
            $this->cpeSwfHandler->swf->respondActivityTaskCompleted(array(
                "taskToken" => $task["taskToken"],
                "result"    => json_encode($result),
            ));
        } catch (\Exception $e) {
            $this->cpeLogger->log_out(
                "ERROR",
                "[CPE SDK] ".basename(__FILE__),
                "Unable to send 'Task Completed' response ! " . $e->getMessage(),
                $this->activityLogKey
            );

            return false;
        }

        // Explicit success value: falling off the end would return null, which
        // a loose check cannot tell apart from the false returned on error
        return true;
    }
203
    
204
    /**
     * Send heartbeat to SWF to keep the task alive.
     * Timeout is configurable at the Activity level in SWF.
     * You must call this regularly within your logic to make sure SWF doesn't
     * consider your activity as 'timed out'.
     *
     * @param mixed $task    Activity task; "taskToken" and "activityId" are
     *                       read through get()
     * @param mixed $details Optional details sent along with the heartbeat
     *
     * @throws CpeSdk\CpeException HEARTBEAT_FAILED, either with a
     *                             "Heartbeat failed !: ..." message when the
     *                             heartbeat could not be recorded, or with the
     *                             cancel message when SWF requested that the
     *                             task be cancelled
     */
    public function send_heartbeat($task, $details = null)
    {
        try {
            $taskToken = $task->get("taskToken");
            $this->cpeLogger->log_out(
                "INFO",
                "[CPE SDK] ".basename(__FILE__),
                "Sending heartbeat to SWF ...",
                $this->activityLogKey
            );

            $info = $this->cpeSwfHandler->swf->recordActivityTaskHeartbeat(array(
                "details"   => $details,
                "taskToken" => $taskToken,
            ));

            // Workflow returns if this task should be canceled
            $cancelRequested = ($info->get("cancelRequested") == true);

            if ($cancelRequested) {
                $this->cpeLogger->log_out(
                    "WARNING",
                    "[CPE SDK] ".basename(__FILE__),
                    "Cancel has been requested for this task '" . $task->get("activityId") . "' ! Killing task ...",
                    $this->activityLogKey
                );
            }
        } catch (\Exception $e) {
            throw new CpeSdk\CpeException(
                "Heartbeat failed !: ".$e->getMessage(),
                self::HEARTBEAT_FAILED
            );
        }

        // Thrown outside the try block on purpose: inside it, the generic
        // catch above would re-wrap the cancel notice as "Heartbeat failed !"
        if ($cancelRequested) {
            throw new CpeSdk\CpeException(
                "Cancel request. No heartbeat, leaving!",
                self::HEARTBEAT_FAILED
            );
        }
    }
236
}
237