1 | # SPDX-License-Identifier: LGPL-3.0-only |
||
2 | |||
3 | 1 | """Functions to export documents and items.""" |
|
4 | 1 | ||
5 | 1 | import csv |
|
6 | 1 | import datetime |
|
7 | import os |
||
8 | 1 | from collections import defaultdict |
|
9 | 1 | from typing import Any, Dict |
|
10 | |||
11 | 1 | import openpyxl |
|
12 | 1 | import yaml |
|
13 | 1 | ||
14 | 1 | from doorstop import common, settings |
|
15 | from doorstop.common import DoorstopError |
||
16 | 1 | from doorstop.core.types import iter_documents, iter_items |
|
17 | |||
LIST_SEP = '\n'  # string separating list values when joined in a string

XLSX_MAX_WIDTH = 65.0  # maximum width for a column
XLSX_FILTER_PADDING = 3.5  # column padding to account for filter button

# Module-level logger, named after this module
log = common.logger(__name__)
||
24 | 1 | ||
25 | |||
def export(obj, path, ext=None, **kwargs):
    """Write an object out in the requested export format.

    Two calling conventions are supported:

    1. document or item-like object + output file path
    2. tree-like object + output directory path

    :param obj: (1) Item, list of Items, Document or (2) Tree
    :param path: (1) output file path or (2) output directory path
    :param ext: file extension to override output extension

    :raises: :class:`doorstop.common.DoorstopError` for unknown file formats

    :return: output location if files created, else None

    """
    # Resolve the format: explicit override, then the path's suffix, then CSV
    if not ext:
        ext = os.path.splitext(path)[-1] or '.csv'
    check(ext)

    # Export every document, keeping a running count of files written
    exported = 0
    targets = iter_documents(obj, path, ext)
    for exported, (document, target) in enumerate(targets, start=1):
        common.create_dirname(target)
        log.info("exporting to {}...".format(target))
        if ext in FORMAT_LINES:
            # line-based formats are generated lazily and written as text
            common.write_lines(export_lines(document, ext, **kwargs), target)
        else:
            # all other formats create the file themselves
            export_file(document, target, ext, **kwargs)

    # Report the outcome
    if not exported:
        log.warning("nothing to export")
        return None

    plural = 's' if exported > 1 else ''
    log.info("exported to {} file{}".format(exported, plural))
    return path
||
69 | 1 | ||
70 | |||
def export_lines(obj, ext='.yml', **kwargs):
    """Generate the lines of an export in the given format.

    :param obj: Item, list of Items, or Document to export
    :param ext: file extension to specify the output format

    :raises: :class:`doorstop.common.DoorstopError` for unknown file formats

    :return: lines generator

    """
    # Look up the generator registered for this extension (raises if none)
    lines_gen = check(ext, get_lines_gen=True)
    log.debug("yielding {} as lines of {}...".format(obj, ext))
    produced = lines_gen(obj, **kwargs)
    yield from produced
||
85 | 1 | ||
86 | |||
def export_file(obj, path, ext=None, **kwargs):
    """Write an export file in the given format.

    :param obj: Item, list of Items, or Document to export
    :param path: output file location with desired extension
    :param ext: file extension to override output path's extension

    :raises: :class:`doorstop.common.DoorstopError` for unknown file formats

    :return: path to created file

    """
    # Fall back to the path's own suffix when no override is given
    if not ext:
        ext = os.path.splitext(path)[-1]
    creator = check(ext, get_file_func=True)
    log.debug("converting %s to file format %s...", obj, ext)
    try:
        created = creator(obj, path, **kwargs)
    except IOError:
        # hide the low-level error behind the project's file error type
        raise common.DoorstopFileError(
            "unable to write to: {}".format(path)
        ) from None
    return created
