Completed
Push — master ( 98e9e8...6ab974 )
by Fabian
8s
created

EcsTaskDefinition.validate_container_options()   A

Complexity

Conditions 3

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 4
ccs 4
cts 4
cp 1
rs 10
cc 3
crap 3
1 1
from datetime import datetime
2 1
from json import dumps
3
4 1
import boto3
5 1
from botocore.exceptions import ClientError, NoCredentialsError
6
7
8 1
class EcsClient(object):
    """Thin wrapper around a boto3 ``ecs`` client.

    Every method forwards its arguments to the boto3 call of the same name
    and hands the response back unchanged.
    """

    def __init__(self, access_key_id=None, secret_access_key=None, region=None, profile=None):
        """Open a boto3 session with the given settings and create an ECS client on it."""
        session_options = {
            'aws_access_key_id': access_key_id,
            'aws_secret_access_key': secret_access_key,
            'region_name': region,
            'profile_name': profile,
        }
        aws_session = boto3.session.Session(**session_options)
        self.boto = aws_session.client(u'ecs')

    def describe_services(self, cluster_name, service_name):
        """Describe a single service of the given cluster."""
        requested_services = [service_name]
        return self.boto.describe_services(cluster=cluster_name, services=requested_services)

    def describe_task_definition(self, task_definition_arn):
        """Describe the task definition identified by ``task_definition_arn``."""
        return self.boto.describe_task_definition(
            taskDefinition=task_definition_arn
        )

    def list_tasks(self, cluster_name, service_name):
        """List the tasks of the given service in the given cluster."""
        return self.boto.list_tasks(
            cluster=cluster_name,
            serviceName=service_name
        )

    def describe_tasks(self, cluster_name, task_arns):
        """Describe the tasks named by ``task_arns`` in the given cluster."""
        return self.boto.describe_tasks(
            cluster=cluster_name,
            tasks=task_arns
        )

    def register_task_definition(self, family, containers, volumes):
        """Register a task definition built from the given family, containers and volumes."""
        return self.boto.register_task_definition(
            family=family,
            containerDefinitions=containers,
            volumes=volumes
        )

    def deregister_task_definition(self, task_definition_arn):
        """Deregister the task definition identified by ``task_definition_arn``."""
        return self.boto.deregister_task_definition(
            taskDefinition=task_definition_arn
        )

    def update_service(self, cluster, service, desired_count, task_definition):
        """Update a service's desired count and task definition."""
        update_options = {
            'cluster': cluster,
            'service': service,
            'desiredCount': desired_count,
            'taskDefinition': task_definition,
        }
        return self.boto.update_service(**update_options)
41
42
43 1
class EcsService(dict):
    """Dictionary holding a service description, plus the name of its cluster.

    The mapping content is whatever was passed to the constructor; the
    properties below read the ``serviceName``, ``taskDefinition``,
    ``desiredCount``, ``deployments`` and ``events`` keys from it.
    """

    def __init__(self, cluster, iterable=None, **kwargs):
        """Store ``cluster`` and initialise the dict from ``iterable``/``kwargs``.

        :param cluster: value returned by the ``cluster`` property
        :param iterable: optional mapping or iterable of pairs (as for ``dict``)
        """
        self._cluster = cluster
        # dict(None) raises TypeError, so an omitted iterable becomes an empty mapping.
        initial = {} if iterable is None else iterable
        super(EcsService, self).__init__(initial, **kwargs)

    def set_desired_count(self, desired_count):
        """Overwrite the ``desiredCount`` entry."""
        self[u'desiredCount'] = desired_count

    def set_task_definition(self, task_definition):
        """Point the service at ``task_definition`` by storing its ``arn``."""
        self[u'taskDefinition'] = task_definition.arn

    @property
    def cluster(self):
        """Cluster value given to the constructor."""
        return self._cluster

    @property
    def name(self):
        """Value of ``serviceName`` (``None`` when absent)."""
        return self.get(u'serviceName')

    @property
    def task_definition(self):
        """Value of ``taskDefinition`` (``None`` when absent)."""
        return self.get(u'taskDefinition')

    @property
    def desired_count(self):
        """Value of ``desiredCount`` (``None`` when absent)."""
        return self.get(u'desiredCount')

    def _primary_deployment_value(self, key):
        """Return ``key`` of the first deployment whose status is ``PRIMARY``.

        Falls back to ``datetime.now()`` when there is no such deployment
        (including when the ``deployments`` key is missing or empty).
        """
        for deployment in self.get(u'deployments') or []:
            if deployment.get(u'status') == u'PRIMARY':
                return deployment.get(key)
        # NOTE(review): this fallback is a naive datetime; comparing it with
        # timezone-aware event timestamps would raise TypeError - confirm
        # what the caller's payload contains.
        return datetime.now()

    def _unable_events(self):
        """Yield the events whose message contains ``unable``."""
        for event in self.get(u'events') or []:
            if u'unable' in event[u'message']:
                yield event

    @property
    def deployment_created_at(self):
        """``createdAt`` of the PRIMARY deployment, or the current time if none."""
        return self._primary_deployment_value(u'createdAt')

    @property
    def deployment_updated_at(self):
        """``updatedAt`` of the PRIMARY deployment, or the current time if none."""
        return self._primary_deployment_value(u'updatedAt')

    @property
    def errors(self):
        """Map ISO timestamp -> message for ``unable`` events at or after the last update."""
        # Looked up once instead of once per event.
        updated_at = self.deployment_updated_at
        errors = {}
        for event in self._unable_events():
            if event[u'createdAt'] >= updated_at:
                errors[event[u'createdAt'].isoformat()] = 'ERROR: %s' % event[u'message']
        return errors

    @property
    def older_errors(self):
        """Map ISO timestamp -> message for ``unable`` events between creation and last update."""
        created_at = self.deployment_created_at
        updated_at = self.deployment_updated_at
        errors = {}
        for event in self._unable_events():
            if created_at <= event[u'createdAt'] <= updated_at:
                errors[event[u'createdAt'].isoformat()] = 'ERROR: %s' % event[u'message']
        return errors
