Passed
Push — master ( 42467a...9d011f )
by Matěj
03:19 queued 11s
created

utils.ignition-remediation.resolve_rule()   A

Complexity

Conditions 4

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 7
nop 1
dl 0
loc 10
ccs 0
cts 7
cp 0
crap 20
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
3
import errno
4
import os
5
import sys
6
import argparse
7
import urllib.parse
8
from string import Template
9
10
# Long-form help text shown by ``--help``. It is passed to argparse together
# with RawTextHelpFormatter so the layout below is printed verbatim.
description = """
Encode a given file to an ignition based remediation or decode an ignition
based file remediation to the resulting file.

Encoding
========
To encode a file, you need to provide a number of attributes. At the minimum,
you need to provide the version of the file that would be applied with
MachineConfigs. Then you also need to provide the path where the file would
be uploaded to using the MC and the file mode. Finally, either provide the
platforms the remediation applies to or provide the rule name. If the rule
name is provided, then the script reads the platforms from existing
remediations. The output is either written to the rule directory if a rule
is provided or to a file.

Examples
--------
    * Encode a remediation based on a "golden file" at /tmp/chrony.conf. The
      remediation will replace the file at /etc/chrony.conf with the mode 0644.
      The resulting MachineConfig would be automatically written to the
      directory where the rule chronyd_no_chronyc_network is stored. The
      platforms that the remediation is applied to would be read from the
      existing bash remediations.

      python3 utils/ignition-remediation.py encode \\
                                            --mode=0644 \\
                                            --infile=/tmp/chrony.conf \\
                                            --target=/etc/chrony.conf \\
                                            --rule=chronyd_no_chronyc_network

    * As above, but print out the resulting remediation to stdout. You could
      substitute stdout for any other file to have the remediation dumped
      there.

      python3 utils/ignition-remediation.py encode \\
                                            --mode=0644 \\
                                            --infile=/tmp/chrony.conf \\
                                            --target=/etc/chrony.conf \\
                                            --rule=chronyd_no_chronyc_network \\
                                            --outfile=stdout

Decoding
========
Either pass in the rule name or an absolute path to the ignition file. When
passing in the rule name, the script expects your current directory is at
the root of the checkout.

Examples
--------
    * Decode a remediation for a given rule:

      python3 utils/ignition-remediation.py decode \\
                                            --rule=chronyd_no_chronyc_network

    * Decode a remediation from a given file:

      python3 utils/ignition-remediation.py decode \\
                                            --infile=/tmp/remediation.yml

"""
71
72
# string.Template source for the generated MachineConfig. The placeholders
# $platforms, $content, $mode and $path are filled in by encode().
mc_template = """# platform = $platforms
apiVersion: machineconfiguration.openshift.io/v1
kind: MachineConfig
spec:
  config:
    ignition:
      version: 2.2.0
    storage:
      files:
      - contents:
          source: data:,$content
        filesystem: root
        mode: $mode
        path: $path
"""
88
89
90
def urlencoded_file(filename):
    """Return the text content of *filename*, percent-encoded.

    The file is opened in text mode and its whole content is passed to
    ``urllib.parse.quote``.
    """
    with open(filename) as src:
        raw_text = src.read()
    return urllib.parse.quote(raw_text)
93
94
95
def encode(infile, target_path, mode, platforms):
    """Render the MachineConfig remediation for *infile*.

    The percent-encoded content of *infile* is substituted into
    ``mc_template`` together with the target path, the file mode and the
    platform list. Returns the rendered text.
    """
    values = {
        "path": target_path,
        "mode": mode,
        "platforms": platforms,
        "content": urlencoded_file(infile),
    }
    return Template(mc_template).substitute(values)
101
102
103
def encode_outfile(encode_outfile, rule_dir):
    """Open the destination the encoded remediation is written to.

    * ``"stdout"`` selects ``sys.stdout``.
    * Any other non-None *encode_outfile* is opened for writing.
    * Otherwise, when *rule_dir* is given, ``<rule_dir>/ignition/shared.yml``
      is opened for writing; the ``ignition`` directory is created when the
      first open fails with ENOENT.

    Returns the writable file object, or None when neither argument
    identifies a destination. OSError from open/mkdir propagates.
    """
    if encode_outfile == "stdout":
        return sys.stdout

    if encode_outfile is not None:
        return open(encode_outfile, "w")

    if rule_dir is None:
        return None

    target_dir = os.path.join(rule_dir, "ignition")
    target_file = os.path.join(target_dir, "shared.yml")
    try:
        return open(target_file, "w")
    except OSError as err:
        # Only a missing directory is recoverable here.
        if err.errno != errno.ENOENT:
            raise
        os.mkdir(target_dir)
        return open(target_file, "w")
