libs.survey.page_view.image.Image.capture()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 24
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 24
rs 9.6
c 0
b 0
f 0
cc 2
nop 2
1
import base64
2
import os
3
import pickle
4
import time
5
import uuid
6
from hashlib import sha256
7
from multiprocessing import Process, Queue
8
9
import cv2
10
import numpy
11
from skimage.metrics import structural_similarity
12
13
from .browser import BrowserRender, BrowserAgent
14
15
"""
16
    Copyright (c) 2020 Star Inc.(https://starinc.xyz)
17
18
    This Source Code Form is subject to the terms of the Mozilla Public
19
    License, v. 2.0. If a copy of the MPL was not distributed with this
20
    file, You can obtain one at http://mozilla.org/MPL/2.0/.
21
"""
22
23
24
class Image:
    """
    Handle images for PageView

    Takes a screenshot of a page, hashes it, looks the hash up through
    ``data_control`` and ranks the screenshot against the trustlist samples.
    """

    def __init__(self, pbp_handle):
        """
        :param pbp_handle: object providing ``cfg`` (with a "WebCapture" section)
                           and ``data_control``
        """
        self.capture_handle = WebCapture(pbp_handle.cfg["WebCapture"])
        self.data_control = pbp_handle.data_control

    async def capture(self, url: str):
        """
        Capture Web Page by URL

        The screenshot is written to a cache file named after the UUIDv5 of
        the URL, read back (up to 5 attempts, one second apart) and hashed.
        The cache file is removed whether or not the capture succeeds.

        :param url: URL to capture
        :return: tuple of (SHA-256 hex digest, NumPy Array)
        :raises AssertionError: if the image cannot be read after 5 attempts
        """
        # Local import: asyncio is only needed for the retry delay below.
        import asyncio

        attempts = 5
        cache_file = "{}.png".format(
            uuid.uuid5(uuid.NAMESPACE_URL, url).hex
        )
        try:
            layout_path = self.capture_handle.get_page_image(
                target_url=url,
                output_image=cache_file
            )
            for attempt in range(attempts):
                image_num_array = self.capture_handle.image_object(layout_path)
                if image_num_array is not None:
                    break
                if attempt + 1 < attempts:
                    # Yield to the event loop instead of blocking it while
                    # waiting for the file to become readable.
                    await asyncio.sleep(1)
            else:
                # Raised explicitly (not via ``assert``) so the timeout still
                # triggers when Python runs with -O; the exception type is
                # kept for callers that already catch AssertionError.
                raise AssertionError("Timeout while reading image_num_array")
        finally:
            # Never leave the screenshot behind, even on failure.
            self.capture_handle.delete_page_image(cache_file)
        return sha256(image_num_array).hexdigest(), image_num_array

    async def signature(self, hex_digest: str):
        """
        Match PageView signature from database

        :param hex_digest: string hashed
        :return: URL or NoneType
        """
        return self.data_control.find_page_by_view_signature(hex_digest)

    async def rank(self, target_num_array: numpy.ndarray):
        """
        To rank URL not registered if it same/similar to someone in trustlist.

        One child process is started per trustlist record; each child puts
        ``[url, similarity]`` on a shared queue.

        :param target_num_array: NumPy Array
        :return: async generator of ``[url, similarity]`` lists, in the order
                 they arrive on the queue
        """
        results = Queue()
        workers = []
        for record in self.data_control.get_view_narray_from_trustlist():
            worker = Process(
                target=_compare_sample,
                args=(self.capture_handle, target_num_array, record, results)
            )
            worker.start()
            workers.append(worker)

        # Drain the queue BEFORE joining: a child that has queued data does
        # not terminate until that data has been flushed, so joining first
        # can deadlock. Counting the started workers (instead of iterating
        # the trustlist a second time) also works for one-shot iterables.
        # NOTE(review): a child that dies without putting a result makes
        # this get() block forever -- consider a timeout/liveness check.
        ranked = [results.get() for _ in workers]

        # Reap every child, not only the last one started.
        for worker in workers:
            worker.join()

        for item in ranked:
            yield item