||
107 | 1 | ||
108 | |||
def _lines_yaml(obj, **_):
    """Generate the text of a YAML export, one chunk per item.

    :param obj: Item, list of Items, or Document to export

    :return: iterator of lines of text

    """
    for entry in iter_items(obj):
        # each item becomes a single-key mapping: {uid: data}
        yield yaml.dump(
            {str(entry.uid): entry.data},
            default_flow_style=False,
            allow_unicode=True,
        )
||
122 | 1 | ||
123 | |||
def _tabulate(obj, sep=LIST_SEP, auto=False):
    """Generate header and data rows for a tabular export.

    :param obj: Item, list of Items, or Document to export
    :param sep: string separating list values when joined in a string
    :param auto: include placeholders for new items on import

    :return: iterator of rows of data

    """
    header = None  # built from the first item, then reused for every row

    for item in iter_items(obj):
        data = item.data

        # Emit the header once: fixed columns first, then the remaining
        # attributes of the first item in sorted order
        if header is None:
            header = ['level', 'text', 'ref', 'links']
            header += [
                name for name in sorted(data.keys()) if name not in header
            ]
            yield ['uid'] + header

        # Emit the row for this item
        row = [item.uid]
        for key in header:
            value = data.get(key)
            if key == 'level':
                # some levels are floats for YAML presentation
                value = str(value)
            elif key == 'links':
                # separate identifiers with a delimiter
                value = sep.join(uid.string for uid in item.links)
            elif isinstance(value, str) and key != 'reviewed':
                # remove sentence boundaries and line wrapping
                value = item.get(key)
            elif value is None:
                value = ''
            row.append(value)
        yield row

    # Emit placeholder rows for new items
    if not auto:
        return
    for _ in range(settings.PLACEHOLDER_COUNT):
        yield [settings.PLACEHOLDER]
||
171 | 1 | ||
172 | |||
def _file_csv(obj, path, delimiter=',', auto=False):
    """Write the tabulated export to a CSV file.

    :param obj: Item, list of Items, or Document to export
    :param path: location to export CSV file
    :param delimiter: character to delimit fields
    :param auto: include placeholders for new items on import

    :return: path of created file

    """
    rows = _tabulate(obj, auto=auto)
    # newline='' lets the csv module control line endings itself
    with open(path, 'w', newline='', encoding='utf-8') as stream:
        csv.writer(stream, delimiter=delimiter).writerows(rows)
    return path
||
189 | 1 | ||
190 | |||
def _file_tsv(obj, path, auto=False):
    """Write the tabulated export to a TSV file.

    This is the CSV writer with a tab as the field delimiter.

    :param obj: Item, list of Items, or Document to export
    :param path: location to export TSV file
    :param auto: include placeholders for new items on import

    :return: path of created file

    """
    tab = '\t'
    return _file_csv(obj, path, tab, auto)
||
202 | 1 | ||
203 | |||
def _file_xlsx(obj, path, auto=False):
    """Build an XLSX workbook for the export and save it to disk.

    :param obj: Item, list of Items, or Document to export
    :param path: location to export XLSX file
    :param auto: include placeholders for new items on import

    :return: path of created file

    """
    _get_xlsx(obj, auto).save(path)
    return path