123
124
125
def write_encoded(fhandle, remediation):
    """Write *remediation* to *fhandle* and release the handle.

    Regular files are closed once the write finished (or failed). The
    interpreter's standard streams are only flushed: the "stdout"
    destination hands ``sys.stdout`` to this function, and closing it would
    break any later output of the process.
    """
    try:
        fhandle.write(remediation)
    finally:
        if fhandle is sys.stdout or fhandle is sys.stderr:
            fhandle.flush()
        else:
            fhandle.close()
130
131
132
def required_for_encode(args, param, print_warning=False):
    """Tell whether attribute *param* of *args* is set (is not None).

    When the attribute is missing and *print_warning* is true, a message
    naming the argument is printed. Returns True when set, False otherwise.
    """
    is_set = getattr(args, param) is not None
    if not is_set and print_warning:
        print(f"Argument {param} is required when "
               "encoding a source to an MCO")
    return is_set
139
140
141
def check_encode_args(args):
    """Validate the arguments needed for the ``encode`` action.

    ``mode``, ``infile`` and ``target_path`` must all be set, and at least
    one of ``platforms`` or ``rule`` must be set. On a violation an
    explanation is printed and the process exits with status 1.
    """
    mandatory = ("mode", "infile", "target_path")
    # Check every mandatory argument (no short-circuit) so that each missing
    # one gets its own warning instead of only the first.
    missing = [param for param in mandatory
               if not required_for_encode(args, param, print_warning=True)]
    if missing:
        print("--mode, --infile and --target-path are "
              "required when encoding")
        sys.exit(1)

    has_platform_source = (required_for_encode(args, "platforms")
                           or required_for_encode(args, "rule"))
    if not has_platform_source:
        print("Either platforms is given or the rule name "
              "must be given so that we can deduce the platforms")
        sys.exit(1)
154
155
156
def resolve_path(rule, path):
    """Return the path of the ignition remediation file to work with.

    An explicitly given *path* always wins. Otherwise the tree under
    ``./linux_os`` is searched for a directory whose trailing path
    component(s) equal *rule*, and ``<dir>/ignition/shared.yml`` is returned.

    Returns None when neither argument is given or no directory matches.
    """
    # If the path to the ignition file is given, we just use it
    if path is not None:
        return path

    # If path is not given and rule is not given,
    # we don't know what to work with
    if rule is None:
        return None

    # Otherwise we try to guess it based on the rule name. Anchoring the
    # match at a path separator keeps a rule "foo" from matching a
    # directory named "sshd_foo".
    suffix = os.sep + rule.strip(os.sep)
    for root, _dirs, _files in os.walk('./linux_os'):
        if root.endswith(suffix):
            # os.path.join takes the components as separate arguments,
            # not as a single list.
            return os.path.join(root, "ignition", "shared.yml")

    return None
170
171
172
def resolve_rule(rule):
    """Return the directory of *rule* found under ``./linux_os``.

    The tree is walked and the first directory whose trailing path
    component(s) equal *rule* is returned. Returns None when *rule* is None
    or no directory matches.
    """
    # If the rulename is not given, we can't find the rule path
    if rule is None:
        return None

    # Otherwise we try to guess it based on the rule name. The match is
    # anchored at a path separator: a bare endswith(rule) would also accept
    # e.g. "sshd_foo" when looking for "foo".
    suffix = os.sep + rule.strip(os.sep)
    for root, _dirs, _files in os.walk('./linux_os'):
        if root.endswith(suffix):
            return root
    return None
182
183
184
def rule_platforms(rule_path):
    """Derive the platform list of a rule from its bash remediation.

    Reads ``<rule_path>/bash/shared.sh`` and takes the value of the first
    ``# platform = ...`` line, appending ``multi_platform_ocp`` when it is
    not already part of the value.

    Returns None when *rule_path* is None (the rule directory is unknown),
    and the plain ``multi_platform_ocp`` default when the bash remediation
    is missing or declares no platforms.
    """
    # ignition rules wouldn't make sense w/o ocp platform
    default_platforms = "multi_platform_ocp"

    if rule_path is None:
        return None

    bash_path = os.path.join(rule_path, "bash", "shared.sh")
    try:
        with open(bash_path) as bash_rem:
            for brl in bash_rem:
                if not brl.startswith("# platform = "):
                    continue
                platforms = brl.split(" = ")[1].strip()
                if not platforms:
                    # An empty declaration must not produce a leading comma.
                    return default_platforms
                if "multi_platform_ocp" not in platforms:
                    platforms += ",multi_platform_ocp"
                return platforms
    except FileNotFoundError:
        # No bash remediation to read from: fall through to the default,
        # the same result as a file without a platform line.
        pass

    return default_platforms
197
198
199
def resolve_platforms(rule_path, args_platforms):
    """Pick the platform list for the remediation.

    Platforms given explicitly take precedence; otherwise they are looked
    up via ``rule_platforms(rule_path)``.
    """
    explicitly_given = args_platforms is not None
    return args_platforms if explicitly_given else rule_platforms(rule_path)
