GitHub Access Token became invalid

It seems that the GitHub access token used to retrieve details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

PurgeESCommand   A
last analyzed

Complexity

Total Complexity 19

Size/Duplication

Total Lines 119
Duplicated Lines 20.17 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 24
loc 119
rs 10
wmc 19

11 Methods

Rating   Name   Duplication   Size   Complexity  
A take_action() 0 11 4
A _purge_index() 0 12 3
A _perform_index_purge() 0 18 2
A _logging() 0 2 1
A purge_geocomplete_index() 0 3 1
A __init__() 0 3 1
A purge_jobs_index() 0 4 1
B get_parser() 24 24 1
A _reset_sync() 0 12 3
A _perform_sync_reset() 0 12 1
A purge_companies_index() 0 4 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
# -*- coding: utf-8 -*-
2
import logging
3
4
import elasticsearch.exceptions
5
import elasticsearch_dsl
6
7
from pyjobsweb import model
8
from pyjobsweb.commands import AppContextCommand
9
from pyjobsweb.lib.lock import acquire_inter_process_lock
10
11
12
class PurgeESCommand(AppContextCommand):
    """
    Purge Elasticsearch indices by dropping and re-creating them.

    Each index is selected with its own command-line flag (see
    ``get_parser``); ``take_action`` purges every index whose flag was given.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialise the parent command and a logger named after this module.

        All positional and keyword arguments are forwarded to the parent
        class.
        """
        # Unpack the arguments: passing ``args`` and ``kwargs`` as two plain
        # positional values would hand the parent a tuple and a dict instead
        # of the arguments the caller actually supplied.
        super(PurgeESCommand, self).__init__(*args, **kwargs)
        self._logger = logging.getLogger(__name__)

    def get_parser(self, prog_name):
        """
        Extend the parent parser with one flag per purgeable index.

        Each flag stores ``True`` in its ``dest`` attribute when given.

        :param prog_name: program name, forwarded to the parent
            ``get_parser``.
        :return: the parser built by the parent class, with the ``--jobs``,
            ``--companies`` and ``--geocomplete`` flags added.
        """
        parser = super(PurgeESCommand, self).get_parser(prog_name)

        jobs_help_msg = 'purges the jobs index of the elasticsearch database'
        parser.add_argument('-j', '--jobs',
                            help=jobs_help_msg,
                            dest='purge_jobs_index',
                            action='store_const', const=True)

        # The trailing space matters: the two literals are concatenated
        # as-is, and without it the help text reads "elasticsearchdatabase".
        companies_help_msg = 'purges the companies index of the elasticsearch ' \
                             'database'
        parser.add_argument('-co', '--companies',
                            help=companies_help_msg,
                            dest='purge_companies_index',
                            action='store_const', const=True)

        geocomplete_help_msg = \
            'purges the geocomplete index of the elasticsearch database'
        parser.add_argument('-g', '--geocomplete',
                            help=geocomplete_help_msg,
                            dest='purge_geocomplete_index',
                            action='store_const', const=True)

        return parser

    def _logging(self, logging_level, message):
        """
        Emit ``message`` on this command's logger at ``logging_level``.

        :param logging_level: a ``logging`` level such as ``logging.INFO``.
        :param message: the already formatted text to log.
        """
        self._logger.log(logging_level, message)

    def _perform_index_purge(self, index_name, index_settings, doc_type_class):
        """
        Delete the index ``index_name`` and create it again.

        A missing index is not treated as an error (the delete call ignores
        HTTP 404). Any ``ElasticsearchException`` raised by the delete or the
        create call is logged at ERROR level and swallowed.

        :param index_name: name of the Elasticsearch index.
        :param index_settings: mapping expanded into ``Index.settings``.
        :param doc_type_class: document class registered on the index before
            it is re-created.
        """
        self._logging(logging.INFO, 'Dropping %s index.' % index_name)

        index = elasticsearch_dsl.Index(index_name)
        index.settings(**index_settings)
        index.doc_type(doc_type_class)

        try:
            index.delete(ignore=404)
            index.create()
        except elasticsearch.exceptions.ElasticsearchException as e:
            log_msg = 'Error while dropping %s index: %s.' % (index_name, e)
            self._logging(logging.ERROR, log_msg)
        else:
            log_msg = 'Index %s has been dropped successfully.' % index_name
            self._logging(logging.INFO, log_msg)

    def _purge_index(self, index_name, index_settings, doc_type_class):
        """
        Purge ``index_name`` unless the ``purge_<index_name>`` lock is taken.

        When the inter-process lock cannot be acquired a warning is logged
        and nothing else happens.

        :param index_name: name of the Elasticsearch index.
        :param index_settings: settings passed on to
            ``_perform_index_purge``.
        :param doc_type_class: document class passed on to
            ``_perform_index_purge``.
        """
        self._logging(logging.INFO, 'Purging index %s.' % index_name)

        with acquire_inter_process_lock('purge_%s' % index_name) as acquired:
            if not acquired:
                err_msg = 'Another process is already purging the %s ' \
                          'index, aborting now.' % index_name
                self._logging(logging.WARNING, err_msg)
                return

            self._perform_index_purge(index_name,
                                      index_settings, doc_type_class)

    def _perform_sync_reset(self, sqlalchemy_table_class):
        """
        Call ``reset_last_sync()`` on the given table class, logging around it.

        :param sqlalchemy_table_class: class exposing ``__tablename__`` and
            ``reset_last_sync()``; presumably a SQLAlchemy model backed by
            Postgresql, as the log messages suggest -- confirm against
            ``pyjobsweb.model``.
        """
        table_name = sqlalchemy_table_class.__tablename__

        log_msg = 'Resetting Postgresql %s table sync data.' % table_name
        self._logging(logging.INFO, log_msg)

        sqlalchemy_table_class.reset_last_sync()

        log_msg = 'Postgresql %s table sync data successfully reset.' \
                  % table_name
        self._logging(logging.INFO, log_msg)

    def _reset_sync(self, index_name, sqlalchemy_table_class):
        """
        Reset the sync data tied to ``index_name`` unless the lock is taken.

        The lock name (``purge_<index_name>``) is the same one
        ``_purge_index`` uses. When the lock cannot be acquired a warning is
        logged and nothing else happens.

        :param index_name: name of the Elasticsearch index, used for the lock
            name and the log messages.
        :param sqlalchemy_table_class: table class passed on to
            ``_perform_sync_reset``.
        """
        # NOTE(review): this start message is logged at WARNING whereas
        # _purge_index logs its own start message at INFO -- confirm that
        # the difference is intended.
        log_msg = 'Resetting synchronization data for index %s.' % index_name
        self._logging(logging.WARNING, log_msg)

        with acquire_inter_process_lock('purge_%s' % index_name) as acquired:
            if not acquired:
                err_msg = 'Another process is already resetting the %s ' \
                          'index synchronization data, aborting now.' \
                          % index_name
                self._logging(logging.WARNING, err_msg)
                return

            self._perform_sync_reset(sqlalchemy_table_class)

    def purge_jobs_index(self):
        """Purge the jobs index, then reset the ``JobAlchemy`` sync data."""
        index_name = model.JobElastic().index
        self._purge_index(index_name, {}, model.JobElastic)
        self._reset_sync(index_name, model.JobAlchemy)

    def purge_companies_index(self):
        """
        Purge the companies index, then reset the ``CompanyAlchemy`` sync
        data.
        """
        index_name = model.CompanyElastic().index
        self._purge_index(index_name, {}, model.CompanyElastic)
        self._reset_sync(index_name, model.CompanyAlchemy)

    def purge_geocomplete_index(self):
        """Purge the geocomplete index (no sync data is reset for it)."""
        index_name = model.Geocomplete().index
        self._purge_index(index_name, {}, model.Geocomplete)

    def take_action(self, parsed_args):
        """
        Run the parent action, then purge every index selected on the CLI.

        :param parsed_args: parsed command-line arguments; the
            ``purge_jobs_index``, ``purge_companies_index`` and
            ``purge_geocomplete_index`` attributes are read.
        """
        super(PurgeESCommand, self).take_action(parsed_args)

        if parsed_args.purge_jobs_index:
            self.purge_jobs_index()

        if parsed_args.purge_companies_index:
            self.purge_companies_index()

        if parsed_args.purge_geocomplete_index:
            self.purge_geocomplete_index()