1 | 1 | from typing import Dict |
|
2 | 1 | from urllib import request |
|
3 | |||
4 | 1 | import httplib2 |
|
5 | 1 | from cleo.io.io import IO |
|
6 | |||
7 | 1 | from sdoc.sdoc2.node.Node import Node |
|
8 | 1 | from sdoc.sdoc2.NodeStore import NodeStore |
|
9 | |||
10 | |||
class HyperlinkNode(Node):
    """
    SDoc2 node for hyperlinks.
    """

    # Option names that are passed through as HTML attributes of the hyperlink.
    _VALID_HTML_ATTRIBUTES = ('href', 'class', 'id', 'download', 'hreflang', 'media', 'rel', 'target', 'type')

    # HTTP status codes of redirects that are inspected for an http -> https upgrade.
    _REDIRECT_STATUSES = (301, 302, 303, 307, 308)

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, io: IO, options: Dict[str, str], argument: str):
        """
        Object constructor.

        :param IO io: The IO object.
        :param dict[str,str] options: The options of the hyperlink.
        :param str argument: The argument of the hyperlink. Used as URL when option 'href' is not given.
        """
        super().__init__(io, 'hyperlink', options, argument)

    # ------------------------------------------------------------------------------------------------------------------
    def get_html_attributes(self) -> Dict[str, str]:
        """
        Returns the options of this node that are valid HTML attributes of a hyperlink as a dictionary.

        :rtype: dict[str,str]
        """
        return {key: value for key, value in self._options.items() if key in self._VALID_HTML_ATTRIBUTES}

    # ------------------------------------------------------------------------------------------------------------------
    def prepare_content_tree(self) -> None:
        """
        Prepares the content of the node. Makes sure the URL of the hyperlink has a scheme and stores it in option
        'href', then tries to connect to the URL.
        """
        # The URL is taken from option 'href' when given, otherwise from the argument of the node.
        self.set_scheme(self._options.get('href', self._argument))

        # Trying to connect.
        self.try_connect()

    # ------------------------------------------------------------------------------------------------------------------
    def set_scheme(self, url: str) -> None:
        """
        Stores the URL in option 'href'. When the URL has no scheme, a scheme is prepended first: 'ftp://' when the
        URL starts with 'ftp.', 'http://' otherwise.

        :param str url: The URL with scheme or without.
        """
        from urllib.parse import urlsplit

        if not urlsplit(url).scheme:
            prefix = 'ftp://' if url.startswith('ftp.') else 'http://'
            url = prefix + url

        # Always store the URL: when it originates from the argument and already has a scheme, option 'href' would
        # otherwise be missing and try_connect() could not find the URL.
        self._options['href'] = url

    # ------------------------------------------------------------------------------------------------------------------
    def try_connect(self) -> None:
        """
        Tries to connect to the URL. On a successful connection, checks for a redirect. If redirected to protocol https
        and host is the same, updates the protocol in the URL.

        This is a best-effort check: any failure is reported as a warning and never raised.
        """
        from urllib.parse import urlsplit

        # Read the URL once, so the exception handler below can never fail on a missing 'href' option.
        href = self._options.get('href', '')

        try:
            with request.urlopen(href) as response:
                status = response.getcode()

            # Check if we can connect to host. The status is None for non-HTTP schemes (e.g. ftp): in that case the
            # resource has been retrieved and there is no HTTP redirect to inspect.
            if status is not None and status not in range(200, 400):
                self.io.warning("Cannot connect to: '{0!s}'".format(href))
                return

            parts = urlsplit(href)
            if parts.scheme != 'http' or not parts.netloc:
                # Only plain http URLs can be upgraded to https.
                return

            # If we connected, check the redirect. urlopen() follows redirects silently, hence a separate HEAD
            # request is required to see the redirect itself.
            target = parts.path or '/'
            if parts.query:
                target = '{0!s}?{1!s}'.format(target, parts.query)
            location = self._get_redirect_location(parts.netloc, target)

            # If the redirect points to the very same URL over https, reset the 'href' option.
            secure_href = parts._replace(scheme='https').geturl()
            if location.startswith(secure_href):
                self._options['href'] = secure_href

        except Exception as exception:
            self.io.warning("Unable to retrieve URL: '{0!s}'".format(href))
            self.io.warning(str(exception.__class__))
            self.io.warning(str(exception))

    # ------------------------------------------------------------------------------------------------------------------
    def _get_redirect_location(self, host: str, target: str) -> str:
        """
        Sends a HEAD request and returns the value of the Location header when the response is a redirect. Returns an
        empty string when the response is not a redirect or has no Location header.

        :param str host: The host (optionally with port) to connect to.
        :param str target: The request target, i.e. the path (starting with a slash) and optional query string.

        :rtype: str
        """
        connection = httplib2.HTTPConnectionWithTimeout(host)
        try:
            connection.request('HEAD', target)
            response = connection.getresponse()
            if response.status in self._REDIRECT_STATUSES:
                return response.getheader('Location') or ''

            return ''
        finally:
            connection.close()

    # ------------------------------------------------------------------------------------------------------------------
    def get_command(self) -> str:
        """
        Returns the command of this node, i.e. hyperlink.
        """
        return 'hyperlink'

    # ------------------------------------------------------------------------------------------------------------------
    def is_phrasing(self) -> bool:
        """
        Returns True.
        """
        return True

    # ------------------------------------------------------------------------------------------------------------------
    def is_inline_command(self) -> bool:
        """
        Returns True.
        """
        return True

    # ------------------------------------------------------------------------------------------------------------------
    def is_block_command(self) -> bool:
        """
        Returns False.
        """
        return False
||
132 | |||
133 | |||
# ----------------------------------------------------------------------------------------------------------------------
# Register HyperlinkNode at the node store as the class for inline command 'hyperlink'.
NodeStore.register_inline_command('hyperlink', HyperlinkNode)
||
136 |