Completed
Branch master (98e9e8)
by Fabian
08:43
created

EcsAction   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 65
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 18
dl 0
loc 65
ccs 48
cts 48
cp 1
c 1
b 0
f 0
rs 10

11 Methods

Rating   Name   Duplication   Size   Complexity  
A update_service() 0 4 1
A __init__() 0 13 4
A service_name() 0 3 1
A get_current_task_definition() 0 4 1
A service() 0 3 1
A client() 0 3 1
A get_service() 0 3 1
A update_task_definition() 0 6 1
A get_running_tasks_count() 0 7 4
A cluster_name() 0 3 1
A is_deployed() 0 5 2
1 1
from datetime import datetime
2
3 1
import boto3
4 1
from botocore.exceptions import ClientError, NoCredentialsError
5
6
7 1
class EcsClient(object):
    """Thin wrapper around a boto3 ``ecs`` client.

    Every method forwards to the boto3 client method of the same name and
    only translates the positional arguments into the keyword names used in
    that call. The raw boto3 response is returned unchanged.
    """

    def __init__(self, access_key_id=None, secret_access_key=None, region=None, profile=None):
        """Create a boto3 session from the given settings and open an ECS client.

        :param access_key_id: passed to the session as ``aws_access_key_id``
        :param secret_access_key: passed as ``aws_secret_access_key``
        :param region: passed as ``region_name``
        :param profile: passed as ``profile_name``
        """
        session_options = {
            'aws_access_key_id': access_key_id,
            'aws_secret_access_key': secret_access_key,
            'region_name': region,
            'profile_name': profile,
        }
        aws_session = boto3.session.Session(**session_options)
        self.boto = aws_session.client(u'ecs')

    def describe_services(self, cluster_name, service_name):
        """Describe one service; the name is sent as a one-element list."""
        requested_services = [service_name]
        return self.boto.describe_services(cluster=cluster_name, services=requested_services)

    def describe_task_definition(self, task_definition_arn):
        """Describe the task definition identified by ``task_definition_arn``."""
        boto_client = self.boto
        return boto_client.describe_task_definition(taskDefinition=task_definition_arn)

    def list_tasks(self, cluster_name, service_name):
        """List the tasks of ``service_name`` in ``cluster_name``."""
        boto_client = self.boto
        return boto_client.list_tasks(cluster=cluster_name, serviceName=service_name)

    def describe_tasks(self, cluster_name, task_arns):
        """Describe the tasks named by ``task_arns`` in ``cluster_name``."""
        boto_client = self.boto
        return boto_client.describe_tasks(cluster=cluster_name, tasks=task_arns)

    def register_task_definition(self, family, containers, volumes):
        """Register a task definition from a family, containers and volumes."""
        definition = {
            'family': family,
            'containerDefinitions': containers,
            'volumes': volumes,
        }
        return self.boto.register_task_definition(**definition)

    def deregister_task_definition(self, task_definition_arn):
        """Deregister the task definition identified by ``task_definition_arn``."""
        boto_client = self.boto
        return boto_client.deregister_task_definition(taskDefinition=task_definition_arn)

    def update_service(self, cluster, service, desired_count, task_definition):
        """Update a service's desired count and task definition."""
        changes = {
            'cluster': cluster,
            'service': service,
            'desiredCount': desired_count,
            'taskDefinition': task_definition,
        }
        return self.boto.update_service(**changes)
