sensitivity_it.pickle_to_s3()  (rated B)

Complexity
  Conditions: 5

Size
  Total lines: 39
  Code lines: 28

Duplication
  Duplicated lines: 39
  Ratio: 100 %

Importance
  Changes: 0

Raw metrics
  Metric   Value     Meaning
  eloc     28        effective lines of code
  dl       39        duplicated lines
  loc      39        lines of code
  rs       8.7413
  c        0
  b        0
  f        0
  cc       5         cyclomatic complexity
  nop      2         number of parameters
import boto3
import cloudpickle
import os
import pickle
from argparse import ArgumentParser
from functools import wraps


# Decorator factory: wraps a function so that its return value is pickled
# with cloudpickle and uploaded to the cloudknot jobs bucket on S3.
def pickle_to_s3(server_side_encryption=None, array_job=True):
    def real_decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            s3 = boto3.client("s3")
            bucket = os.environ.get("CLOUDKNOT_JOBS_S3_BUCKET")

            # Array jobs get a per-task index so results do not overwrite
            # each other; single jobs always use index '0'.
            if array_job:
                array_index = os.environ.get("AWS_BATCH_JOB_ARRAY_INDEX")
            else:
                array_index = '0'

            jobid = os.environ.get("AWS_BATCH_JOB_ID")

            # Array-job IDs look like '<parent-id>:<index>'; keep the parent.
            if array_job:
                jobid = jobid.split(':')[0]

            key = '/'.join([
                'cloudknot.jobs',
                os.environ.get("CLOUDKNOT_S3_JOBDEF_KEY"),
                jobid,
                array_index,
                '{0:03d}'.format(int(os.environ.get("AWS_BATCH_JOB_ATTEMPT"))),
                'output.pickle'
            ])
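
            # For example, the assembled key might look like (values
            # hypothetical; the attempt number is zero-padded to 3 digits):
            #   cloudknot.jobs/my-jobdef/1f8e.../0/001/output.pickle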

            result = f(*args, **kwargs)

            # Only pickle output and write to S3 if it is not None
            if result is not None:
                pickled_result = cloudpickle.dumps(result)
                if server_side_encryption is None:
                    s3.put_object(Bucket=bucket, Body=pickled_result, Key=key)
                else:
                    s3.put_object(Bucket=bucket, Body=pickled_result, Key=key,
                                  ServerSideEncryption=server_side_encryption)

        return wrapper
    return real_decorator
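
# Illustrative sketch (not part of the generated script): how the decorator
# above would wrap a function. The function name and body are hypothetical.
#
#     @pickle_to_s3(server_side_encryption='AES256', array_job=True)
#     def my_func(x):
#         return x ** 2
#
# Calling my_func(3) computes 9, cloudpickles it, and uploads it to
# s3://$CLOUDKNOT_JOBS_S3_BUCKET/cloudknot.jobs/<jobdef-key>/<job-id>/
# <array-index>/<attempt>/output.pickle. Note that the wrapper itself
# returns None; results are retrieved from S3, not from the call.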


# One iteration of a particle-tracking parameter sensitivity sweep;
# `counter` selects a single parameter combination from the sweep below.
def sensitivity_it(counter):
    # Imports live inside the function so that it stays self-contained
    # when cloudknot pickles it and runs it in an AWS Batch container.
    import matplotlib as mpl
    mpl.use('Agg')  # headless backend; the Batch container has no display
    import matplotlib.pyplot as plt
    import diff_classifier.aws as aws
    import diff_classifier.utils as ut
    import diff_classifier.msd as msd
    import diff_classifier.features as ft
    import diff_classifier.imagej as ij
    import diff_classifier.heatmaps as hm

    from scipy.spatial import Voronoi
    import scipy.stats as stats
    from shapely.geometry import Point
    from shapely.geometry.polygon import Polygon
    import matplotlib.cm as cm
    import os
    import os.path as op
    import numpy as np
    import numpy.ma as ma
    import pandas as pd
    import boto3
    import itertools

    # Sweep parameters
    # ----------------------------------
    radius = [4.5, 6.0, 7.0]
    do_median_filtering = [True, False]
    quality = [1.5, 4.5, 8.5]
    linking_max_distance = [6.0, 10.0, 15.0]
    gap_closing_max_distance = [6.0, 10.0, 15.0]
    max_frame_gap = [1, 2, 5]
    track_displacement = [0.0, 10.0, 20.0]

    sweep = [radius, do_median_filtering, quality, linking_max_distance,
             gap_closing_max_distance, max_frame_gap, track_displacement]
    all_params = list(itertools.product(*sweep))
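    # itertools.product yields every combination of the seven sweep lists:
    # 3 * 2 * 3 * 3 * 3 * 3 * 3 = 1458 parameter sets, so `counter` should
    # lie in range(1458).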

    # Variable prep
    # ----------------------------------
    s3 = boto3.client('s3')

    folder = '01_18_Experiment'
    s_folder = '{}/sensitivity'.format(folder)
    local_folder = '.'
    prefix = "P1_S1_R_0001_2_2"
    name = "{}.tif".format(prefix)
    local_im = op.join(local_folder, name)
    aws.download_s3('{}/{}/{}.tif'.format(folder, prefix.split('_')[0], prefix),
                    '{}.tif'.format(prefix))

    outputs = np.zeros((len(all_params), len(all_params[0]) + 2))
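    # Note: `outputs` is allocated here but never populated or saved in this
    # script; each array-job task handles only its own `counter` value.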

    # Tracking and calculations
    # ------------------------------------
    params = all_params[counter]
    outfile = 'Traj_{}_{}.csv'.format(name.split('.')[0], counter)
    msd_file = 'msd_{}_{}.csv'.format(name.split('.')[0], counter)
    geo_file = 'geomean_{}_{}.csv'.format(name.split('.')[0], counter)
    geoS_file = 'geoSEM_{}_{}.csv'.format(name.split('.')[0], counter)
    msd_image = 'msds_{}_{}.png'.format(name.split('.')[0], counter)
    iter_name = "{}_{}".format(prefix, counter)

    ij.track(local_im, outfile, template=None, fiji_bin=None, radius=params[0],
             threshold=0., do_median_filtering=params[1], quality=params[2],
             x=511, y=511, ylo=1, median_intensity=300.0, snr=0.0,
             linking_max_distance=params[3], gap_closing_max_distance=params[4],
             max_frame_gap=params[5], track_displacement=params[6])

    # Compute MSDs from the tracked trajectories, summarize them with
    # geometric means and SEMs, then push all outputs to S3.
    traj = ut.csv_to_pd(outfile)
    msds = msd.all_msds2(traj, frames=651)
    msds.to_csv(msd_file)
    gmean1, gSEM1 = hm.plot_individual_msds(iter_name, alpha=0.05)
    np.savetxt(geo_file, gmean1, delimiter=",")
    np.savetxt(geoS_file, gSEM1, delimiter=",")

    aws.upload_s3(outfile, '{}/{}'.format(s_folder, outfile))
    aws.upload_s3(msd_file, '{}/{}'.format(s_folder, msd_file))
    aws.upload_s3(geo_file, '{}/{}'.format(s_folder, geo_file))
    aws.upload_s3(geoS_file, '{}/{}'.format(s_folder, geoS_file))
    aws.upload_s3(msd_image, '{}/{}'.format(s_folder, msd_image))

    print('Successful parameter calculations for {}'.format(iter_name))


if __name__ == "__main__":
    description = ('Download input from an S3 bucket and provide that input '
                   'to our function. On return, put output in an S3 bucket.')

    parser = ArgumentParser(description=description)

    parser.add_argument(
        'bucket', metavar='bucket', type=str,
        help='The S3 bucket for pulling input and pushing output.'
    )

    parser.add_argument(
        '--starmap', action='store_true',
        help='Assume input has already been grouped into a single tuple.'
    )

    parser.add_argument(
        '--arrayjob', action='store_true',
        help='If set, this is an array job and it should reference the '
             'AWS_BATCH_JOB_ARRAY_INDEX environment variable.'
    )

    parser.add_argument(
        '--sse', dest='sse', action='store',
        choices=['AES256', 'aws:kms'], default=None,
        help='Server side encryption algorithm used when storing objects '
             'in S3.'
    )

    args = parser.parse_args()

    s3 = boto3.client('s3')
    bucket = args.bucket

    jobid = os.environ.get("AWS_BATCH_JOB_ID")

    # Array-job IDs look like '<parent-id>:<index>'; keep the parent.
    if args.arrayjob:
        jobid = jobid.split(':')[0]

    key = '/'.join([
        'cloudknot.jobs',
        os.environ.get("CLOUDKNOT_S3_JOBDEF_KEY"),
        jobid,
        'input.pickle'
    ])

    # Fetch the pickled input that cloudknot uploaded for this job.
    response = s3.get_object(Bucket=bucket, Key=key)
    input_ = pickle.loads(response.get('Body').read())

    # Each child task of an array job picks its own element of the input.
    if args.arrayjob:
        array_index = int(os.environ.get("AWS_BATCH_JOB_ARRAY_INDEX"))
        input_ = input_[array_index]

    # Wrap sensitivity_it with the S3-upload decorator, then call it,
    # unpacking the input tuple if --starmap was given.
    if args.starmap:
        pickle_to_s3(args.sse, args.arrayjob)(sensitivity_it)(*input_)
    else:
        pickle_to_s3(args.sse, args.arrayjob)(sensitivity_it)(input_)
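
# Illustrative invocation (the script filename is hypothetical; cloudknot
# normally generates and calls this entry point itself, with the AWS Batch
# environment variables already set):
#
#     python sensitivity_it.py my-cloudknot-bucket --arrayjob --sse AES256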