100
101
102 1
class EcsTaskDefinition(dict):
    """Dictionary holding a task definition, with helpers to modify its containers.

    Every modification made through ``set_images``, ``set_commands`` or
    ``set_environment`` is recorded as an ``EcsTaskDefinitionDiff`` and can
    be read back through the ``diff`` property.
    """

    def __init__(self, iterable=None, **kwargs):
        """Initialise the dict from ``iterable``/``kwargs`` and start with an empty diff list."""
        # dict(None) raises TypeError, so an omitted iterable becomes an empty mapping.
        initial = {} if iterable is None else iterable
        super(EcsTaskDefinition, self).__init__(initial, **kwargs)
        self._diff = []

    @property
    def containers(self):
        """Value of ``containerDefinitions`` (``None`` when absent)."""
        return self.get(u'containerDefinitions')

    @property
    def container_names(self):
        """Generator over the ``name`` of every container definition."""
        for container in self.get(u'containerDefinitions'):
            yield container[u'name']

    @property
    def volumes(self):
        """Value of ``volumes`` (``None`` when absent)."""
        return self.get(u'volumes')

    @property
    def arn(self):
        """Value of ``taskDefinitionArn`` (``None`` when absent)."""
        return self.get(u'taskDefinitionArn')

    @property
    def family(self):
        """Value of ``family`` (``None`` when absent)."""
        return self.get(u'family')

    @property
    def revision(self):
        """Value of ``revision`` (``None`` when absent)."""
        return self.get(u'revision')

    @property
    def diff(self):
        """List of the changes recorded so far."""
        return self._diff

    def set_images(self, tag=None, **images):
        """Change container images.

        :param tag: when given, containers not named in ``images`` keep their
            image name but get this tag (everything after the last ``:`` is
            replaced)
        :param images: container name -> complete new image
        :raises UnknownContainerError: if ``images`` names an unknown container
        """
        self.validate_container_options(**images)
        for container in self.containers:
            if container[u'name'] in images:
                new_image = images[container[u'name']]
            elif tag:
                image_definition = container[u'image'].rsplit(u':', 1)
                new_image = u'%s:%s' % (image_definition[0], tag.strip())
            else:
                continue
            diff = EcsTaskDefinitionDiff(container[u'name'], u'image', new_image, container[u'image'])
            self._diff.append(diff)
            container[u'image'] = new_image

    def set_commands(self, **commands):
        """Replace the command of each named container with a one-element list.

        :param commands: container name -> new command
        :raises UnknownContainerError: if ``commands`` names an unknown container
        """
        self.validate_container_options(**commands)
        for container in self.containers:
            if container[u'name'] in commands:
                new_command = commands[container[u'name']]
                diff = EcsTaskDefinitionDiff(container[u'name'], u'command', new_command, container.get(u'command'))
                self._diff.append(diff)
                container[u'command'] = [new_command]

    def set_environment(self, environment_list):
        """Merge environment variables into the named containers.

        :param environment_list: iterable of ``(container, name, value)`` triples
        :raises UnknownContainerError: if a triple names an unknown container
        """
        environment = {}

        for env in environment_list:
            environment.setdefault(env[0], {})
            environment[env[0]][env[1]] = env[2]

        self.validate_container_options(**environment)
        for container in self.containers:
            if container[u'name'] in environment:
                self.apply_container_environment(container, environment[container[u'name']])

    def apply_container_environment(self, container, new_environment):
        """Merge ``new_environment`` (name -> value) into one container's environment.

        The container's environment is read and written as a list of
        ``{'name': ..., 'value': ...}`` dicts, so the method can be applied
        repeatedly to the same container.
        """
        old_environment = {env['name']: env['value'] for env in container.get(u'environment') or []}
        merged_environment = old_environment.copy()
        merged_environment.update(new_environment)

        diff = EcsTaskDefinitionDiff(container[u'name'], u'environment', dumps(merged_environment), dumps(old_environment))
        self._diff.append(diff)
        # Store the same list-of-pairs shape that is read above; a plain dict
        # here could not be merged again.
        container[u'environment'] = [
            {u'name': name, u'value': value} for name, value in merged_environment.items()
        ]

    def validate_container_options(self, **container_options):
        """Raise ``UnknownContainerError`` if a keyword is not a container name."""
        for container_name in container_options:
            if container_name not in self.container_names:
                raise UnknownContainerError(u'Unknown container: %s' % container_name)
