@@ 9-47 (lines=39) @@ | ||
6 | from functools import wraps |
|
7 | ||
8 | ||
9 | def pickle_to_s3(server_side_encryption=None, array_job=True): |
|
10 | def real_decorator(f): |
|
11 | @wraps(f) |
|
12 | def wrapper(*args, **kwargs): |
|
13 | s3 = boto3.client("s3") |
|
14 | bucket = os.environ.get("CLOUDKNOT_JOBS_S3_BUCKET") |
|
15 | ||
16 | if array_job: |
|
17 | array_index = os.environ.get("AWS_BATCH_JOB_ARRAY_INDEX") |
|
18 | else: |
|
19 | array_index = '0' |
|
20 | ||
21 | jobid = os.environ.get("AWS_BATCH_JOB_ID") |
|
22 | ||
23 | if array_job: |
|
24 | jobid = jobid.split(':')[0] |
|
25 | ||
26 | key = '/'.join([ |
|
27 | 'cloudknot.jobs', |
|
28 | os.environ.get("CLOUDKNOT_S3_JOBDEF_KEY"), |
|
29 | jobid, |
|
30 | array_index, |
|
31 | '{0:03d}'.format(int(os.environ.get("AWS_BATCH_JOB_ATTEMPT"))), |
|
32 | 'output.pickle' |
|
33 | ]) |
|
34 | ||
35 | result = f(*args, **kwargs) |
|
36 | ||
37 | # Only pickle output and write to S3 if it is not None |
|
38 | if result is not None: |
|
39 | pickled_result = cloudpickle.dumps(result) |
|
40 | if server_side_encryption is None: |
|
41 | s3.put_object(Bucket=bucket, Body=pickled_result, Key=key) |
|
42 | else: |
|
43 | s3.put_object(Bucket=bucket, Body=pickled_result, Key=key, |
|
44 | ServerSideEncryption=server_side_encryption) |
|
45 | ||
46 | return wrapper |
|
47 | return real_decorator |
|
48 | ||
49 | ||
50 | def sensitivity_it(counter): |
@@ 9-47 (lines=39) @@ | ||
6 | from functools import wraps |
|
7 | ||
8 | ||
9 | def pickle_to_s3(server_side_encryption=None, array_job=True): |
|
10 | def real_decorator(f): |
|
11 | @wraps(f) |
|
12 | def wrapper(*args, **kwargs): |
|
13 | s3 = boto3.client("s3") |
|
14 | bucket = os.environ.get("CLOUDKNOT_JOBS_S3_BUCKET") |
|
15 | ||
16 | if array_job: |
|
17 | array_index = os.environ.get("AWS_BATCH_JOB_ARRAY_INDEX") |
|
18 | else: |
|
19 | array_index = '0' |
|
20 | ||
21 | jobid = os.environ.get("AWS_BATCH_JOB_ID") |
|
22 | ||
23 | if array_job: |
|
24 | jobid = jobid.split(':')[0] |
|
25 | ||
26 | key = '/'.join([ |
|
27 | 'cloudknot.jobs', |
|
28 | os.environ.get("CLOUDKNOT_S3_JOBDEF_KEY"), |
|
29 | jobid, |
|
30 | array_index, |
|
31 | '{0:03d}'.format(int(os.environ.get("AWS_BATCH_JOB_ATTEMPT"))), |
|
32 | 'output.pickle' |
|
33 | ]) |
|
34 | ||
35 | result = f(*args, **kwargs) |
|
36 | ||
37 | # Only pickle output and write to S3 if it is not None |
|
38 | if result is not None: |
|
39 | pickled_result = cloudpickle.dumps(result) |
|
40 | if server_side_encryption is None: |
|
41 | s3.put_object(Bucket=bucket, Body=pickled_result, Key=key) |
|
42 | else: |
|
43 | s3.put_object(Bucket=bucket, Body=pickled_result, Key=key, |
|
44 | ServerSideEncryption=server_side_encryption) |
|
45 | ||
46 | return wrapper |
|
47 | return real_decorator |
|
48 | ||
49 | ||
50 | def sensitivity_it(counter): |