1
|
|
|
#!/usr/bin/env python |
2
|
|
|
|
3
|
|
|
from __future__ import ( |
4
|
|
|
absolute_import, |
5
|
|
|
division, |
6
|
|
|
print_function |
7
|
|
|
) |
8
|
|
|
|
9
|
|
|
import argparse |
10
|
|
|
import logging |
11
|
|
|
import os |
12
|
|
|
import sys |
13
|
|
|
|
14
|
|
|
import yaml |
15
|
|
|
|
16
|
|
|
# Put the parent directory of this script's directory on the import path
# (presumably so the ``pydecider`` package imported below can be found when
# the script is run directly -- confirm against the repository layout).
sys.path.append(
    os.path.realpath(os.path.join(os.path.dirname(__file__), '..'))
)
24
|
|
|
|
25
|
|
|
from pydecider.register import register |
26
|
|
|
from pydecider.plan import Plan |
27
|
|
|
from pydecider.swf_decider import SWFDecider as Decider |
28
|
|
|
|
29
|
|
|
def main():
    """Run the generic SWF decider daemon from the command line.

    Steps, in order:

    1. Parse the command-line arguments.
    2. Configure DEBUG-level logging to ``--log_file`` (or the built-in
       default path when the option is not given).
    3. Load the plan file given by ``--plan`` and apply the optional
       ``--plan_name`` / ``--plan_version`` overrides to its ``name`` and
       ``version`` entries.
    4. Build the ``Plan`` and register its (name, version) workflow in the
       SWF domain.
    5. Resolve the output queue: the ``OUTPUT_QUEUE`` environment variable
       wins when it is set, otherwise ``--output_queue`` is used.
    6. Create the decider and call ``run()`` repeatedly until it returns a
       falsy value.

    Raises:
        Exception: if neither ``OUTPUT_QUEUE`` nor a non-empty
            ``--output_queue`` is provided.
    """
    parser = argparse.ArgumentParser(description='Generic SWF Decider daemon. Read you Plan.yaml and process your workflow accordingly.')
    parser.add_argument('-d', '--domain', required=True, help='The SWF domain for your workflow')
    parser.add_argument('-t', '--task_list', required=True, help='The Decision TaskList your decider will listen to')
    parser.add_argument('--plan', required=True, help='The location of your Plan file')
    parser.add_argument('--output_queue', required=False, help='SQS queue to output updates to')
    parser.add_argument('--plan_name', required=False, help='If you want to override the plan name in your Plan file')
    parser.add_argument('--plan_version', required=False, help='If you want to override the plan version in your Plan file')
    parser.add_argument('--log_file', required=False, help='Location of the log file')
    args = parser.parse_args()

    # Fall back to the default location when --log_file is missing or empty.
    log_file = args.log_file or "/var/tmp/logs/cpe/decider.log"
    logging.basicConfig(level=logging.DEBUG,
                        filename=log_file)

    # Load the main plan data
    with open(args.plan) as f:
        # NOTE(review): yaml.load() without an explicit Loader can construct
        # arbitrary Python objects on older PyYAML releases, and newer
        # releases require a Loader argument. If plan files are plain data,
        # consider yaml.safe_load(f) -- confirm no plan relies on custom tags.
        plan_data = yaml.load(f)

    # Command-line overrides take precedence over the values in the file.
    if args.plan_name:
        plan_data['name'] = args.plan_name
    if args.plan_version:
        plan_data['version'] = args.plan_version

    # Construct the plan
    p = Plan.from_data(plan_data)
    logging.info('Loaded plan %r', p)

    # Make sure the plan is registered in SWF
    register(domain=args.domain,
             workflows=((p.name, p.version),))

    # The environment variable takes precedence over the command-line option.
    if 'OUTPUT_QUEUE' in os.environ:
        output_queue = os.environ['OUTPUT_QUEUE']
    elif args.output_queue:
        output_queue = args.output_queue
    else:
        raise Exception("No 'OUTPUT_QUEUE' provided!")

    # Pass the *resolved* queue: previously args.output_queue was passed here,
    # so a queue supplied only through OUTPUT_QUEUE was silently dropped.
    d = Decider(domain=args.domain, task_list=args.task_list, plan=p, output_queue=output_queue)

    # Keep handling decision tasks for as long as run() reports truthy.
    while d.run():
        pass
75
|
|
|
|
76
|
|
|
# Script entry point: start the decider only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
78
|
|
|
|