yaml_processor._update()   A
last analyzed

Complexity

Conditions 5

Size

Total Lines 20
Code Lines 17

Duplication

Lines 20
Ratio 100 %

Importance

Changes 0
Metric Value
cc 5
eloc 17
nop 4
dl 20
loc 20
rs 9.0833
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
3
import sys
4
import argparse
5
from argparse import Namespace
6
import os, shutil
7
import getopt
8
from ruamel.yaml import YAML, yaml_object
9
from ruamel.yaml.comments import CommentedSeq, CommentedMap
10
from ruamel.yaml.tokens import CommentToken
11
12
##
13
yaml = YAML(typ="rt")
14
## format yaml file
15
yaml.indent(mapping=2, sequence=4, offset=2)
16
17
18
############################################
19
# Comment operation
20
#
21
############################################
22
def _extract_comment(_comment):
23
    """
24
    remove '#' at start of comment
25
    """
26
    # if _comment is empty, do nothing
27
    if not _comment:
28
        return _comment
29
30
    # str_ = _comment.lstrip(" ")
31
    str_ = _comment.strip()
32
    str_ = str_.lstrip("#")
33
34
    return str_
35
36
37 View Code Duplication
def _add_eol_comment(element, *args, **kwargs):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
38
    """
39
    add_eol_comment
40
    args --> (comment, key)
41
    """
42
    if element is None or \
43
            (not isinstance(element, CommentedMap) and
44
             not isinstance(element, CommentedSeq)) or \
45
            args[0] is None or \
46
            len(args[0]) == 0:
47
        return
48
49
    comment = args[0]
50
    # comment is empty, do nothing
51
    if not comment:
52
        return
53
54
    key = args[1]
55
    try:
56
        element.yaml_add_eol_comment(*args, **kwargs)
57
    except Exception:
58
        element.ca.items.pop(key, None)
59
        element.yaml_add_eol_comment(*args, **kwargs)
60
61
62 View Code Duplication
def _map_comment(_element, _key):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
63
    origin_comment = ""
64
    token = _element.ca.items.get(_key, None)
65
    if token is not None:
66
        try:
67
            origin_comment = token[2].value
68
        except Exception:
69
            try:
70
                # comment is below element, add profix "#\n"
71
                col = _element.lc.col + 2
72
                space_list = [" " for i in range(col)]
73
                space_str = "".join(space_list)
74
75
                origin_comment = "\n" + "".join([space_str + t.value for t in token[3]])
76
            except Exception:
77
                pass
78
79
    return origin_comment
80
81
82
def _seq_comment(_element, _index):
83
    # get target comment
84
    _comment = ""
85
    token = _element.ca.items.get(_index, None)
86
    if token is not None:
87
        _comment = token[0].value
88
89
    return _comment
90
91
92
def _start_comment(_element):
93
    _comment = ""
94
    cmt = _element.ca.comment
95
    try:
96
        _comment = cmt[1][0].value
97
    except Exception:
98
        pass
99
100
    return _comment
101
102
103 View Code Duplication
def _comment_counter(_comment):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
104
    """
105
106
    counter comment tips and split into list
107
    """
108
109
    x = lambda l: l.strip().strip("#").strip()
110
111
    _counter = []
112
    if _comment.startswith("\n"):
113
        _counter.append("")
114
        _counter.append(x(_comment[1:]))
115
116
        return _counter
117
    elif _comment.startswith("#\n"):
118
        _counter.append("")
119
        _counter.append(x(_comment[2:]))
120
    else:
121
        index = _comment.find("\n")
122
        _counter.append(x(_comment[:index]))
123
        _counter.append(x(_comment[index + 1:]))
124
125
    return _counter
126
127
128 View Code Duplication
def _obtain_comment(_m_comment, _t_comment):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
129
    if not _m_comment or not _t_comment:
130
        return _m_comment or _t_comment
131
132
    _m_counter = _comment_counter(_m_comment)
133
    _t_counter = _comment_counter(_t_comment)
134
135
    if not _m_counter[0] and not _t_counter[1]:
136
        comment = _t_comment + _m_comment