40
41
42 1
class EcsService(dict):
    """Dictionary describing one ECS service, tagged with its cluster.

    The mapping content is whatever is passed to the constructor; the
    properties below read the keys ``serviceName``, ``taskDefinition``,
    ``desiredCount``, ``deployments`` and ``events`` from it.
    """

    def __init__(self, cluster, iterable=None, **kwargs):
        """Store ``cluster`` and initialise the dict from ``iterable``/``kwargs``.

        :param cluster: value returned by the ``cluster`` property
        :param iterable: mapping or iterable of pairs; ``None`` means empty
        :param kwargs: additional key/value pairs for the dict
        """
        self._cluster = cluster
        # dict(None) raises TypeError, so map the "no data" default to an
        # empty sequence before delegating to dict.
        if iterable is None:
            iterable = ()
        super(EcsService, self).__init__(iterable, **kwargs)

    def set_desired_count(self, desired_count):
        """Store ``desired_count`` under the ``desiredCount`` key."""
        self[u'desiredCount'] = desired_count

    def set_task_definition(self, task_definition):
        """Store ``task_definition.arn`` under the ``taskDefinition`` key."""
        self[u'taskDefinition'] = task_definition.arn

    @property
    def cluster(self):
        """Cluster value given to the constructor."""
        return self._cluster

    @property
    def name(self):
        """Value of ``serviceName`` (``None`` when absent)."""
        return self.get(u'serviceName')

    @property
    def task_definition(self):
        """Value of ``taskDefinition`` (``None`` when absent)."""
        return self.get(u'taskDefinition')

    @property
    def desired_count(self):
        """Value of ``desiredCount`` (``None`` when absent)."""
        return self.get(u'desiredCount')

    def _primary_deployment_value(self, field):
        """Return ``field`` of the first deployment whose status is PRIMARY.

        Falls back to ``datetime.now()`` when there is no such deployment,
        including when the ``deployments`` key is missing or ``None``.
        """
        for deployment in self.get(u'deployments') or ():
            if deployment.get(u'status') == u'PRIMARY':
                return deployment.get(field)
        # NOTE(review): this fallback is a naive local timestamp; comparing it
        # with timezone-aware event timestamps would raise TypeError. Confirm
        # what kind of datetimes the service description carries.
        return datetime.now()

    @property
    def deployment_created_at(self):
        """``createdAt`` of the PRIMARY deployment, or the current time."""
        return self._primary_deployment_value(u'createdAt')

    @property
    def deployment_updated_at(self):
        """``updatedAt`` of the PRIMARY deployment, or the current time."""
        return self._primary_deployment_value(u'updatedAt')

    @property
    def errors(self):
        """Events containing 'unable' created at or after the deployment update.

        :return: dict mapping the event's ISO-formatted ``createdAt`` to
            ``'ERROR: <message>'``; empty when there are no events
        """
        # Resolve the threshold once: it is loop-invariant, and the
        # datetime.now() fallback would otherwise drift between events.
        updated_at = self.deployment_updated_at
        errors = {}
        for event in self.get(u'events') or ():
            if u'unable' in event[u'message'] and event[u'createdAt'] >= updated_at:
                errors[event[u'createdAt'].isoformat()] = 'ERROR: %s' % event[u'message']
        return errors

    @property
    def older_errors(self):
        """Events containing 'unable' between deployment creation and update.

        Both bounds are inclusive.

        :return: dict mapping the event's ISO-formatted ``createdAt`` to
            ``'ERROR: <message>'``; empty when there are no events
        """
        created_at = self.deployment_created_at
        updated_at = self.deployment_updated_at
        errors = {}
        for event in self.get(u'events') or ():
            if u'unable' in event[u'message'] and created_at <= event[u'createdAt'] <= updated_at:
                errors[event[u'createdAt'].isoformat()] = 'ERROR: %s' % event[u'message']
        return errors
