Completed
Branch feature/ignore-task-placement-... (472d66)
by Fabian
01:48
created

EcsActionError

Complexity

Total Complexity 0

Size/Duplication

Total Lines 2
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 0
c 1
b 0
f 0
dl 0
loc 2
ccs 2
cts 2
cp 1
1 1
from datetime import datetime
2 1
from json import dumps
3
4 1
import boto3
5 1
from botocore.exceptions import ClientError, NoCredentialsError
6 1
from dateutil.tz.tz import tzlocal
7
8
9 1
class EcsClient(object):
10 1
    def __init__(self, access_key_id=None, secret_access_key=None, region=None, profile=None):
11 1
        session = boto3.session.Session(aws_access_key_id=access_key_id,
12
                                        aws_secret_access_key=secret_access_key,
13
                                        region_name=region,
14
                                        profile_name=profile)
15 1
        self.boto = session.client(u'ecs')
16
17 1
    def describe_services(self, cluster_name, service_name):
18 1
        return self.boto.describe_services(cluster=cluster_name, services=[service_name])
19
20 1
    def describe_task_definition(self, task_definition_arn):
21 1
        return self.boto.describe_task_definition(taskDefinition=task_definition_arn)
22
23 1
    def list_tasks(self, cluster_name, service_name):
24 1
        return self.boto.list_tasks(cluster=cluster_name, serviceName=service_name)
25
26 1
    def describe_tasks(self, cluster_name, task_arns):
27 1
        return self.boto.describe_tasks(cluster=cluster_name, tasks=task_arns)
28
29 1
    def register_task_definition(self, family, containers, volumes, role_arn):
30 1
        return self.boto.register_task_definition(
31
            family=family,
32
            containerDefinitions=containers,
33
            volumes=volumes,
34
            taskRoleArn=role_arn or ''
35
        )
36
37 1
    def deregister_task_definition(self, task_definition_arn):
38 1
        return self.boto.deregister_task_definition(taskDefinition=task_definition_arn)
39
40 1
    def update_service(self, cluster, service, desired_count, task_definition):
41 1
        return self.boto.update_service(
42
            cluster=cluster,
43
            service=service,
44
            desiredCount=desired_count,
45
            taskDefinition=task_definition
46
        )
47
48 1
    def run_task(self, cluster, task_definition, count, started_by, overrides):
49 1
        return self.boto.run_task(
50
            cluster=cluster,
51
            taskDefinition=task_definition,
52
            count=count,
53
            startedBy=started_by,
54
            overrides=overrides
55
        )
56
57
58 1
class EcsService(dict):
59 1
    def __init__(self, cluster, iterable=None, **kwargs):
60 1
        self._cluster = cluster
61 1
        super(EcsService, self).__init__(iterable, **kwargs)
62
63 1
    def set_desired_count(self, desired_count):
64 1
        self[u'desiredCount'] = desired_count
65
66 1
    def set_task_definition(self, task_definition):
67 1
        self[u'taskDefinition'] = task_definition.arn
68
69 1
    @property
70
    def cluster(self):
71 1
        return self._cluster
72
73 1
    @property
74
    def name(self):
75 1
        return self.get(u'serviceName')
76
77 1
    @property
78
    def task_definition(self):
79 1
        return self.get(u'taskDefinition')
80
81 1
    @property
82
    def desired_count(self):
83 1
        return self.get(u'desiredCount')
84
85 1
    @property
86
    def deployment_created_at(self):
87 1
        for deployment in self.get(u'deployments'):
88 1
            if deployment.get(u'status') == u'PRIMARY':
89 1
                return deployment.get(u'createdAt')
90 1
        return datetime.now()
91
92 1
    @property
93
    def deployment_updated_at(self):
94 1
        for deployment in self.get(u'deployments'):
95 1
            if deployment.get(u'status') == u'PRIMARY':
96 1
                return deployment.get(u'updatedAt')
97 1
        return datetime.now()
98
99 1
    @property
100
    def errors(self):
101 1
        return self.get_warnings(self.deployment_updated_at)
102
103 1
    @property
104
    def older_errors(self):
105 1
        return self.get_warnings(self.deployment_created_at, self.deployment_updated_at)
106
107 1
    def get_warnings(self, since=None, until=None):
108 1
        since = since or self.deployment_updated_at
109 1
        until = until or datetime.now(tz=tzlocal())
110 1
        errors = {}
111 1
        for event in self.get('events'):
112 1
            if u'unable' in event[u'message'] and since < event[u'createdAt'] < until:
113 1
                errors[event[u'createdAt']] = event[u'message']
114 1
        return errors
115
116
117 1
class EcsTaskDefinition(dict):
118 1
    def __init__(self, iterable=None, **kwargs):
119 1
        super(EcsTaskDefinition, self).__init__(iterable, **kwargs)
120 1
        self._diff = []