def _compare_sample(capture_handle, target_num_array, sample, results):
    """
    Child-process worker for Image.rank.

    Defined at module level (not as a closure) so it can be pickled when the
    multiprocessing start method is "spawn" or "forkserver".

    :param capture_handle: object providing image_object_from_b64 and image_compare
    :param target_num_array: NumPy Array of the page being ranked
    :param sample: trustlist record with "url" and "target_view_narray" keys
    :param results: queue receiving ``[url, similarity]``
    :return:
    """
    origin_sample = capture_handle.image_object_from_b64(
        sample["target_view_narray"].encode("utf-8")
    )
    results.put([
        sample["url"],
        capture_handle.image_compare(target_num_array, origin_sample)
    ])
108
109
110
class WebCapture:
    """
    To take screenshot for PBP.

    Screenshots are written below ``cache_path`` by a browser simulation
    selected through the ``capture_type`` configuration value.
    """

    def __init__(self, config: dict):
        """
        :param config: mapping with the keys "capture_browser" (passed to the
                       browser simulation constructor), "cache_path" (directory
                       for screenshots) and "capture_type" (simulation type ID)
        """
        self.capture_browser = config["capture_browser"]
        self.cache_path = config["cache_path"]
        self.browser = config["capture_type"]

        # exist_ok avoids the check-then-create race between processes.
        os.makedirs(self.cache_path, exist_ok=True)

    @staticmethod
    def __set_browser_simulation(type_id: str):
        """
        Set Browser Simulation by ID

        :param type_id: Type ID ('1' or '2')
        :return: class object
        :raises KeyError: if the type ID is unknown
        """
        return {
            '1': BrowserRender,
            '2': BrowserAgent
        }[type_id]

    def get_page_image(self, target_url: str, output_image: str = 'out.png'):
        """
        To get the image of the URL you provided

        :param target_url: The target URL
        :param output_image: Output file name inside the cache directory (optional)
        :return: path of the image file
        """
        layout_path = os.path.join(self.cache_path, output_image)
        simulation = self.__set_browser_simulation(self.browser)(self.capture_browser)
        try:
            # Drop any stale screenshot so a failed capture cannot be
            # mistaken for a fresh one.
            if os.path.isfile(layout_path):
                os.remove(layout_path)
            simulation.capture(target_url, layout_path)
        finally:
            # Always release the browser, even when the capture fails.
            simulation.close()
        return layout_path

    def delete_page_image(self, output_image: str = 'out.png'):
        """
        To delete the image of the URL you provided

        Does nothing when the file does not exist.

        :param output_image: Output file name inside the cache directory (optional)
        :return:
        """
        layout_path = os.path.join(self.cache_path, output_image)
        if os.path.isfile(layout_path):
            os.remove(layout_path)

    @staticmethod
    def image_object(path: str):
        """
        Create NumPy Array

        The image is loaded with flag 0 (grayscale).

        :param path: The Image Path
        :return: NumPy Array, or None when the file cannot be read
        """
        return cv2.imread(path, 0)

    @staticmethod
    def image_object_from_b64(b64_string: bytes):
        """
        Import NumPy Array by base64

        :param b64_string: base64 NumPy Array dumped
        :return: NumPy Array
        """
        string = base64.b64decode(b64_string)
        # SECURITY NOTE(review): pickle.loads executes arbitrary code from its
        # input; this is only safe while the stored samples are fully trusted.
        return pickle.loads(string)

    @staticmethod
    def image_compare(img1: numpy.ndarray, img2: numpy.ndarray):
        """
        To compare image using structural similarity index

        :param img1: Image object
        :param img2: Image object
        :return: float of the similar lever
        """
        # NOTE(review): image_object() loads grayscale images, yet
        # multichannel=True treats the last axis as channels; newer
        # scikit-image releases also replace ``multichannel`` with
        # ``channel_axis`` -- TODO confirm against the pinned version.
        return structural_similarity(img1, img2, multichannel=True)
194