Completed
Push to master (62a151...74c045) by unknown, 53s, created

check_dangling_files()   B

Complexity:  Conditions 5
Size:        Total Lines 14
Duplication: Lines 0, Ratio 0 %
Importance:  Changes 1, Bugs 0, Features 0

Metric  Value
cc      5
c       1
b       0
f       0
dl      0
loc     14
rs      8.5454
from itertools import chain
import operator
import time
import logging

from django.apps import apps
from django.utils import timezone
from django.conf import settings
from django.db.models import Count
from django.core.cache import cache
from django.template import defaultfilters as filters

import boto
from boto.s3.key import Key
from raven import Client


from omaha.models import Version as OmahaVersion
from omaha.utils import valuedispatch
from sparkle.models import SparkleVersion
from crash.models import Crash, Symbols
from feedback.models import Feedback

from dynamic_preferences_registry import global_preferences_manager as gpm

dsn = getattr(settings, 'RAVEN_CONFIG', None)
if dsn:
    dsn = dsn['dsn']
raven = Client(dsn, name=getattr(settings, 'HOST_NAME', None), release=getattr(settings, 'APP_VERSION', None))


@valuedispatch
def bulk_delete(cls, qs):
    raise NotImplementedError

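# Note: from the registrations and call sites below, @valuedispatch appears to dispatch
# bulk_delete on the value of its first argument (the model class), so a call such as
# bulk_delete(Crash, qs) is routed to the handler registered via @bulk_delete.register(Crash).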
@bulk_delete.register(Crash)
def _(cls, qs):
    if settings.DEFAULT_FILE_STORAGE == 'omaha_server.s3utils.S3Storage':
        qs = s3_bulk_delete(qs, file_fields=['archive', 'upload_file_minidump'],
                            s3_fields=['minidump_archive', 'minidump'])

    result = dict()
    result['count'] = qs.count()
    result['size'] = qs.get_size()
    elements = list(qs.values_list('id', 'created', 'signature', 'userid', 'appid'))
    result['elements'] = map(lambda x: dict(id=x[0], element_created=x[1].strftime("%d. %B %Y %I:%M%p"), signature=x[2],
                                            userid=x[3], appid=x[4]), elements)
    qs.delete()
    return result


@bulk_delete.register(Feedback)
def _(cls, qs):
    if settings.DEFAULT_FILE_STORAGE == 'storages.backends.s3boto.S3BotoStorage':
        qs = s3_bulk_delete(qs, file_fields=['attached_file', 'blackbox', 'screenshot', 'system_logs'],
                            s3_fields=['feedback_attach', 'blackbox', 'screenshot', 'system_logs'])

    result = dict()
    result['count'] = qs.count()
    result['size'] = qs.get_size()
    elements = list(qs.values_list('id', 'created'))
    result['elements'] = map(lambda x: dict(id=x[0], element_created=x[1].strftime("%d. %B %Y %I:%M%p")), elements)
    qs.delete()
    return result


@bulk_delete.register(Symbols)
def _(cls, qs):
    if settings.DEFAULT_FILE_STORAGE == 'storages.backends.s3boto.S3BotoStorage':
        qs = s3_bulk_delete(qs, file_fields=['file'], s3_fields=['symbols'])

    result = dict()
    result['count'] = qs.count()
    result['size'] = qs.get_size()
    elements = list(qs.values_list('id', 'created'))
    result['elements'] = map(lambda x: dict(id=x[0], element_created=x[1].strftime("%d. %B %Y %I:%M%p")), elements)
    qs.delete()
    return result


@bulk_delete.register(OmahaVersion)
def _(cls, qs):
    if settings.DEFAULT_FILE_STORAGE == 'storages.backends.s3boto.S3BotoStorage':
        qs = s3_bulk_delete(qs, file_fields=['file'], s3_fields=['build'])

    result = dict()
    result['count'] = qs.count()
    result['size'] = qs.get_size()
    elements = list(qs.values_list('id', 'created'))
    result['elements'] = map(lambda x: dict(id=x[0], element_created=x[1].strftime("%d. %B %Y %I:%M%p")), elements)
    qs.delete()
    return result


@bulk_delete.register(SparkleVersion)
def _(cls, qs):
    if settings.DEFAULT_FILE_STORAGE == 'storages.backends.s3boto.S3BotoStorage':
        qs = s3_bulk_delete(qs, file_fields=['file'], s3_fields=['sparkle'])

    result = dict()
    result['count'] = qs.count()
    result['size'] = qs.get_size()
    elements = list(qs.values_list('id', 'created'))
    result['elements'] = map(lambda x: dict(id=x[0], element_created=x[1].strftime("%d. %B %Y %I:%M%p")), elements)
    qs.delete()
    return result

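# s3_bulk_delete: best-effort cleanup of the S3 objects referenced by the queryset's file
# fields. Keys that still appear in the bucket listing afterwards are logged, the rows that
# reference them are excluded, and the remaining rows get their file fields cleared.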
def s3_bulk_delete(qs, file_fields, s3_fields):
    conn = boto.connect_s3(settings.AWS_ACCESS_KEY_ID, settings.AWS_SECRET_ACCESS_KEY)
    bucket = conn.get_bucket(settings.AWS_STORAGE_BUCKET_NAME)

    file_keys = qs.values_list(*file_fields)
    file_keys = [key for key in chain(*file_keys) if key]
    bucket.delete_keys(file_keys)
    s3_keys = [x for x in chain(*[bucket.list(prefix="%s/" % field) for field in s3_fields])]
    error_keys = filter(lambda key: key in s3_keys, file_keys)
    if error_keys:
        logging.error("Files were not deleted from s3: %r" % error_keys)
        exclude_fields = [qs.exclude(**{"%s__in" % key: error_keys}) for key in file_fields]
        qs = reduce(operator.and_, exclude_fields)

    update_kwargs = dict(zip(file_fields, [None for x in file_fields]))
    qs.update(**update_kwargs)
    return qs


def delete_older_than(app, model_name, limit=None):
    if not limit:
        preference_key = '__'.join([model_name, 'limit_storage_days'])
        limit = gpm[preference_key]
    model = apps.get_model(app, model_name)
    offset = timezone.timedelta(days=limit)
    limit = timezone.now() - offset
    old_objects = model.objects.filter(created__lte=limit)
    result = dict()
    if old_objects:
        result = bulk_delete(model, old_objects)
    return result

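# delete_duplicate_crashes keeps at most `limit` crashes per signature (defaulting to the
# Crash__duplicate_number preference), deleting the oldest duplicates in batches of up to 1000.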
def delete_duplicate_crashes(limit=None):
    logger = logging.getLogger('limitation')
    full_result = dict(count=0, size=0, elements=[])
    if not limit:
        preference_key = '__'.join(['Crash', 'duplicate_number'])
        limit = gpm[preference_key]
    duplicated = Crash.objects.values('signature').annotate(count=Count('signature'))
    duplicated = filter(lambda x: x['count'] > limit, duplicated)
    logger.info('Duplicated signatures: %r' % duplicated)
    for group in duplicated:
        qs = Crash.objects.filter(signature=group['signature']).order_by('created')
        dup_elements = []
        dup_count = qs.count()
        while dup_count > limit:
            bulk_size = dup_count - limit if dup_count - limit < 1000 else 1000
            bulk_ids = qs[:bulk_size].values_list('id', flat=True)
            bulk = qs.filter(id__in=bulk_ids)
            result = bulk_delete(Crash, bulk)
            full_result['count'] += result['count']
            full_result['size'] += result['size']
            full_result['elements'] += result['elements']
            dup_elements += result['elements']
            dup_count -= bulk_size
    return full_result


def delete_size_is_exceeded(app, model_name, limit=None):
    if not limit:
        preference_key = '__'.join([model_name, 'limit_size'])
        limit = gpm[preference_key] * 1024 * 1024 * 1024
    else:
        limit *= 1024 * 1024 * 1024
    model = apps.get_model(app, model_name)
    group_count = 1000
    full_result = dict(count=0, size=0, elements=[])
    objects_size = model.objects.get_size()

    while objects_size > limit:
        group_objects_ids = list(model.objects.order_by('created').values_list("id", flat=True)[:group_count])
        group_objects = model.objects.order_by('created').filter(pk__in=group_objects_ids)
        group_size = group_objects.get_size()
        diff_size = objects_size - limit

        if group_size > diff_size:
            group_size = 0
            low_border = 0
            for instance in group_objects:
                group_size += instance.size
                low_border += 1
                if group_size >= diff_size:
                    group_objects = model.objects.order_by('created').filter(pk__in=group_objects_ids[:low_border])
                    break

        result = bulk_delete(model, group_objects)
        objects_size -= result['size']
        full_result['count'] += result['count']
        full_result['size'] += result['size']
        full_result['elements'] += result['elements']
    return full_result

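# monitoring_size caches the current storage size of each model and sends a warning to
# Sentry (via raven.captureMessage) whenever a configured size limit is exceeded.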
def monitoring_size():
    size = OmahaVersion.objects.get_size()
    if size > gpm['Version__limit_size'] * 1024 * 1024 * 1024:
        raven.captureMessage("[Limitation]Size limit of omaha versions is exceeded. Current size is %s [%d]" %
                             (filters.filesizeformat(size).replace(u'\xa0', u' '), time.time()),
                             data={'level': 30, 'logger': 'limitation'})
    cache.set('omaha_version_size', size)

    size = SparkleVersion.objects.get_size()
    if size > gpm['SparkleVersion__limit_size'] * 1024 * 1024 * 1024:
        raven.captureMessage("[Limitation]Size limit of sparkle versions is exceeded. Current size is %s [%d]" %
                             (filters.filesizeformat(size).replace(u'\xa0', u' '), time.time()),
                             data={'level': 30, 'logger': 'limitation'})
    cache.set('sparkle_version_size', size)

    size = Feedback.objects.get_size()
    if size > gpm['Feedback__limit_size'] * 1024 * 1024 * 1024:
        raven.captureMessage("[Limitation]Size limit of feedbacks is exceeded. Current size is %s [%d]" %
                             (filters.filesizeformat(size).replace(u'\xa0', u' '), time.time()),
                             data={'level': 30, 'logger': 'limitation'})
    cache.set('feedbacks_size', size)

    size = Crash.objects.get_size()
    if size > gpm['Crash__limit_size'] * 1024 * 1024 * 1024:
        raven.captureMessage("[Limitation]Size limit of crashes is exceeded. Current size is %s [%d]" %
                             (filters.filesizeformat(size).replace(u'\xa0', u' '), time.time()),
                             data={'level': 30, 'logger': 'limitation'})
    cache.set('crashes_size', size)

    size = Symbols.objects.get_size()
    if size > gpm['Symbols__limit_size'] * 1024 * 1024 * 1024:
        raven.captureMessage("[Limitation]Size limit of symbols is exceeded. Current size is %s [%d]" %
                             (filters.filesizeformat(size).replace(u'\xa0', u' '), time.time()),
                             data={'level': 30, 'logger': 'limitation'})
    cache.set('symbols_size', size)

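# Dangling-file handling: check_dangling_files compares the file paths stored in the
# database against the keys present in the S3 bucket; handle_dangling_files then either
# reports paths missing from S3 or deletes orphaned S3 keys via delete_dangling_files_s3.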
def handle_dangling_files(model, prefix, file_fields):
    conn = boto.connect_s3(settings.AWS_ACCESS_KEY_ID, settings.AWS_SECRET_ACCESS_KEY)
    bucket = conn.get_bucket(settings.AWS_STORAGE_BUCKET_NAME)
    result = dict()
    dangling_files_in_db, dangling_files_in_s3 = check_dangling_files(model, prefix, file_fields, bucket)
    if dangling_files_in_db:
        # send notifications
        result['mark'] = 'db'
        result['data'] = dangling_files_in_db
        result['status'] = 'Send notifications'
        result['count'] = len(dangling_files_in_db)
        result['cleaned_space'] = 0
    elif dangling_files_in_s3:
        # delete files from s3
        result['mark'] = 's3'
        result['data'] = dangling_files_in_s3
        result['status'] = 'Delete files'
        result.update(delete_dangling_files_s3(bucket, result['data']))
    else:
        # no dangling files detected
        result['mark'] = 'Nothing'
        result['data'] = []
        result['status'] = 'Nothing'
        result['count'] = 0
        result['cleaned_space'] = 0
    return result


def check_dangling_files(model, prefix, file_fields, bucket):
    keys_from_s3 = list()
    for pref in prefix:
        key_from_s3 = bucket.list(prefix="%s/" % pref)
        keys_from_s3.append(key_from_s3)
    obj_from_s3 = [x for x in chain(*keys_from_s3)]
    urls_from_s3 = list()
    for obj in obj_from_s3:
        urls_from_s3.append(obj.key)
    all_objects = model.objects.all()
    urls = list(filter(None, [field for field in chain(*all_objects.values_list(*file_fields))]))
    dangling_files_in_db = list(set(urls) - set(urls_from_s3))
    dangling_files_in_s3 = list(set(urls_from_s3) - set(urls))
    return dangling_files_in_db, dangling_files_in_s3


def delete_dangling_files_s3(bucket, file_paths):
    result = dict()
    result['cleaned_space'] = 0
    result['count'] = 0
    for path in file_paths:
        _file = bucket.lookup(path)
        result['cleaned_space'] += _file.size
        result['count'] += 1
        bucket.delete_key(_file.key)
    return result
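Below is a minimal usage sketch, not part of the reviewed file, showing how the cleanup helpers above might be wired into periodic jobs. The Celery dependency, the task names, and the 'crash' app label passed to delete_older_than are assumptions for illustration only, not taken from this changeset.

# Hypothetical periodic tasks; the Celery wiring and task names are assumptions.
from celery import shared_task


@shared_task
def auto_delete_old_crashes():
    # Prune Crash rows older than the Crash__limit_storage_days preference.
    return delete_older_than('crash', 'Crash')


@shared_task
def auto_delete_duplicate_crashes():
    # Keep at most Crash__duplicate_number crashes per signature.
    return delete_duplicate_crashes()


@shared_task
def auto_monitoring_size():
    # Cache current storage sizes and report exceeded limits to Sentry.
    return monitoring_size()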