204
205
206
def decode(filename):
    """Decode the embedded file of the ignition remediation *filename*.

    The file is scanned line by line; the first line containing
    ``source: data:,`` is handed to ``decode_data`` and its result is
    returned.

    Raises:
        OSError: no filename was given, or the file cannot be opened
            (``IOError`` is an alias of ``OSError`` in Python 3).
        ValueError: the file contains no ``source: data:,`` line.
    """
    if filename is None:
        raise OSError("No filename given to decode")

    with open(filename) as f:
        # Iterate lazily instead of loading every line up front.
        for line in f:
            # FIXME: what about multiple files?
            if 'source: data:,' in line:
                return decode_data(line)

    raise ValueError("Could not locate the data source in the file")
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable IOError does not seem to be defined.
Loading history...
218
219
220
def decode_data(line):
    """Return the percent-decoded payload of a ``source: data:,`` line.

    Only the text after the ``source: data:,`` marker is decoded, and the
    terminator of the YAML line itself is dropped, so the result is the
    embedded file content rather than the YAML line. A *line* without the
    marker is percent-decoded as a whole.
    """
    marker = "source: data:,"
    _, found, payload = line.partition(marker)
    if not found:
        return urllib.parse.unquote(line)
    # Newlines of the embedded file are percent-encoded (%0A); a raw line
    # ending belongs to the YAML document, not to the payload.
    return urllib.parse.unquote(payload.rstrip("\r\n"))
222
223
224
def resolve_for_decode(rule, path):
    """Return the remediation file to decode.

    An explicit *path* is used as-is. Otherwise the rule directory is looked
    up with ``resolve_rule`` and ``<rule_dir>/ignition/shared.yml`` is
    returned, or None when the rule directory cannot be found.
    """
    if path is None:
        rule_dir = resolve_rule(rule)
        if rule_dir is not None:
            path = os.path.join(rule_dir, "ignition", "shared.yml")
    return path
232
233
234
def _build_parser():
    """Create the command-line parser for the encode/decode actions."""
    parser = argparse.ArgumentParser(
                            description=description,
                            formatter_class=argparse.RawTextHelpFormatter)

    common_opts = parser.add_argument_group('common')
    common_opts.add_argument('--infile', action='store', type=str,
                             help='The file to encode or decode')
    common_opts.add_argument('--rule', action='store', type=str,
                             help="When decoding, used to load the ignition "
                                  "file. When encoding, used to write the "
                                  "ignition file to")

    encode_opts = parser.add_argument_group('encoding')
    encode_opts.add_argument('--target-path', action='store', type=str,
                             help="The path where the file would be applied "
                                  "on the host.")
    encode_opts.add_argument('--outfile', action='store', type=str,
                             help="The path the encoded result will be "
                                  "written to. Optional if --rule is "
                                  "specified")
    encode_opts.add_argument('--mode', action='store', type=str,
                             help='The mode of the file on the host.')
    encode_opts.add_argument('--platforms', action='store', type=str,
                             help="Optional, if not set and rule is set, "
                                  "the script will read platforms from "
                                  "existing rules")

    parser.add_argument('ACTION', choices=['encode', 'decode'])
    return parser


def _run_encode(args):
    """Encode ``args.infile`` and write the remediation; exit 1 on failure."""
    check_encode_args(args)
    rule_path = resolve_rule(args.rule)
    platforms = resolve_platforms(rule_path, args.platforms)
    if platforms is None:
        print("Platforms neither passed through flags nor could "
              "be detected, fail")
        sys.exit(1)

    remediation = encode(args.infile, args.target_path,
                         args.mode, platforms)

    try:
        outfile_handle = encode_outfile(args.outfile, rule_path)
        if outfile_handle is None:
            # Neither --outfile nor a resolvable --rule: without this check
            # the write below fails with AttributeError on None.
            print("No output destination: pass --outfile or a --rule "
                  "that can be found under ./linux_os")
            sys.exit(1)
        write_encoded(outfile_handle, remediation)
    except OSError as e:
        # The rule dir probably couldn't be written to
        print(e)
        print("Please check the rule name or path")
        sys.exit(1)


def _run_decode(args):
    """Print the decoded remediation; exit 1 on failure."""
    rem_path = resolve_for_decode(args.rule, args.infile)
    try:
        print(decode(rem_path))
    except IOError as e:
        # The rule probably couldn't be loaded
        print(e)
        print("Please check the rule name or path")
        sys.exit(1)
    except ValueError as e:
        # Malformed remediation?
        print(e)
        print("Please check if the remediation is well-formed "
              "ignition file")
        sys.exit(1)


def main():
    """Parse the command line and run the requested action.

    Exits with status 1 when arguments are missing or the action fails.
    """
    args = _build_parser().parse_args()

    # argparse restricts ACTION to 'encode' or 'decode' via ``choices``, so
    # no "unknown action" branch is needed.
    if args.ACTION == 'encode':
        _run_encode(args)
    else:
        _run_decode(args)
304
305
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
307