185
186
187 1
class EcsTaskDefinitionDiff(object):
    """Record of one changed field of one container."""

    def __init__(self, container, field, value, old_value):
        """Remember which ``field`` of ``container`` went from ``old_value`` to ``value``."""
        self.container, self.field = container, field
        self.value, self.old_value = value, old_value

    def __repr__(self):
        """Describe the change in one human-readable line."""
        template = u"Changed %s of container '%s' to: %s (was: %s)"
        details = (self.field, self.container, self.value, self.old_value)
        return template % details
197
198
199 1
class EcsAction(object):
    """Base class for operations on one service of one cluster.

    The service description is fetched through the client when the action
    is constructed.
    """

    def __init__(self, client, cluster_name, service_name):
        """Store the client and names, then load the service.

        :raises ConnectionError: if the service is not in the response, the
            client call fails with ``ClientError``, or no credentials are found
        """
        self._client = client
        self._cluster_name = cluster_name
        self._service_name = service_name

        try:
            loaded_service = self.get_service()
        except IndexError:
            raise ConnectionError(u'An error occurred when calling the DescribeServices operation: Service not found.')
        except ClientError as e:
            raise ConnectionError(str(e))
        except NoCredentialsError:
            raise ConnectionError(u'Unable to locate credentials. Configure credentials by running "aws configure".')
        else:
            self._service = loaded_service

    def get_service(self):
        """Fetch the service and wrap the first entry of the response in ``EcsService``."""
        response = self._client.describe_services(self._cluster_name, self._service_name)
        first_service = response[u'services'][0]
        return EcsService(self._cluster_name, first_service)

    def get_current_task_definition(self, service):
        """Fetch the task definition ``service`` refers to, wrapped in ``EcsTaskDefinition``."""
        payload = self._client.describe_task_definition(service.task_definition)
        return EcsTaskDefinition(payload[u'taskDefinition'])

    def update_task_definition(self, task_definition):
        """Register ``task_definition`` anew, deregister the given one, return the new one."""
        response = self._client.register_task_definition(
            task_definition.family,
            task_definition.containers,
            task_definition.volumes
        )
        registered = EcsTaskDefinition(response[u'taskDefinition'])
        # The definition passed in is deregistered only after the new one exists.
        self._client.deregister_task_definition(task_definition.arn)
        return registered

    def update_service(self, service):
        """Push the service's desired count and task definition; return the updated service."""
        response = self._client.update_service(
            service.cluster,
            service.name,
            service.desired_count,
            service.task_definition
        )
        return EcsService(self._cluster_name, response[u'service'])

    def is_deployed(self, service):
        """Tell whether the number of matching running tasks equals the desired count."""
        task_arns = self._client.list_tasks(service.cluster, service.name)[u'taskArns']
        if task_arns:
            return service.desired_count == self.get_running_tasks_count(service, task_arns)
        # Nothing is listed: deployed only if nothing is supposed to run.
        return service.desired_count == 0

    def get_running_tasks_count(self, service, task_arns):
        """Count tasks that are ``RUNNING`` with the service's current task definition."""
        details = self._client.describe_tasks(self._cluster_name, task_arns)
        return sum(
            1 for task in details[u'tasks']
            if task[u'taskDefinitionArn'] == service.task_definition and task[u'lastStatus'] == u'RUNNING'
        )

    @property
    def client(self):
        """Client given to the constructor."""
        return self._client

    @property
    def service(self):
        """Service loaded by the constructor."""
        return self._service

    @property
    def cluster_name(self):
        """Cluster name given to the constructor."""
        return self._cluster_name

    @property
    def service_name(self):
        """Service name given to the constructor."""
        return self._service_name
264
265
266 1
class DeployAction(EcsAction):
    """Action that switches the service to another task definition."""

    def deploy(self, task_definition):
        """Set ``task_definition`` on the loaded service and push the update."""
        target = self._service
        target.set_task_definition(task_definition)
        return self.update_service(target)
270
271
272 1
class ScaleAction(EcsAction):
    """Action that changes the desired task count of the service."""

    def scale(self, desired_count):
        """Set ``desired_count`` on the loaded service and push the update."""
        target = self._service
        target.set_desired_count(desired_count)
        return self.update_service(target)
276
277
278 1
class EcsError(Exception):
    """Base class of the exceptions defined in this module."""
280
281
282 1
class ConnectionError(EcsError):
    """Raised by ``EcsAction`` when the service cannot be loaded."""
284
285
286 1
class UnknownContainerError(EcsError):
    """Raised when an option names a container the task definition does not have."""
288