||
218 | 1 | ||
219 | |||
def _get_xlsx(obj, auto):
    """Create an XLSX workbook object.

    The first row produced by the tabulation is written in bold, every
    cell is top/left aligned with text wrapping, a filter is added across
    the populated columns of row 1, column widths follow their content and
    the top row is frozen.

    :param obj: Item, list of Items, or Document to export
    :param auto: include placeholders for new items on import

    :return: new workbook

    """
    # Widest line seen so far per 1-based column index
    col_widths: Dict[Any, float] = defaultdict(float)

    # Create a new workbook
    workbook = openpyxl.Workbook()
    worksheet = workbook.active

    # Style objects do not depend on the cell, so build them once and
    # share them instead of allocating a new one for every cell
    alignment = openpyxl.styles.Alignment(
        vertical='top', horizontal='left', wrap_text=True
    )
    header_font = openpyxl.styles.Font(bold=True)

    # Populate cells
    for row, data in enumerate(_tabulate(obj, auto=auto), start=1):
        for col_idx, value in enumerate(data, start=1):
            cell = worksheet.cell(column=col_idx, row=row)

            # wrap text in every cell
            cell.alignment = alignment
            # and bold header rows
            if row == 1:
                cell.font = header_font

            # convert incompatible Excel types:
            # http://pythonhosted.org/openpyxl/api.html#openpyxl.cell.Cell.value
            if isinstance(value, (int, float, datetime.datetime)):
                cell.value = value
            else:
                cell.value = str(value)

            # track cell width
            col_widths[col_idx] = max(col_widths[col_idx], _width(str(value)))

    # Add filter up to the last column; skipped when nothing was tabulated
    # because column index 0 is not a valid column for get_column_letter
    if col_widths:
        last_letter = openpyxl.utils.get_column_letter(len(col_widths))
        worksheet.auto_filter.ref = "A1:%s1" % last_letter

    # Set column width based on column contents
    for col_idx, content_width in col_widths.items():
        if content_width > XLSX_MAX_WIDTH:
            width = XLSX_MAX_WIDTH
        else:
            # NOTE: padding is added after the cap check, so the result
            # can exceed XLSX_MAX_WIDTH by up to XLSX_FILTER_PADDING
            width = content_width + XLSX_FILTER_PADDING
        col_letter = openpyxl.utils.get_column_letter(col_idx)
        worksheet.column_dimensions[col_letter].width = width

    # Freeze top row
    worksheet.freeze_panes = worksheet.cell(row=2, column=1)

    return workbook
|
277 | |||
278 | 1 | ||
279 | 1 | def _width(text): |
|
280 | """Get the maximum length in a multiline string.""" |
||
281 | 1 | if text: |
|
282 | return max(len(line) for line in text.splitlines()) |
||
283 | else: |
||
284 | return 0 |
||
285 | 1 | ||
286 | |||
# Lines generators, keyed by file extension
FORMAT_LINES = {'.yml': _lines_yaml}
# File creators, keyed by file extension
FORMAT_FILE = {'.csv': _file_csv, '.tsv': _file_tsv, '.xlsx': _file_xlsx}
# All supported formats: lines generators first, then file creators
FORMAT = {**FORMAT_LINES, **FORMAT_FILE}  # type: ignore
||
293 | |||
294 | 1 | ||
def check(ext, get_lines_gen=False, get_file_func=False):
    """Confirm an extension is supported for export.

    With no flags set, this only validates the extension against all
    known formats. With a flag set, the matching handler is looked up and
    returned; ``get_lines_gen`` takes precedence over ``get_file_func``.

    :param ext: file extension (including the leading dot) to look up
    :param get_lines_gen: return a lines generator if available
    :param get_file_func: return a file creator if available

    :raises: :class:`doorstop.common.DoorstopError` for unknown formats

    :return: function requested if available, else None

    """

    def _unknown(kind, options):
        """Build the error describing an unsupported extension."""
        # The message is formatted in a single pass so an extension that
        # contains '{' or '}' is reported verbatim instead of being
        # interpreted as a replacement field
        return DoorstopError(
            "unknown {} format: {} (options: {})".format(
                kind, ext or None, ', '.join(options)
            )
        )

    if get_lines_gen:
        try:
            gen = FORMAT_LINES[ext]
        except KeyError:
            raise _unknown("lines export", FORMAT_LINES) from None
        else:
            log.debug("found lines generator for: {}".format(ext))
            return gen

    if get_file_func:
        try:
            func = FORMAT_FILE[ext]
        except KeyError:
            raise _unknown("file export", FORMAT_FILE) from None
        else:
            log.debug("found file creator for: {}".format(ext))
            return func

    if ext not in FORMAT:
        raise _unknown("export", FORMAT)

    return None
||
336 |