Completed
Push — master ( e7778f...95d0df )
by
unknown
33s
created

_get_bit_stream()   A

Complexity

Conditions 3

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 6
rs 9.4285
1
from __future__ import print_function
2
3
import array
4
import png
5
import json
6
7
from st2actions.runners.pythonrunner import Action
8
9
# Names exported by ``from <module> import *``.
__all__ = [
    'ImageDecoder',
]
12
13
14
def _get_image_array(image_path):
    """Read a PNG file and return its pixel rows as a list.

    :param image_path: Path of the PNG file to open (opened in binary mode).
    :return: ``list`` built from the third element of the tuple returned by
        ``png.Reader.read()``.
    """
    with open(image_path, 'rb') as image_fd:
        image_reader = png.Reader(file=image_fd)

        # Index 2 of the read() result holds the pixel rows.
        image_map = image_reader.read()[2]

        # Build the list inside the ``with`` block so the rows are consumed
        # while the underlying file is still open.
        return list(image_map)
21
22
23
def _get_bit_stream(message):
24
    for char in message:
25
        char_int = ord(char)
26
27
        for i in range(8):
28
            yield char_int >> i & 1
29
30
31
def _assemble_bit_stream(message_array):
32
    return message_array
33
34
35
def _encode_image(image, bit_stream):
36
    new_image = []
37
    for byte_array in image:
38
        new_byte_array = array.array('B')
39
        for byte in byte_array:
40
            try:
41
                bit = bit_stream.next()
42
            except StopIteration:
43
                bit = 0
44
45
            new_byte = byte & int('11111110', 2) | int('0000000%s' % bit, 2)
46
47
            new_byte_array.append(new_byte)
48
49
        new_image.append(new_byte_array)
50
51
    return new_image
52
53
54
def _decode_image(image):
55
    return image
56
57
58
def _save_image(name, image_map):
    """Build a PNG from ``image_map`` in RGBA mode and save it as ``name``.

    :param name: Destination passed to the image's ``save`` method.
    :param image_map: Pixel rows handed to ``png.from_array``.
    """
    image = png.from_array(image_map, mode='RGBA')
    image.save(name)
61
62
63
def _encode(image, output_image, message):
64
    for arg in [image, output_image, message]:
65
        if not isinstance(arg, str):
66
            raise TypeError("Argument must be string.")
67
68
        if arg == '':
69
            raise ValueError("Argument cannot be empty.")
70
71
    image_map = _get_image_array(image)
72
    bit_generator = _get_bit_stream(message)
73
    encoded_array = _encode_image(image_map, bit_generator)
74
    _save_image(output_image, encoded_array)
75
76
77
def _remove_null(output):
78
    try:
79
        output = output[:output.index(u'\u0000')]
80
81
    except ValueError:
82
        return
83
84
    return output
85
86
87
def _decode(image):
88
    if not isinstance(image, str):
89
        raise TypeError("Argument must be string.")
90
91
    if image == '':
92
        raise ValueError("Argument cannot be empty.")
93
94
    image_map = _get_image_array(image)
95
    bit_stream = _decode_image(image_map)
96
    output = _assemble_bit_stream(bit_stream)
97
98
    return _remove_null(output)
99
100
101
class ImageDecoder(Action):
    """Action that writes a message into a PNG image or decodes an image."""

    def run(self, encode, image, output_image=None, message=None):
        """Encode ``message`` into ``image`` or decode ``image``.

        :param encode: Truthy to encode, falsy to decode.
        :param image: Path of the source PNG (converted with ``str``).
        :param output_image: Path of the encoded PNG; required when encoding.
        :param message: Text to hide; required when encoding.
        :return: ``True`` after encoding; otherwise a JSON string of the form
            ``{"message": ...}`` holding ``str()`` of the decode result.
        :raises ValueError: If ``encode`` is truthy and ``output_image`` or
            ``message`` is ``None``.
        """
        if encode:
            # str(None) is the non-empty string 'None', which would pass
            # _encode's validation and silently hide the text "None" in a
            # file called "None". Reject missing arguments explicitly.
            if output_image is None or message is None:
                raise ValueError("Argument cannot be empty.")

            _encode(
                str(image),
                str(output_image),
                str(message)
            )

            return True

        result = {
            'message': str(_decode(str(image)))
        }

        return json.dumps(result)
118