99
100
101 1
class EcsTaskDefinition(dict):
    """Dictionary holding a task definition plus a log of local changes.

    The properties read the keys ``containerDefinitions``, ``volumes``,
    ``taskDefinitionArn``, ``family`` and ``revision``. Changes made through
    ``set_images`` and ``set_commands`` mutate the container dicts in place
    and are recorded in ``diff``.
    """

    def __init__(self, iterable=None, **kwargs):
        """Initialise the dict from ``iterable``/``kwargs`` with an empty diff.

        :param iterable: mapping or iterable of pairs; ``None`` means empty
        :param kwargs: additional key/value pairs for the dict
        """
        # dict(None) raises TypeError, so map the "no data" default to an
        # empty sequence before delegating to dict.
        if iterable is None:
            iterable = ()
        super(EcsTaskDefinition, self).__init__(iterable, **kwargs)
        self._diff = []

    @property
    def containers(self):
        """Value of ``containerDefinitions`` (``None`` when absent)."""
        return self.get(u'containerDefinitions')

    @property
    def container_names(self):
        """Generator over the ``name`` of every container definition."""
        for container in self.containers or ():
            yield container[u'name']

    @property
    def volumes(self):
        """Value of ``volumes`` (``None`` when absent)."""
        return self.get(u'volumes')

    @property
    def arn(self):
        """Value of ``taskDefinitionArn`` (``None`` when absent)."""
        return self.get(u'taskDefinitionArn')

    @property
    def family(self):
        """Value of ``family`` (``None`` when absent)."""
        return self.get(u'family')

    @property
    def revision(self):
        """Value of ``revision`` (``None`` when absent)."""
        return self.get(u'revision')

    @property
    def diff(self):
        """List of ``EcsTaskDefinitionDiff`` entries recorded so far."""
        return self._diff

    @staticmethod
    def _replace_tag(image, tag):
        """Return ``image`` with its tag replaced by (or extended with) ``tag``.

        The text after the last colon is only treated as an existing tag when
        it contains no slash; otherwise that colon belongs to a registry
        ``host:port`` prefix and the whole image name is kept.
        """
        repository, separator, current_tag = image.rpartition(u':')
        if not separator or u'/' in current_tag:
            repository = image
        return u'%s:%s' % (repository, tag)

    def set_images(self, tag=None, **images):
        """Change container images and record each change in ``diff``.

        :param tag: when truthy, containers without an explicit entry in
            ``images`` get their image re-tagged with the stripped ``tag``
        :param images: explicit new image per container name
        :raises UnknownContainerError: if a name in ``images`` is not a
            container of this task definition
        """
        self.validate_image_options(**images)
        for container in self.containers or ():
            name = container[u'name']
            if name in images:
                new_image = images[name]
            elif tag:
                new_image = self._replace_tag(container[u'image'], tag.strip())
            else:
                continue
            self._diff.append(EcsTaskDefinitionDiff(name, u'image', new_image, container[u'image']))
            container[u'image'] = new_image

    def set_commands(self, **commands):
        """Change container commands and record each change in ``diff``.

        The new command is stored as a one-element list.

        :param commands: new command per container name
        :raises UnknownContainerError: if a name in ``commands`` is not a
            container of this task definition
        """
        self.validate_image_options(**commands)
        for container in self.containers or ():
            name = container[u'name']
            if name in commands:
                new_command = commands[name]
                self._diff.append(EcsTaskDefinitionDiff(name, u'command', new_command, container.get(u'command')))
                container[u'command'] = [new_command]

    def validate_image_options(self, **container_options):
        """Check that every keyword names a container of this definition.

        :raises UnknownContainerError: for the first unknown container name
        """
        if not container_options:
            return
        # Collect the names once instead of re-walking the containers for
        # every option.
        known_names = set(self.container_names)
        for container_name in container_options:
            if container_name not in known_names:
                raise UnknownContainerError(u'Unknown container: %s' % container_name)
163
164
165 1
class EcsTaskDefinitionDiff(object):
    """Record of a single changed field of one container."""

    def __init__(self, container, field, value, old_value):
        """Remember which ``field`` of ``container`` changed, and both values."""
        self.container, self.field = container, field
        self.value, self.old_value = value, old_value

    def __repr__(self):
        """Render the change as a one-line, human-readable sentence."""
        template = u"Changed %s of container '%s' to: %s (was: %s)"
        details = (self.field, self.container, self.value, self.old_value)
        return template % details