137
    elif not _m_counter[1] and not _t_counter[0]:
138
        comment = _m_comment + _t_comment
139
    elif _t_counter[0] and _t_counter[1]:
140
        comment = _t_comment
141
    elif not _t_counter[0] and not _t_counter[1]:
142
        comment = _m_comment
143
    elif not _m_counter[0] and not _m_counter[1]:
144
        comment = _t_comment
145
    else:
146
        if _t_counter[0]:
147
            comment = _m_comment.replace(_m_counter[0], _t_counter[0], 1)
148
        else:
149
            comment = _m_comment.replace(_m_counter[1], _t_counter[1], 1)
150
151
    i = comment.find("\n\n")
152
    while i >= 0:
153
        comment = comment.replace("\n\n\n", "\n\n", 1)
154
        i = comment.find("\n\n\n")
155
156
    return comment
157
158
159
############################################
160
# Utils
161
#
162
############################################
163 View Code Duplication
def _get_update_par(_args):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
164
    _dict = _args.__dict__
165
166
    # file path
167
    _in_file = _dict.get("f", None) or _dict.get("file", None)
168
    # tips
169
    _tips = _dict.get('tips', None) or "Input \"-h\" for more information"
170
    # update
171
    _u = _dict.get("u", None) or _dict.get("update", None)
172
    # apppend
173
    _a = _dict.get('a', None) or _dict.get('append', None)
174
    # out stream group
175
    _i = _dict.get("i", None) or _dict.get("inplace", None)
176
    _o = _dict.get("o", None) or _dict.get("out_file", None)
177
178
    return _in_file, _u, _a, _i, _o, _tips
179
180
181
############################################
182
# Element operation
183
#
184
############################################
185 View Code Duplication
def update_map_element(element, key, value, comment, _type):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
186
    """
187
     element:
188
     key:
189
     value:
190
     comment:
191
     _type:  value type.
192
    """
193
    if element is None or not isinstance(element, CommentedMap):
194
        print("Only key-value update support")
195
        sys.exit(1)
196
197
    origin_comment = _map_comment(element, key)
198
199
    sub_element = element.get(key, None)
200
    if isinstance(sub_element, CommentedMap) or isinstance(sub_element, CommentedSeq):
201
        print("Only support update a single value")
202
203
    element.update({key: value})
204
205
    comment = _obtain_comment(origin_comment, comment)
206
    _add_eol_comment(element, _extract_comment(comment), key)
207
208
209
def update_seq_element(element, value, comment, _type):
210
    if element is None or not isinstance(element, CommentedSeq):
211
        print("Param `-a` only use to append yaml list")
212
        sys.exit(1)
213
    element.append(str(value))
214
215
    comment = _obtain_comment("", comment)
216
    _add_eol_comment(element, _extract_comment(comment), len(element) - 1)
217
218
219 View Code Duplication
def run_update(code, keys, value, comment, _app):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
220
    key_list = keys.split(".")
221
222
    space_str = ":\n  "
223
    key_str = "{}".format(key_list[0])
224
    for key in key_list[1:]:
225
        key_str = key_str + space_str + key
226
        space_str = space_str + "  "
227
    if not _app:
228
        yaml_str = """{}: {}""".format(key_str, value)
229
    else:
230
        yaml_str = "{}{}- {}".format(key_str, space_str, value)
231
232
    if comment:
233
        yaml_str = "{} # {}".format(yaml_str, comment)
234
235
    mcode = yaml.load(yaml_str)
236
237
    _merge(code, mcode)
238
239
240 View Code Duplication
def _update(code, _update, _app, _tips):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
241
    if not _update:
242
        return code
243
244
    _update_list = [l.strip() for l in _update.split(",")]
245
    for l in _update_list:
246
        try:
247
            variant, comment = l.split("#")
248
        except ValueError:
249
            variant = l
250
            comment = None
251
252
        try:
253
            keys, value = variant.split("=")
254
            run_update(code, keys, value, comment, _app)
255
        except ValueError:
256
            print("Invalid format. print command \"--help\" get more info.")
257
            sys.exit(1)
258
259
    return code
