1
|
|
|
# pylint: disable=no-member |
2
|
|
|
|
3
|
|
|
from items_selector import ItemsSelector |
4
|
|
|
from utils import compact_dict, get_client, chunk_index_list |
5
|
|
|
from easydict import EasyDict |
6
|
|
|
from collections import defaultdict |
7
|
|
|
import curator.api as api |
8
|
|
|
import logging |
9
|
|
|
import sys |
10
|
|
|
|
11
|
|
|
logger = logging.getLogger(__name__) |
12
|
|
|
|
13
|
|
|
|
14
|
|
|
class CuratorInvoke(object):
    """Translate a curator command name into the matching ``curator.api`` call.

    Options are given as keyword arguments and kept in ``self.opts``.  The
    client and the items selector are created on first access and reused
    afterwards.
    """

    # Supported curator commands for indices and snapshots.
    SUPPORTS = {
        'snapshots': ['delete'],
        'indices': [
            'alias', 'allocation', 'bloom', 'close', 'delete',
            'open', 'optimize', 'replicas', 'snapshot'
        ]
    }

    def __init__(self, **opts):
        """Store the options; nothing is connected until ``client`` is read."""
        self.opts = EasyDict(opts)
        self._client = None
        self._iselector = None

    @property
    def client(self):
        """Client built by ``get_client`` from the connection options (cached)."""
        if not self._client:
            o = self.opts
            self._client = get_client(
                host=o.host, port=o.port, url_prefix=o.url_prefix,
                http_auth=o.http_auth, use_ssl=o.use_ssl,
                master_only=o.master_only, timeout=o.timeout
            )
        return self._client

    @property
    def iselector(self):
        """
        Used to fetch indices/snapshots and apply filter to them.
        """
        if not self._iselector:
            self._iselector = ItemsSelector(self.client, **self.opts)
        return self._iselector

    @staticmethod
    def _reverse_flag(value):
        """Return the ``reverse`` option, defaulting to True only when unset.

        ``value or True`` would turn an explicit ``False`` into ``True``, so
        only ``None`` is treated as "not provided".
        """
        return True if value is None else value

    def _enhanced_working_list(self, command, act_on):
        """Enhance working_list by pruning kibana indices and filtering
        disk space. Returns filtered working list.

        Logs an error, prints a message and exits the process with status 99
        when nothing is left to act on.

        :param command: curator command name; only ``'delete'`` triggers the
            pruning and the disk space filter.
        :param act_on: passed to the selector's ``fetch`` as ``act_on``.
        :rtype: list
        """
        working_list = self.iselector.fetch(act_on=act_on)

        # NOTE(review): this branch does not look at act_on, so it also runs
        # for a snapshots delete -- confirm that is intended.
        if command == 'delete':
            # Protect against accidental delete
            logger.info("Pruning Kibana-related indices to prevent accidental deletion.")
            working_list = api.utils.prune_kibana(working_list)

            # If filter by disk space, filter the working_list by space:
            if working_list and self.opts.disk_space:
                working_list = api.filter.filter_by_space(
                    self.client, working_list,
                    disk_space=float(self.opts.disk_space),
                    reverse=self._reverse_flag(self.opts.reverse)
                )

        if not working_list:
            logger.error('No %s matched provided args: %s', act_on, self.opts)
            # Single-argument call form prints the same text on Python 2 and 3.
            print("ERROR. No {} found in Elasticsearch.".format(act_on))
            sys.exit(99)

        return working_list

    def fetch(self, act_on, on_nofilters_showall=False):
        """
        Forwarder method to indices/snapshots selector.
        """
        return self.iselector.fetch(act_on=act_on, on_nofilters_showall=on_nofilters_showall)

    def command_kwargs(self, command):
        """
        Return kwargs dict for a specific command options or return empty dict.

        The selected mapping is passed through ``compact_dict`` before being
        returned.
        """
        # Options that were not supplied read as None instead of raising.
        opts = defaultdict(lambda: None, self.opts)
        kwargs = {
            'alias': {'alias': opts['name'], 'remove': opts['remove']},
            'allocation': {'rule': opts['rule']},
            'bloom': {'delay': opts['delay']},
            'replicas': {'replicas': opts['count']},
            'optimize': {
                'max_num_segments': opts['max_num_segments'],
                'request_timeout': opts['timeout'],
                'delay': opts['delay']
            },
            'snapshot': {
                'name': opts['name'], 'prefix': opts['snapshot_prefix'],
                'repository': opts['repository'], 'partial': opts['partial'],
                'ignore_unavailable': opts['ignore_unavailable'],
                'include_global_state': opts['include_global_state'],
                'wait_for_completion': opts['wait_for_completion'],
                'request_timeout': opts['timeout']
            }
        }.get(command, {})
        return compact_dict(kwargs)

    def _call_api(self, method, *args, **kwargs):
        """Invoke curator api method call.

        :param method: name of a callable attribute of ``curator.api``.
        :returns: whatever the api callable returns.
        :raises RuntimeError: if ``curator.api`` has no callable of that name.
        """
        api_method = getattr(api, method, None)
        if not callable(api_method):
            # Fail with a readable message instead of "'NoneType' object is
            # not callable".
            raise RuntimeError("Unsupported curator api method: {}".format(method))

        logger.debug("Perfoming api call %s with args: %s, kwargs: %s",
                     method, args, kwargs)
        return api_method(self.client, *args, **kwargs)

    def command_on_indices(self, command, working_list):
        """Invoke command which acts on indices and perform an api call.

        :returns: the api call result; when the list is processed in chunks,
            ``True`` only if every chunk call returned a truthy value.
        """
        kwargs = self.command_kwargs(command)
        method = 'open_indices' if command == 'open' else command

        # List is too big and it will be proceeded in chunks.
        if len(api.utils.to_csv(working_list)) > 3072:
            logger.warning('Very large list of indices. Breaking it up into smaller chunks.')
            success = True
            for indices in chunk_index_list(working_list):
                # Keep going after a failed chunk; only remember the failure.
                if not self._call_api(method, indices, **kwargs):
                    success = False
            return success

        return self._call_api(method, working_list, **kwargs)

    def command_on_snapshots(self, command, working_list):
        """Invoke command which acts on snapshots and perform an api call.

        :raises RuntimeError: for any command other than ``'snapshot'`` or
            ``'delete'``.
        """
        if command == 'snapshot':
            kwargs = self.command_kwargs(command)
            # The snapshot command should get the full (not chunked)
            # list of indices.
            kwargs['indices'] = working_list
            return self._call_api('create_snapshot', **kwargs)

        if command == 'delete':
            success = True
            for s in working_list:
                # Every snapshot is attempted even if an earlier one failed.
                if not self._call_api('delete_snapshot',
                                      repository=self.opts.repository,
                                      snapshot=s):
                    success = False
            return success

        # should never get here
        raise RuntimeError("Unexpected method `{}.{}'".format('snapshots', command))

    def invoke(self, command=None, act_on=None):
        """Invoke command through translating it curator api call.

        :param command: one of the names listed in ``SUPPORTS[act_on]``.
        :param act_on: ``'indices'`` or ``'snapshots'``.
        :raises ValueError: if ``act_on`` is missing, or the command is not
            supported for ``act_on``.
        """
        # Validate act_on first: looking it up in SUPPORTS before this check
        # would raise KeyError and hide the intended error.
        if act_on is None:
            raise ValueError("Requires act_on either on `indices' or `snapshots'")
        if command not in self.SUPPORTS.get(act_on, ()):
            raise ValueError("Unsupported curator command: {} {}".format(command, act_on))

        working_list = self._enhanced_working_list(command, act_on)

        if act_on == 'indices' and command != 'snapshot':
            return self.command_on_indices(command, working_list)

        # Command on snapshots and snapshot command (which
        # actually has selected indices before).
        return self.command_on_snapshots(command, working_list)
172
|
|
|
|