175
176
177 1
class EcsAction(object):
    """Base for operations on one service of one cluster.

    The service description is fetched once in the constructor and exposed
    through the ``service`` property.
    """

    def __init__(self, client, cluster_name, service_name):
        """Store the arguments and load the service description.

        :raises ConnectionError: when the service is not found, when the
            client raises ``ClientError``, or when no credentials are found
        """
        self._client = client
        self._cluster_name = cluster_name
        self._service_name = service_name

        try:
            self._service = self.get_service()
        except IndexError:
            # get_service() indexes the first entry of the returned services.
            not_found = u'An error occurred when calling the DescribeServices operation: Service not found.'
            raise ConnectionError(not_found)
        except ClientError as e:
            raise ConnectionError(str(e))
        except NoCredentialsError:
            no_credentials = u'Unable to locate credentials. Configure credentials by running "aws configure".'
            raise ConnectionError(no_credentials)

    def get_service(self):
        """Describe the configured service and wrap the first result."""
        description = self._client.describe_services(self._cluster_name, self._service_name)
        first_service = description[u'services'][0]
        return EcsService(self._cluster_name, first_service)

    def get_current_task_definition(self, service):
        """Fetch the task definition referenced by ``service``."""
        payload = self._client.describe_task_definition(service.task_definition)
        return EcsTaskDefinition(payload[u'taskDefinition'])

    def update_task_definition(self, task_definition):
        """Register ``task_definition`` anew, then deregister the given one.

        :return: ``EcsTaskDefinition`` built from the registration response
        """
        registration = self._client.register_task_definition(
            task_definition.family,
            task_definition.containers,
            task_definition.volumes,
        )
        registered = EcsTaskDefinition(registration[u'taskDefinition'])
        self._client.deregister_task_definition(task_definition.arn)
        return registered

    def update_service(self, service):
        """Send the service's desired count and task definition to the client.

        :return: ``EcsService`` built from the update response
        """
        result = self._client.update_service(
            service.cluster,
            service.name,
            service.desired_count,
            service.task_definition,
        )
        return EcsService(self._cluster_name, result[u'service'])

    def is_deployed(self, service):
        """Tell whether the desired count matches the matching running tasks.

        With no listed tasks, the service counts as deployed only when its
        desired count is zero.
        """
        task_arns = self._client.list_tasks(service.cluster, service.name)[u'taskArns']
        if task_arns:
            return service.desired_count == self.get_running_tasks_count(service, task_arns)
        return service.desired_count == 0

    def get_running_tasks_count(self, service, task_arns):
        """Count RUNNING tasks that use the service's task definition."""
        details = self._client.describe_tasks(self._cluster_name, task_arns)
        wanted_definition = service.task_definition
        return sum(
            1
            for task in details[u'tasks']
            if task[u'taskDefinitionArn'] == wanted_definition and task[u'lastStatus'] == u'RUNNING'
        )

    @property
    def client(self):
        """Client given to the constructor."""
        return self._client

    @property
    def service(self):
        """Service description loaded by the constructor."""
        return self._service

    @property
    def cluster_name(self):
        """Cluster name given to the constructor."""
        return self._cluster_name

    @property
    def service_name(self):
        """Service name given to the constructor."""
        return self._service_name
242
243
244 1
class DeployAction(EcsAction):
    """Action that points the loaded service at a task definition."""

    def deploy(self, task_definition):
        """Set ``task_definition`` on the service and push the update.

        :return: whatever ``update_service`` returns for the changed service
        """
        service = self._service
        service.set_task_definition(task_definition)
        return self.update_service(service)
248
249
250 1
class ScaleAction(EcsAction):
    """Action that changes the desired count of the loaded service."""

    def scale(self, desired_count):
        """Set ``desired_count`` on the service and push the update.

        :return: whatever ``update_service`` returns for the changed service
        """
        service = self._service
        service.set_desired_count(desired_count)
        return self.update_service(service)
254
255
256 1
class EcsError(Exception):
    """Common base class of the exceptions defined in this module."""
258
259
260 1
class ConnectionError(EcsError):
    """Raised by ``EcsAction`` when the service description cannot be loaded."""
262
263
264 1
class UnknownContainerError(EcsError):
    """Raised when an option names a container the task definition lacks."""
266