260
261
262 View Code Duplication
def _backup(in_file_p):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
263
    backup_p = in_file_p + ".bak"
264
265
    if os.path.exists(backup_p):
266
        os.remove(backup_p)
267
268
    if not os.path.exists(in_file_p):
269
        print("File {} not exists.".format(in_file_p))
270
        sys.exit(1)
271
272
    shutil.copyfile(in_file_p, backup_p)  # 复制文件
273
274
275 View Code Duplication
def _recovery(in_file_p):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
276
    backup_p = in_file_p + ".bak"
277
278
    if not os.path.exists(in_file_p):
279
        print("File {} not exists.".format(in_file_p))
280
        sys.exit(1)
281
    elif not os.path.exists(backup_p):
282
        print("Backup file not exists")
283
        sys.exit(0)
284
285
    os.remove(in_file_p)
286
287
    os.rename(backup_p, in_file_p)
288
289
290
# master merge target
291 View Code Duplication
def _merge(master, target):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
292
    if type(master) != type(target):
293
        print("yaml format not match:\n")
294
        yaml.dump(master, sys.stdout)
295
        print("\n&&\n")
296
        yaml.dump(target, sys.stdout)
297
298
        sys.exit(1)
299
300
    ## item is a sequence
301
    if isinstance(target, CommentedSeq):
302
        for index in range(len(target)):
303
            # get target comment
304
            target_comment = _seq_comment(target, index)
305
306
            master_index = len(master)
307
308
            target_item = target[index]
309
310
            if isinstance(target_item, CommentedMap):
311
                merge_flag = False
312
                for idx in range(len(master)):
313
                    if isinstance(master[idx], CommentedMap):
314
                        if master[idx].keys() == target_item.keys():
315
                            _merge(master[idx], target_item)
316
                            # nonlocal merge_flag
317
                            master_index = idx
318
                            merge_flag = True
319
                            break
320
321
                if merge_flag is False:
322
                    master.append(target_item)
323
            elif target_item not in master:
324
                master.append(target[index])
325
            else:
326
                # merge(master[index], target[index])
327
                pass
328
329
            # # remove enter signal in previous item
330
            previous_comment = _seq_comment(master, master_index - 1)
331
            _add_eol_comment(master, _extract_comment(previous_comment), master_index - 1)
332
333
            origin_comment = _seq_comment(master, master_index)
334
            comment = _obtain_comment(origin_comment, target_comment)
335
            if len(comment) > 0:
336
                _add_eol_comment(master, _extract_comment(comment) + "\n\n", len(master) - 1)
337
338
    ## item is a map
339
    elif isinstance(target, CommentedMap):
340
        for item in target:
341
            if item == "flag":
342
                print("")
343
            origin_comment = _map_comment(master, item)
344
            target_comment = _map_comment(target, item)
345
346
            # get origin start comment
347
            origin_start_comment = _start_comment(master)
348
349
            # get target start comment
350
            target_start_comment = _start_comment(target)
351
352
            m = master.get(item, default=None)
353
            if m is None or \
354
                    (not (isinstance(m, CommentedMap) or
355
                          isinstance(m, CommentedSeq))):
356
                master.update({item: target[item]})
357
358
            else:
359
                _merge(master[item], target[item])
360
361
            comment = _obtain_comment(origin_comment, target_comment)
362
            if len(comment) > 0:
363
                _add_eol_comment(master, _extract_comment(comment), item)
364
365
            start_comment = _obtain_comment(origin_start_comment, target_start_comment)
366
            if len(start_comment) > 0:
367
                master.yaml_set_start_comment(_extract_comment(start_comment))
368
369
370
def _save(_code, _file):
371
    with open(_file, 'w') as wf:
372
        yaml.dump(_code, wf)
373
374
375
def _load(_file):
376
    with open(_file, 'r') as rf:
377
        code = yaml.load(rf)
378
    return code
379
380
381
############################################
382
# sub parser process operation
383
#
384
############################################
385 View Code Duplication
def merge_yaml(_args):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
386
    _dict = _args.__dict__
387
388
    _m_file = _dict.get("merge_file", None)
