|
1
|
|
|
from __future__ import ( |
|
|
|
|
|
|
2
|
|
|
absolute_import, |
|
3
|
|
|
division, |
|
4
|
|
|
print_function |
|
5
|
|
|
) |
|
6
|
|
|
|
|
7
|
|
|
import json |
|
8
|
|
|
import logging |
|
9
|
|
|
import time |
|
10
|
|
|
|
|
11
|
|
|
import boto.swf.layer2 as swf |
|
|
|
|
|
|
12
|
|
|
import boto.sqs.connection as sqs |
|
|
|
|
|
|
13
|
|
|
import boto.sqs.queue as sqs_queue |
|
|
|
|
|
|
14
|
|
|
|
|
15
|
|
|
from .state_machine import StateMachine |
|
16
|
|
|
|
|
17
|
|
|
_LOGGER = logging.getLogger(__name__) |
|
18
|
|
|
|
|
19
|
|
|
|
|
20
|
|
|
class SWFDecider(swf.Decider):
    """SWF decider that drives workflow executions with a ``StateMachine``.

    Each call to :meth:`run` polls for a decision task, feeds the workflow
    history to the state machine and answers with the resulting decisions.
    A JSON notification is sent to the output SQS queue whenever a workflow
    completes, a workflow fails, or an activity is scheduled.
    """

    name = 'generic'
    version = '1.0'

    def __init__(self, domain, task_list, output_queue, plan=None):
        """Create the decider and its SQS output queue handle.

        :param domain: stored as ``self.domain`` before the base class is
            initialised (presumably the SWF domain polled by the boto base
            class -- confirm against boto's ``Decider``).
        :param task_list: stored as ``self.task_list`` (presumably the
            decision task list polled by the boto base class).
        :param output_queue: handed to boto's ``Queue`` as its second
            positional argument (the queue URL in boto's signature --
            confirm against the caller).
        :param plan: forwarded unchanged to ``StateMachine``.
        """
        self.domain = domain
        self.task_list = task_list
        super(SWFDecider, self).__init__()

        self.statemachine = StateMachine(plan)
        self.sqs = sqs.SQSConnection()
        self.output_queue = sqs_queue.Queue(self.sqs, output_queue)

    def run(self):
        """Poll for one decision task and respond to it.

        When the polled task carries events, the whole (possibly paginated)
        history is collected, decisions are computed by :meth:`_run` and the
        decision task is completed with them.

        :returns: always ``True``.
        """
        decision_task = self.poll()
        _LOGGER.debug('Received decision task: %r', decision_task)
        if 'events' in decision_task:
            # Collect the entire history if there are enough events to
            # become paginated
            events = decision_task['events']
            while 'nextPageToken' in decision_task:
                decision_task = self.poll(
                    next_page_token=decision_task['nextPageToken']
                )
                if 'events' in decision_task:
                    events.extend(decision_task['events'])

            # Compute decision based on events
            decisions = self._run(events, decision_task['workflowExecution'])
            self.complete(decisions=decisions)

        _LOGGER.debug('Tic')
        return True

    def _notify(self, event_type, data):
        """Send a timestamped JSON notification to the output queue.

        :param event_type: value of the message's ``'type'`` field.
        :param data: JSON-serialisable value of the message's ``'data'``
            field.
        """
        self.sqs.send_message(self.output_queue, json.dumps({
            'time': time.time(),
            'type': event_type,
            'data': data,
        }))

    def _run(self, events, workflowExecution):
        """Turn a workflow history into a set of decisions.

        :param events: complete list of history events of the execution.
        :param workflowExecution: the ``workflowExecution`` entry of the
            decision task; embedded in every notification sent.
        :returns: a ``swf.Layer1Decisions`` instance holding the decisions.
        """
        # Run the statemachine on the events
        results = self.statemachine.eval(events)

        # Now we can do 4 things:
        #  - Complete the workflow
        #  - Fail the workflow
        #  - Schedule more activities
        #  - Nothing
        decisions = swf.Layer1Decisions()

        if self.statemachine.is_succeeded:
            # The original called the non-existent ``json.jumps`` here, so
            # a finished workflow raised AttributeError and never completed.
            self._notify('WORKFLOW_COMPLETED', {
                'workflow': workflowExecution
            })
            decisions.complete_workflow_execution(result=None)
            return decisions

        if self.statemachine.is_failed:
            # FIXME: Improve error reporting
            self._notify('WORKFLOW_FAILED', {
                'workflow': workflowExecution
            })
            decisions.fail_workflow_execution(reason='State machine aborted')
            return decisions

        # We are still going, start any ready activity
        for next_step in results:
            activity = next_step.activity
            # FIXME: We are assuming JSON activity input here
            activity_input = (
                json.dumps(next_step.activity_input)
                if next_step.activity_input is not None
                else None
            )

            self._notify('ACTIVITY_SCHEDULED', {
                'workflow': workflowExecution,
                'activity': {
                    'activityId': next_step.name,
                    'activityType': activity.name,
                    'activityVersion': activity.version
                }
            })
            decisions.schedule_activity_task(
                activity_id=next_step.name,
                activity_type_name=activity.name,
                activity_type_version=activity.version,
                task_list=activity.task_list,
                control=None,  # FIXME: Do we want to pass context data?
                heartbeat_timeout=activity.heartbeat_timeout,
                schedule_to_close_timeout=activity.schedule_to_close_timeout,
                schedule_to_start_timeout=activity.schedule_to_start_timeout,
                start_to_close_timeout=activity.start_to_close_timeout,
                input=activity_input,
            )

        return decisions
|
126
|
|
|
|
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:
If you would like to know more about docstrings, we recommend reading PEP 257: Docstring Conventions.