121
122 1
    @property
123
    def containers(self):
124 1
        return self.get(u'containerDefinitions')
125
126 1
    @property
127
    def container_names(self):
128 1
        for container in self.get(u'containerDefinitions'):
129 1
            yield container[u'name']
130
131 1
    @property
132
    def volumes(self):
133 1
        return self.get(u'volumes')
134
135 1
    @property
136
    def arn(self):
137 1
        return self.get(u'taskDefinitionArn')
138
139 1
    @property
140
    def family(self):
141 1
        return self.get(u'family')
142
143 1
    @property
144
    def role_arn(self):
145 1
        return self.get(u'taskRoleArn')
146
147 1
    @property
148
    def revision(self):
149 1
        return self.get(u'revision')
150
151 1
    @property
152
    def family_revision(self):
153 1
        return '%s:%d' % (self.get(u'family'), self.get(u'revision'))
154
155 1
    @property
156
    def diff(self):
157 1
        return self._diff
158
159 1
    def get_overrides(self):
160 1
        override = dict()
161 1
        overrides = []
162 1
        for diff in self.diff:
163 1
            if override.get('name') != diff.container:
164 1
                override = dict(name=diff.container)
165 1
                overrides.append(override)
166 1
            if diff.field == 'command':
167 1
                override['command'] = self.get_overrides_command(diff.value)
168 1
            elif diff.field == 'environment':
169 1
                override['environment'] = self.get_overrides_environment(diff.value)
170 1
        return overrides
171
172 1
    def get_overrides_command(self, command):
173 1
        return command.split(' ')
174
175 1
    def get_overrides_environment(self, environment_dict):
176 1
        return [{"name": e, "value": environment_dict[e]} for e in environment_dict]
177
178 1
    def set_images(self, tag=None, **images):
179 1
        self.validate_container_options(**images)
180 1
        for container in self.containers:
181 1
            if container[u'name'] in images:
182 1
                new_image = images[container[u'name']]
183 1
                diff = EcsTaskDefinitionDiff(container[u'name'], u'image', new_image, container[u'image'])
184 1
                self._diff.append(diff)
185 1
                container[u'image'] = new_image
186 1
            elif tag:
187 1
                image_definition = container[u'image'].rsplit(u':', 1)
188 1
                new_image = u'%s:%s' % (image_definition[0], tag.strip())
189 1
                diff = EcsTaskDefinitionDiff(container[u'name'], u'image', new_image, container[u'image'])
190 1
                self._diff.append(diff)
191 1
                container[u'image'] = new_image
192
193 1
    def set_commands(self, **commands):
194 1
        self.validate_container_options(**commands)
195 1
        for container in self.containers:
196 1
            if container[u'name'] in commands:
197 1
                new_command = commands[container[u'name']]
198 1
                diff = EcsTaskDefinitionDiff(container[u'name'], u'command', new_command, container.get(u'command'))
199 1
                self._diff.append(diff)
200 1
                container[u'command'] = [new_command]
201
202 1
    def set_environment(self, environment_list):
203 1
        environment = {}
204
205 1
        for env in environment_list:
206 1
            environment.setdefault(env[0], {})
207 1
            environment[env[0]][env[1]] = env[2]
208
209 1
        self.validate_container_options(**environment)
210 1
        for container in self.containers:
211 1
            if container[u'name'] in environment:
212 1
                self.apply_container_environment(container, environment[container[u'name']])
213
214 1
    def apply_container_environment(self, container, new_environment):
215 1
        old_environment = {env['name']: env['value'] for env in container.get('environment', {})}
216 1
        merged_environment = old_environment.copy()
217 1
        merged_environment.update(new_environment)
218
219 1
        diff = EcsTaskDefinitionDiff(container[u'name'], u'environment', merged_environment, old_environment)
220 1
        self._diff.append(diff)
221
222 1
        container[u'environment'] = [{"name": e, "value": merged_environment[e]} for e in merged_environment]
223
224 1
    def validate_container_options(self, **container_options):
225 1
        for container_name in container_options:
226 1
            if container_name not in self.container_names:
227 1
                raise UnknownContainerError(u'Unknown container: %s' % container_name)
228
229 1
    def set_role_arn(self, role_arn):
230 1
        if role_arn:
231 1
            diff = EcsTaskDefinitionDiff(None, u'role_arn', role_arn, self[u'taskRoleArn'])
232 1
            self[u'taskRoleArn'] = role_arn
233 1
            self._diff.append(diff)
234
235
236 1
class EcsTaskDefinitionDiff(object):
237 1
    def __init__(self, container, field, value, old_value):
238 1
        self.container = container
239 1
        self.field = field
240 1
        self.value = value
241 1
        self.old_value = old_value
242
243 1
    def __repr__(self):
244 1
        if self.container:
245 1
            return u"Changed %s of container '%s' to: %s (was: %s)" % (
246
                self.field,
247
                self.container,
248
                dumps(self.value),
249
                dumps(self.old_value)
250
            )
251
        else:
252 1
            return u"Changed %s to: %s (was: %s)" % (
253
                self.field,
254
                dumps(self.value),
255
                dumps(self.old_value)
256
            )
257
258
259 1
class EcsActionError(Exception):
260 1
    pass
261
262
263 1
class EcsAction(object):
264 1
    def __init__(self, client, cluster_name, service_name):
265 1
        self._client = client
266 1
        self._cluster_name = cluster_name
267 1
        self._service_name = service_name
268
269 1
        try:
270 1
            self._service = self.get_service()
271 1
        except IndexError:
272 1
            raise ConnectionError(u'An error occurred when calling the DescribeServices operation: Service not found.')
273 1
        except ClientError as e:
274 1
            raise ConnectionError(str(e))
275 1
        except NoCredentialsError:
276 1
            raise ConnectionError(u'Unable to locate credentials. Configure credentials by running "aws configure".')
277
278 1
    def get_service(self):
279 1
        services_definition = self._client.describe_services(self._cluster_name, self._service_name)
280 1
        return EcsService(self._cluster_name, services_definition[u'services'][0])
281
282 1
    def get_current_task_definition(self, service):
283 1
        task_definition_payload = self._client.describe_task_definition(service.task_definition)
284 1
        task_definition = EcsTaskDefinition(task_definition_payload[u'taskDefinition'])
285 1
        return task_definition
286
287 1
    def get_task_definition(self, task_definition):
288 1
        task_definition_payload = self._client.describe_task_definition(task_definition)
289 1
        task_definition = EcsTaskDefinition(task_definition_payload[u'taskDefinition'])
290 1
        return task_definition
291
292 1
    def update_task_definition(self, task_definition):
293 1
        response = self._client.register_task_definition(
294
            task_definition.family,
295
            task_definition.containers,
296
            task_definition.volumes,
297
            task_definition.role_arn
298
        )
299 1
        new_task_definition = EcsTaskDefinition(response[u'taskDefinition'])
300 1
        self._client.deregister_task_definition(task_definition.arn)
301 1
        return new_task_definition
302
303 1
    def update_service(self, service):
304 1
        response = self._client.update_service(service.cluster, service.name, service.desired_count,
305
                                               service.task_definition)
306 1
        return EcsService(self._cluster_name, response[u'service'])
307
308 1
    def is_deployed(self, service):
309 1
        if len(service[u'deployments']) != 1:
310 1
            return False
311 1
        running_tasks = self._client.list_tasks(service.cluster, service.name)
312 1
        if not running_tasks[u'taskArns']:
313 1
            return service.desired_count == 0
314 1
        return service.desired_count == self.get_running_tasks_count(service, running_tasks[u'taskArns'])
315
316 1
    def get_running_tasks_count(self, service, task_arns):
317 1
        running_count = 0
318 1
        tasks_details = self._client.describe_tasks(self._cluster_name, task_arns)
319 1
        for task in tasks_details[u'tasks']:
320 1
            if task[u'taskDefinitionArn'] == service.task_definition and task[u'lastStatus'] == u'RUNNING':
321 1
                running_count += 1
322 1
        return running_count
323
324 1
    @property
325
    def client(self):
326 1
        return self._client
327
328 1
    @property
329
    def service(self):
330 1
        return self._service
331
332 1
    @property
333
    def cluster_name(self):
334 1
        return self._cluster_name
335
336 1
    @property
337
    def service_name(self):
338 1
        return self._service_name
339
340
341 1
class DeployAction(EcsAction):
342 1
    def deploy(self, task_definition):
343 1
        self._service.set_task_definition(task_definition)
344 1
        return self.update_service(self._service)
345
346
347 1
class ScaleAction(EcsAction):
348 1
    def scale(self, desired_count):
349 1
        self._service.set_desired_count(desired_count)
350 1
        return self.update_service(self._service)
351
352
353 1
class RunAction(EcsAction):
354 1
    def __init__(self, client, cluster_name):
355 1
        self._client = client
356 1
        self._cluster_name = cluster_name
357 1
        self.started_tasks = []
358
359 1
    def run(self, task_definition, count, started_by):
360 1
        result = self._client.run_task(
361
            cluster=self._cluster_name,
362
            task_definition=task_definition.family_revision,
363
            count=count,
364
            started_by=started_by,
365
            overrides=dict(containerOverrides=task_definition.get_overrides())
366
        )
367 1
        self.started_tasks = result['tasks']
368 1
        return True
369
370
371 1
class EcsError(Exception):
372 1
    pass
373
374
375 1
class ConnectionError(EcsError):
376 1
    pass
377
378
379 1
class UnknownContainerError(EcsError):
380
    pass
381