389
    _in_file, _u, _a, _i, _o, _tips = _get_update_par(_args)
390
391
    if not (_in_file and _m_file):
392
        print(_tips)
393
        sys.exit(1)
394
395
    code = _load(_in_file)
396
    mcode = _load(_m_file)
397
398
    _merge(code, mcode)
399
400
    _update(code, _u, _a, _tips)
401
402
    if _i:
403
        _backup(_in_file)
404
        _save(code, _in_file)
405
    elif _o:
406
        _save(code, _o)
407
    else:
408
        print(_tips)
409
        sys.exit(1)
410
411
412 View Code Duplication
def update_yaml(_args):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
413
    _in_file, _u, _a, _i, _o, _tips = _get_update_par(_args)
414
415
    if not _in_file or not _u:
416
        print(_tips)
417
        sys.exit(1)
418
419
    code = _load(_in_file)
420
421
    if _i and _o:
422
        print(_tips)
423
        sys.exit(1)
424
425
    _update(code, _u, _a, _tips)
426
427
    if _i:
428
        _backup(_in_file)
429
        _save(code, _in_file)
430
    elif _o:
431
        _save(code, _o)
432
433
434
def reset(_args):
435
    _dict = _args.__dict__
436
    _f = _dict.get('f', None) or _dict.get('file', None)
437
438
    if _f:
439
        _recovery(_f)
440
    else:
441
        _t = _dict.get('tips', None) or "Input \"-h\" for more information"
442
        print(_t)
443
444
445
############################################
446
# Cli operation
447
#
448
############################################
449
def _set_merge_parser(_parsers):
450
    """
451
    config merge parser
452
    """
453
454
    merge_parser = _parsers.add_parser("merge", help="merge with another yaml file")
455
456
    _set_merge_parser_arg(merge_parser)
457
    _set_update_parser_arg(merge_parser)
458
459
    merge_parser.set_defaults(
460
        function=merge_yaml,
461
        tips=merge_parser.format_help()
462
    )
463
464
465
def _set_merge_parser_arg(_parser):
466
    """
467
    config parser argument for merging
468
    """
469
470
    _parser.add_argument("-m", "--merge-file", help="indicate merge yaml file")
471
472
473
def _set_update_parser(_parsers):
474
    """
475
    config merge parser
476
    """
477
478
    update_parser = _parsers.add_parser("update", help="update with another yaml file")
479
    _set_update_parser_arg(update_parser)
480
481
    update_parser.set_defaults(
482
        function=update_yaml,
483
        tips=update_parser.format_help()
484
    )
485
486
487 View Code Duplication
def _set_update_parser_arg(_parser):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
488
    """
489
    config parser argument for updating
490
    """
491
492
    _parser.add_argument("-f", "--file", help="source yaml file")
493
    _parser.add_argument('-u', '--update', help="update with args, instance as \"a.b.c=d# d comment\"")
494
    _parser.add_argument('-a', '--append', action="store_true", help="append to a seq")
495
496
    group = _parser.add_mutually_exclusive_group()
497
    group.add_argument("-o", "--out-file", help="indicate output yaml file")
498
    group.add_argument("-i", "--inplace", action="store_true", help="indicate whether result store in origin file")
499
500
501
def _set_reset_parser(_parsers):
502
    """
503
    config merge parser
504
    """
505
506
    reset_parser = _parsers.add_parser("reset", help="reset yaml file")
507
508
    # indicate yaml file
509
    reset_parser.add_argument('-f', '--file', help="indicate input yaml file")
510
511
    reset_parser.set_defaults(
512
        function=reset,
513
        tips=reset_parser.format_help()
514
    )
515
516
517
def main():
518
    parser = argparse.ArgumentParser()
519
    sub_parsers = parser.add_subparsers()
520
521
    # set merge command
522
    _set_merge_parser(sub_parsers)
523
524
    # set update command
525
    _set_update_parser(sub_parsers)
526
527
    # set reset command
528
    _set_reset_parser(sub_parsers)
529
530
    # parse argument and run func
531
    args = parser.parse_args()
532
    args.function(args)
533
534
535
if __name__ == '__main__':
536
    main()
537