Completed
Pull Request — master (#358)
by
unknown
02:09
created

CuratorInvoke   A

Complexity

Total Complexity 30

Size/Duplication

Total Lines 158
Duplicated Lines 0 %
Metric Value
dl 0
loc 158
rs 10
wmc 30

10 Methods

Rating   Name   Duplication   Size   Complexity  
A _call_api() 0 8 1
B _enhanced_working_list() 0 25 6
B command_kwargs() 0 25 2
A __init__() 0 4 1
B command_on_snapshots() 0 22 5
B invoke() 0 16 5
A client() 0 10 2
A iselector() 0 8 2
A fetch() 0 5 1
B command_on_indices() 0 16 5
1
# pylint: disable=no-member
2
3
from items_selector import ItemsSelector
4
from utils import compact_dict, get_client, chunk_index_list
5
from easydict import EasyDict
6
from collections import defaultdict
7
import curator.api as api
8
import logging
9
import sys
10
11
# Module-level logger, named after this module so log records can be traced back to it.
logger = logging.getLogger(__name__)
12
13
14
class CuratorInvoke(object):
    """Translate a curator command name into the matching ``curator.api`` call.

    The keyword options passed to the constructor are kept in ``self.opts``
    and read by attribute (``self.opts.host``, ``self.opts.disk_space``, ...).
    The Elasticsearch client and the items selector are created lazily on
    first access and then reused.
    """

    # Supported curator commands for indices and snapshots.
    SUPPORTS = {
        'snapshots': ['delete'],
        'indices': [
            'alias', 'allocation', 'bloom', 'close', 'delete',
            'open', 'optimize', 'replicas', 'snapshot'
        ]
    }

    def __init__(self, **opts):
        """Store the options; the client and selector are built on demand."""
        self.opts = EasyDict(opts)
        self._client = None
        self._iselector = None

    @property
    def client(self):
        """
        Client built by ``get_client`` from the connection options; created
        on first access and cached afterwards.
        """
        if not self._client:
            o = self.opts
            self._client = get_client(**({
                'host': o.host, 'port': o.port, 'url_prefix': o.url_prefix,
                'http_auth': o.http_auth, 'use_ssl': o.use_ssl,
                'master_only': o.master_only, 'timeout': o.timeout
            }))
        return self._client

    @property
    def iselector(self):
        """
        Used to fetch indices/snapshots and apply filter to them.
        Created on first access and cached afterwards.
        """
        if not self._iselector:
            self._iselector = ItemsSelector(self.client, **self.opts)
        return self._iselector

    def _enhanced_working_list(self, command, act_on):
        """Fetch the working list and apply the extra safety filters.

        For the ``delete`` command the list is passed through
        ``api.utils.prune_kibana`` and, when the ``disk_space`` option is
        set, through ``api.filter.filter_by_space``.

        If nothing is left to act on, an error is logged and printed and
        the process exits with status 99.

        :param command: curator command name (e.g. ``'delete'``).
        :param act_on: ``'indices'`` or ``'snapshots'``.
        :rtype: list
        """
        working_list = self.iselector.fetch(act_on=act_on)

        if command == 'delete':
            # Protect against accidental delete.
            logger.info("Pruning Kibana-related indices to prevent accidental deletion.")
            working_list = api.utils.prune_kibana(working_list)

            # If filter by disk space, filter the working_list by space.
            if working_list and self.opts.disk_space:
                # NOTE(review): ``x or True`` always evaluates to True, so the
                # ``reverse`` option can never switch the direction off.  Kept
                # as-is on purpose: honouring a falsy value here would change
                # which indices get deleted.  Confirm the intended default
                # before changing it.
                working_list = api.filter.filter_by_space(
                    self.client, working_list,
                    disk_space=float(self.opts.disk_space),
                    reverse=(self.opts.reverse or True))

        if not working_list:
            logger.error('No %s matched provided args: %s', act_on, self.opts)
            # Single-argument call form prints the same text on Python 2 and 3.
            print("ERROR. No {} found in Elasticsearch.".format(act_on))
            sys.exit(99)

        return working_list

    def fetch(self, act_on, on_nofilters_showall=False):
        """
        Forwarder method to indices/snapshots selector.
        """
        return self.iselector.fetch(act_on=act_on, on_nofilters_showall=on_nofilters_showall)

    def command_kwargs(self, command):
        """
        Return kwargs dict for a specific command options or return empty dict.

        Options that were not supplied are looked up as ``None``; the
        resulting dict is passed through ``compact_dict`` before it is
        returned.
        """
        # Missing options resolve to None instead of raising.
        opts = defaultdict(lambda: None, self.opts)
        kwargs = {
            'alias': {'alias': opts['name'], 'remove': opts['remove']},
            'allocation': {'rule': opts['rule']},
            'bloom': {'delay': opts['delay']},
            'replicas': {'replicas': opts['count']},
            'optimize': {
                'max_num_segments': opts['max_num_segments'],
                'request_timeout': opts['timeout'],
                'delay': opts['delay']
            },
            'snapshot': {
                'name': opts['name'], 'prefix': opts['snapshot_prefix'],
                'repository': opts['repository'], 'partial': opts['partial'],
                'ignore_unavailable': opts['ignore_unavailable'],
                'include_global_state': opts['include_global_state'],
                'wait_for_completion': opts['wait_for_completion'],
                'request_timeout': opts['timeout']
            }
        }.get(command, {})
        return compact_dict(kwargs)

    def _call_api(self, method, *args, **kwargs):
        """Invoke curator api method call.

        :param method: name of a callable exposed by ``curator.api``.
        :raises ValueError: if ``curator.api`` has no attribute of that name.
        :return: whatever the api method returns.
        """
        api_method = api.__dict__.get(method)
        if api_method is None:
            # Fail with a readable message instead of "'NoneType' is not callable".
            raise ValueError("Unknown curator api method: {}".format(method))

        logger.debug("Perfoming api call %s with args: %s, kwargs: %s",
                     method, args, kwargs)
        return api_method(self.client, *args, **kwargs)

    def command_on_indices(self, command, working_list):
        """Invoke command which acts on indices and perform an api call.

        When the CSV form of ``working_list`` is longer than 3072 characters
        the list is processed in chunks; every chunk is attempted even if an
        earlier one fails.

        :return: the api call result, or for chunked runs ``True`` only if
                 every chunk call returned a truthy value.
        """
        kwargs = self.command_kwargs(command)
        # The api function for the ``open`` command is named ``open_indices``.
        method = 'open_indices' if command == 'open' else command

        if len(api.utils.to_csv(working_list)) <= 3072:
            return self._call_api(method, working_list, **kwargs)

        # List is too big and it will be processed in chunks.
        logger.warning('Very large list of indices.  Breaking it up into smaller chunks.')
        success = True
        for indices in chunk_index_list(working_list):
            if not self._call_api(method, indices, **kwargs):
                success = False
        return success

    def command_on_snapshots(self, command, working_list):
        """Invoke command which acts on snapshots and perform an api call.

        ``snapshot`` creates one snapshot from the full list of indices;
        ``delete`` removes each listed snapshot from ``opts.repository``,
        attempting all of them even if one fails.

        :raises RuntimeError: for any other command.
        """
        if command == 'snapshot':
            kwargs = self.command_kwargs(command)
            # The snapshot command should get the full (not chunked)
            # list of indices.
            kwargs['indices'] = working_list
            return self._call_api('create_snapshot', **kwargs)

        if command == 'delete':
            success = True
            for s in working_list:
                if not self._call_api('delete_snapshot',
                                      repository=self.opts.repository,
                                      snapshot=s):
                    success = False
            return success

        # should never get here
        raise RuntimeError("Unexpected method `{}.{}'".format('snapshots', command))

    def invoke(self, command=None, act_on=None):
        """Invoke command through translating it curator api call.

        :param command: one of the commands listed in ``SUPPORTS[act_on]``.
        :param act_on: ``'indices'`` or ``'snapshots'``.
        :raises ValueError: if ``act_on`` is missing, or the command/target
                            combination is not supported.
        """
        # Validate the target first: indexing SUPPORTS with a missing or
        # unknown target would otherwise surface as a bare KeyError.
        if act_on is None:
            raise ValueError("Requires act_on either on `indices' or `snapshots'")
        if act_on not in self.SUPPORTS or command not in self.SUPPORTS[act_on]:
            raise ValueError("Unsupported curator command: {} {}".format(command, act_on))

        working_list = self._enhanced_working_list(command, act_on)

        if act_on == 'indices' and command != 'snapshot':
            return self.command_on_indices(command, working_list)

        # Command on snapshots and snapshot command (which
        # actually has selected indices before).
        return self.command_on_snapshots(command, working_list)
172