|
1
|
|
|
#!/usr/bin/env python |
|
2
|
|
|
|
|
3
|
|
|
from __future__ import ( |
|
4
|
|
|
absolute_import, |
|
5
|
|
|
division, |
|
6
|
|
|
print_function |
|
7
|
|
|
) |
|
8
|
|
|
|
|
9
|
|
|
import argparse |
|
10
|
|
|
import logging |
|
11
|
|
|
import os |
|
12
|
|
|
import sys |
|
13
|
|
|
|
|
14
|
|
|
import yaml |
|
15
|
|
|
|
|
16
|
|
|
# Append the parent of this script's directory to the module search path
# (presumably so the ``pydecider`` imports below resolve when the script is
# run from a source checkout -- confirm against the repository layout).
_script_dir = os.path.dirname(__file__)
_parent_dir = os.path.realpath(os.path.join(_script_dir, '..'))
sys.path.append(_parent_dir)
|
24
|
|
|
|
|
25
|
|
|
from pydecider.register import register |
|
26
|
|
|
from pydecider.plan import Plan |
|
27
|
|
|
from pydecider.swf_decider import SWFDecider as Decider |
|
28
|
|
|
|
|
29
|
|
|
def main():
    """Run the generic SWF decider daemon from the command line.

    Parses the command-line arguments, configures file logging, loads the
    plan YAML file (optionally overriding its ``name`` and ``version``
    entries), registers the plan's workflow through ``register`` and then
    calls ``Decider.run()`` repeatedly for as long as it returns a truthy
    value.

    Raises:
        SystemExit: raised by argparse when a required argument is missing.
        Exception: when no output queue is supplied, neither through the
            ``OUTPUT_QUEUE`` environment variable nor ``--output_queue``.
    """
    parser = argparse.ArgumentParser(
        description='Generic SWF Decider daemon. Read you Plan.yaml and '
                    'process your workflow accordingly.')
    parser.add_argument('-d', '--domain', required=True,
                        help='The SWF domain for your workflow')
    parser.add_argument('-t', '--task_list', required=True,
                        help='The Decision TaskList your decider will listen to')
    parser.add_argument('--plan', required=True,
                        help='The location of your Plan file')
    parser.add_argument('--output_queue', required=False,
                        help='SQS queue to output updates to')
    parser.add_argument('--plan_name', required=False,
                        help='If you want to override the plan name in your Plan file')
    parser.add_argument('--plan_version', required=False,
                        help='If you want to override the plan version in your Plan file')
    parser.add_argument('--log_file', required=False,
                        help='Location of the log file')
    args = parser.parse_args()

    # Fall back to the default location when --log_file is absent or empty.
    log_file = args.log_file or "/var/tmp/logs/cpe/decider.log"
    logging.basicConfig(level=logging.INFO, filename=log_file)

    # Load the main plan data.
    # NOTE(review): safe_load() is used instead of a bare yaml.load(), which
    # can construct arbitrary Python objects from the file and requires an
    # explicit Loader argument on PyYAML >= 6. Confirm that no plan file
    # relies on Python-specific YAML tags.
    with open(args.plan) as f:
        plan_data = yaml.safe_load(f)

    # Command-line overrides take precedence over the values in the file.
    if args.plan_name:
        plan_data['name'] = args.plan_name
    if args.plan_version:
        plan_data['version'] = args.plan_version

    # Construct the plan
    plan = Plan.from_data(plan_data)
    logging.info('Loaded plan %r', plan)

    # Make sure the plan is registered in SWF
    register(domain=args.domain,
             workflows=((plan.name, plan.version,
                         plan.default_execution_start_to_close_timeout,
                         plan.default_task_start_to_close_timeout),))

    # The OUTPUT_QUEUE environment variable, when set, wins over the
    # --output_queue command-line option.
    if 'OUTPUT_QUEUE' in os.environ:
        output_queue = os.environ['OUTPUT_QUEUE']
    elif args.output_queue:
        output_queue = args.output_queue
    else:
        raise Exception("No 'OUTPUT_QUEUE' provided!")

    decider = Decider(domain=args.domain, task_list=args.task_list,
                      plan=plan, output_queue=output_queue)

    # Keep polling until run() reports (with a falsy value) that it is done.
    while decider.run():
        pass
|
77
|
|
|
|
|
78
|
|
|
# Start the daemon only when this file is executed as a script; importing
# the module must not trigger it.
if __name__ == "__main__":
    main()
|
80
|
|
|
|