1
|
|
|
from __future__ import ( |
|
|
|
|
2
|
|
|
absolute_import, |
3
|
|
|
division, |
4
|
|
|
print_function |
5
|
|
|
) |
6
|
|
|
|
7
|
|
|
import json |
8
|
|
|
import logging |
9
|
|
|
import time |
10
|
|
|
|
11
|
|
|
import boto.swf.layer2 as swf |
|
|
|
|
12
|
|
|
import boto.sqs.connection as sqs |
|
|
|
|
13
|
|
|
import boto.sqs.queue as sqs_queue |
|
|
|
|
14
|
|
|
|
15
|
|
|
from .state_machine import StateMachine |
16
|
|
|
|
17
|
|
|
_LOGGER = logging.getLogger(__name__) |
18
|
|
|
|
19
|
|
|
|
20
|
|
|
class SWFDecider(swf.Decider):
    """SWF decider that drives a workflow from a ``StateMachine`` plan.

    Each call to :meth:`run` polls for one decision task, evaluates the
    workflow history with the state machine and answers with the resulting
    decisions.  A JSON notification is sent to the output SQS queue for
    every workflow completion, workflow failure and scheduled activity.
    """

    name = 'generic'
    version = '1.0'

    def __init__(self, domain, task_list, output_queue, plan=None):
        """Create the decider, its state machine and its SQS output queue.

        :param domain: Stored as ``self.domain`` before the base class
            initializer runs.
        :param task_list: Stored as ``self.task_list`` before the base class
            initializer runs.
        :param output_queue: Second argument given to the boto SQS ``Queue``
            constructor (presumably the queue URL -- TODO confirm).
        :param plan: Passed unchanged to ``StateMachine``.
        """
        # These are assigned before calling the base initializer, exactly as
        # before; the base class is expected to read them when polling.
        self.domain = domain
        self.task_list = task_list
        super(SWFDecider, self).__init__()

        self.statemachine = StateMachine(plan)
        self.sqs = sqs.SQSConnection()
        self.output_queue = sqs_queue.Queue(self.sqs, output_queue)

    def run(self):
        """Poll for a single decision task and respond to it.

        When the polled task carries events, every further history page is
        fetched, the decisions are computed by :meth:`_run` and the task is
        completed.  A poll result without ``'events'`` is only logged.

        :returns: Always ``True``.
        """
        decision_task = self.poll()
        _LOGGER.info('Received decision task: %r', decision_task)
        if 'events' in decision_task:
            # Collect the entire history if there are enough events to become
            # paginated
            events = decision_task['events']
            while 'nextPageToken' in decision_task:
                decision_task = self.poll(
                    next_page_token=decision_task['nextPageToken']
                )
                if 'events' in decision_task:
                    events.extend(decision_task['events'])

            # Compute decision based on events.  Note that the workflow
            # execution is read from the last page that was polled.
            decisions = self._run(events, decision_task['workflowExecution'])
            self.complete(decisions=decisions)

        _LOGGER.debug('Tic')
        return True

    def _publish_event(self, event_type, data):
        """Send one JSON notification to the output queue.

        :param event_type: Value of the ``'type'`` key of the message.
        :param data: JSON-serializable value of the ``'data'`` key.
        """
        self.sqs.send_message(self.output_queue, json.dumps({
            'time': time.time(),
            'type': event_type,
            'data': data,
        }))

    def _run(self, events, workflowExecution):
        """Evaluate the history and build the decisions to respond with.

        :param events: Workflow history events handed to the state machine.
        :param workflowExecution: Workflow execution description; embedded
            as-is in every notification, so it must be JSON-serializable.
        :returns: A ``Layer1Decisions`` instance that completes the workflow,
            fails it, schedules the ready activities, or is empty when the
            state machine yields nothing to do.
        """
        # Run the statemachine on the events
        results = self.statemachine.eval(events)

        # Now we can do 4 things:
        #  - Complete the workflow
        #  - Fail the workflow
        #  - Schedule more activities
        #  - Nothing
        decisions = swf.Layer1Decisions()

        if self.statemachine.is_succeeded:
            self._publish_event('WORKFLOW_COMPLETED', {
                'workflow': workflowExecution
            })
            decisions.complete_workflow_execution(result=None)
            return decisions

        elif self.statemachine.is_failed:
            # FIXME: Improve error reporting
            self._publish_event('WORKFLOW_FAILED', {
                'workflow': workflowExecution
            })
            decisions.fail_workflow_execution(reason='State machine aborted')
            return decisions

        # We are still going, start any ready activity
        for next_step in results:

            activity = next_step.activity
            # FIXME: We are assuming JSON activity input here
            activity_input = (
                json.dumps(next_step.activity_input)
                if next_step.activity_input is not None
                else None
            )

            self._publish_event('ACTIVITY_SCHEDULED', {
                'workflow': workflowExecution,
                'activity': {
                    'activityId': next_step.name,
                    'activityType': activity.name,
                    'activityVersion': activity.version
                }
            })
            decisions.schedule_activity_task(
                activity_id=next_step.name,
                activity_type_name=activity.name,
                activity_type_version=activity.version,
                task_list=activity.task_list,
                control=None,  # FIXME: Do we want to pass context data?
                heartbeat_timeout=activity.heartbeat_timeout,
                schedule_to_close_timeout=activity.schedule_to_close_timeout,
                schedule_to_start_timeout=activity.schedule_to_start_timeout,
                start_to_close_timeout=activity.start_to_close_timeout,
                input=activity_input,
            )

        return decisions
127
|
|
|
|
The coding style of this project requires that you add a docstring to this code element. Below you will find an example for methods:
If you would like to know more about docstrings, we recommend reading PEP 257: Docstring Conventions.