core.flush_stderr()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
import ast
2
from collections import defaultdict
3
import os
4
from pathlib import Path
5
import re
6
import shlex
7
import time
8
from unittest.mock import patch
9
10
from behave import given
11
from behave import then
12
from behave import when
13
import keyring
14
import toml
15
import yaml
16
17
from jrnl import Journal
18
from jrnl import __version__
19
from jrnl import install
20
from jrnl import plugins
21
from jrnl.cli import cli
22
from jrnl.config import load_config
23
from jrnl.os_compat import on_windows
24
25
try:
26
    import parsedatetime.parsedatetime_consts as pdt
27
except ImportError:
28
    import parsedatetime as pdt
29
30
consts = pdt.Constants(usePyICU=False)
31
consts.DOWParseStyle = -1  # Prefers past weekdays
32
CALENDAR = pdt.Calendar(consts)
33
34
35
class TestKeyring(keyring.backend.KeyringBackend):
36
    """A test keyring that just stores its values in a hash"""
37
38
    priority = 1
39
    keys = defaultdict(dict)
40
41
    def set_password(self, servicename, username, password):
42
        self.keys[servicename][username] = password
43
44
    def get_password(self, servicename, username):
45
        return self.keys[servicename].get(username)
46
47
    def delete_password(self, servicename, username):
48
        self.keys[servicename][username] = None
49
50
51
class NoKeyring(keyring.backend.KeyringBackend):
52
    """A keyring that simulated an environment with no keyring backend."""
53
54
    priority = 2
55
    keys = defaultdict(dict)
56
57
    def set_password(self, servicename, username, password):
58
        raise keyring.errors.NoKeyringError
59
60
    def get_password(self, servicename, username):
61
        raise keyring.errors.NoKeyringError
62
63
    def delete_password(self, servicename, username):
64
        raise keyring.errors.NoKeyringError
65
66
67
class FailedKeyring(keyring.backend.KeyringBackend):
68
    """
69
    A keyring that simulates an environment with a keyring that has passwords, but fails
70
    to return them.
71
    """
72
73
    priority = 2
74
    keys = defaultdict(dict)
75
76
    def set_password(self, servicename, username, password):
77
        self.keys[servicename][username] = password
78
79
    def get_password(self, servicename, username):
80
        raise keyring.errors.NoKeyringError
81
82
    def delete_password(self, servicename, username):
83
        self.keys[servicename][username] = None
84
85
86
# set a default keyring
87
keyring.set_keyring(TestKeyring())
88
89
90
def ushlex(command):
91
    return shlex.split(command, posix=not on_windows)
92
93
94
def read_journal(journal_name="default"):
95
    config = load_config(install.CONFIG_FILE_PATH)
96
    with open(config["journals"][journal_name]) as journal_file:
97
        journal = journal_file.read()
98
    return journal
99
100
101
def open_journal(journal_name="default"):
102
    config = load_config(install.CONFIG_FILE_PATH)
103
    journal_conf = config["journals"][journal_name]
104
105
    # We can override the default config on a by-journal basis
106
    if type(journal_conf) is dict:
107
        config.update(journal_conf)
108
    # But also just give them a string to point to the journal file
109
    else:
110
        config["journal"] = journal_conf
111
112
    return Journal.open_journal(journal_name, config)
113
114
115
def read_value_from_string(string):
116
    if string[0] == "{":
117
        # Handle value being a dictionary
118
        return ast.literal_eval(string)
119
120
    # Takes strings like "bool:true" or "int:32" and coerces them into proper type
121
    t, value = string.split(":")
122
    value = {"bool": lambda v: v.lower() == "true", "int": int, "str": str}[t](value)
123
    return value
124
125
126
@given('we use the config "{config_file}"')
127
def set_config(context, config_file):
128
    full_path = os.path.join("features/configs", config_file)
129
130
    install.CONFIG_FILE_PATH = os.path.abspath(full_path)
131
    if config_file.endswith("yaml") and os.path.exists(full_path):
132
        # Add jrnl version to file for 2.x journals
133
        with open(install.CONFIG_FILE_PATH, "a") as cf:
134
            cf.write("version: {}".format(__version__))
135
136
137
@given('we use the password "{password}" if prompted')
138
def use_password_forever(context, password):
139
    context.password = password
140
141
142
@given('we use the password "{password}" {num:d} times if prompted')
143
def use_password(context, password, num=1):
144
    context.password = iter([password] * num)
145
146
147
@given("we have a keyring")
148
def set_keyring(context):
149
    keyring.set_keyring(TestKeyring())
150
151
152
@given("we do not have a keyring")
153
def disable_keyring(context):
154
    keyring.core.set_keyring(NoKeyring())
155
156
157
@when('we change directory to "{path}"')
158
def move_up_dir(context, path):
159
    os.chdir(path)
160
161
162
@when("we open the editor and {method}")
163
@when('we open the editor and {method} "{text}"')
164
@when("we open the editor and {method} nothing")
165
@when("we open the editor and {method} nothing")
166
def open_editor_and_enter(context, method, text=""):
167
    text = text or context.text or ""
168
169
    if method == "enter":
170
        file_method = "w+"
171
    elif method == "append":
172
        file_method = "a"
173
    else:
174
        file_method = "r+"
175
176
    def _mock_editor(command):
177
        context.editor_command = command
178
        tmpfile = command[-1]
179
        with open(tmpfile, file_method) as f:
180
            f.write(text)
181
182
        return tmpfile
183
184
    if "password" in context:
185
        password = context.password
186
    else:
187
        password = ""
188
189
    # fmt: off
190
    # see: https://github.com/psf/black/issues/664
191
    with \
192
        patch("subprocess.call", side_effect=_mock_editor) as mock_editor, \
193
        patch("getpass.getpass", side_effect=_mock_getpass(password)) as mock_getpass, \
194
        patch("sys.stdin.isatty", return_value=True) \
195
    :
196
        context.editor = mock_editor
197
        context.getpass = mock_getpass
198
        cli(["--edit"])
199
    # fmt: on
200
201
202
@then("the editor should have been called")
203
@then("the editor should have been called with {num} arguments")
204
def count_editor_args(context, num=None):
205
    assert context.editor.called
206
207
    if isinstance(num, int):
208
        assert len(context.editor_command) == int(num)
209
210
211
@then("the editor should not have been called")
212
def no_editor_called(context, num=None):
213
    assert "editor" not in context or not context.editor.called
214
215
216
@then('one editor argument should be "{arg}"')
217
def contains_editor_arg(context, arg):
218
    args = context.editor_command
219
    assert (
220
        arg in args and args.count(arg) == 1
221
    ), f"\narg not in args exactly 1 time:\n{arg}\n{str(args)}"
222
223
224
@then('one editor argument should match "{regex}"')
225
def matches_editor_arg(context, regex):
226
    args = context.editor_command
227
    matches = list(filter(lambda x: re.search(regex, x), args))
228
    assert (
229
        len(matches) == 1
230
    ), f"\nRegex didn't match exactly 1 time:\n{regex}\n{str(args)}"
231
232
233
@then("the editor file content should {method}")
234
@then("the editor file content should {method} empty")
235
@then('the editor file content should {method} "{text}"')
236
def contains_editor_file(context, method, text=""):
237
    text = text or context.text or ""
238
    content = context.editor_file.get("content")
239
    format = f'\n"""\n{content}\n"""\n'
240
    if method == "be":
241
        assert content == text, format
242
    elif method == "contain":
243
        assert text in content, format
244
    else:
245
        assert False, f"Method '{method}' not supported"
246
247
248
def _mock_getpass(inputs):
249
    def prompt_return(prompt=""):
250
        if type(inputs) == str:
251
            return inputs
252
        try:
253
            return next(inputs)
254
        except StopIteration:
255
            raise KeyboardInterrupt
256
257
    return prompt_return
258
259
260
def _mock_input(inputs):
261
    def prompt_return(prompt=""):
262
        try:
263
            val = next(inputs)
264
            print(prompt, val)
265
            return val
266
        except StopIteration:
267
            raise KeyboardInterrupt
268
269
    return prompt_return
270
271
272
@when('we run "{command}" and enter')
273
@when('we run "{command}" and enter nothing')
274
@when('we run "{command}" and enter "{inputs}"')
275
def run_with_input(context, command, inputs=""):
276
    # create an iterator through all inputs. These inputs will be fed one by one
277
    # to the mocked calls for 'input()', 'util.getpass()' and 'sys.stdin.read()'
278
    if context.text:
279
        text = iter(context.text.split("\n"))
280
    else:
281
        text = iter([inputs])
282
283
    args = ushlex(command)[1:]
284
285
    def _mock_editor(command):
286
        context.editor_command = command
287
        tmpfile = command[-1]
288
        with open(tmpfile, "r") as editor_file:
289
            file_content = editor_file.read()
290
        context.editor_file = {"name": tmpfile, "content": file_content}
291
        Path(tmpfile).touch()
292
293
    if "password" in context:
294
        password = context.password
295
    else:
296
        password = text
297
298
    # fmt: off
299
    # see: https://github.com/psf/black/issues/664
300
    with \
301
        patch("builtins.input", side_effect=_mock_input(text)) as mock_input, \
302
        patch("getpass.getpass", side_effect=_mock_getpass(password)) as mock_getpass, \
303
        patch("sys.stdin.read", side_effect=text) as mock_read, \
304
        patch("subprocess.call", side_effect=_mock_editor) as mock_editor \
305
    :
306
        try:
307
            cli(args or [])
308
            context.exit_status = 0
309
        except SystemExit as e:
310
            context.exit_status = e.code
311
312
        # put mocks into context so they can be checked later in "then" statements
313
        context.editor = mock_editor
314
        context.input = mock_input
315
        context.getpass = mock_getpass
316
        context.read = mock_read
317
        context.iter_text = text
318
319
        context.execute_steps('''
320
            Then all input was used
321
            And at least one input method was called
322
        ''')
323
324
    # fmt: on
325
326
327
@then("at least one input method was called")
328
def inputs_were_called(context):
329
    assert (
330
        context.input.called
331
        or context.getpass.called
332
        or context.read.called
333
        or context.editor.called
334
    )
335
336
337
@then("we should be prompted for a password")
338
def password_was_called(context):
339
    assert context.getpass.called
340
341
342
@then("we should not be prompted for a password")
343
def password_was_not_called(context):
344
    assert not context.getpass.called
345
346
347
@then("all input was used")
348
def all_input_was_used(context):
349
    # all inputs were used (ignore if empty string)
350
    for temp in context.iter_text:
351
        assert "" == temp, "Not all inputs were consumed"
352
353
354
@when('we run "{command}"')
355
@when('we run "{command}" and pipe')
356
@when('we run "{command}" and pipe "{text}"')
357
def run(context, command, text=""):
358
    text = text or context.text or ""
359
360
    if "cache_dir" in context and context.cache_dir is not None:
361
        cache_dir = os.path.join("features", "cache", context.cache_dir)
362
        command = command.format(cache_dir=cache_dir)
363
364
    args = ushlex(command)
365
366
    def _mock_editor(command):
367
        context.editor_command = command
368
        tmpfile = command[-1]
369
        with open(tmpfile, "r") as editor_file:
370
            file_content = editor_file.read()
371
        context.editor_file = {"name": tmpfile, "content": file_content}
372
        Path(tmpfile).touch()
373
374
    if "password" in context:
375
        password = context.password
376
    else:
377
        password = iter(text)
378
379
    try:
380
        # fmt: off
381
        # see: https://github.com/psf/black/issues/664
382
        with \
383
            patch("sys.argv", args), \
384
            patch("getpass.getpass", side_effect=_mock_getpass(password)) as mock_getpass, \
385
            patch("subprocess.call", side_effect=_mock_editor) as mock_editor, \
386
            patch("sys.stdin.read", side_effect=lambda: text) \
387
        :
388
            context.editor = mock_editor
389
            context.getpass = mock_getpass
390
            cli(args[1:])
391
            context.exit_status = 0
392
        # fmt: on
393
    except SystemExit as e:
394
        context.exit_status = e.code
395
396
397
@given('we load template "{filename}"')
398
def load_template(context, filename):
399
    full_path = os.path.join("features/data/templates", filename)
400
    exporter = plugins.template_exporter.__exporter_from_file(full_path)
401
    plugins.__exporter_types[exporter.names[0]] = exporter
402
403
404
@when('we set the keyring password of "{journal}" to "{password}"')
405
def set_keyring_password(context, journal, password):
406
    keyring.set_password("jrnl", journal, password)
407
408
409
@then("we should get an error")
410
def has_error(context):
411
    assert context.exit_status != 0, context.exit_status
412
413
414
@then("we should get no error")
415
def no_error(context):
416
    assert context.exit_status == 0, context.exit_status
417
418
419
@then("we flush the output")
420
def flush_stdout(context):
421
    context.stdout_capture.truncate(0)
422
    context.stdout_capture.seek(0)
423
424
425
@then("we flush the error output")
426
def flush_stderr(context):
427
    context.stderr_capture.truncate(0)
428
    context.stderr_capture.seek(0)
429
430
431
@then("we flush all the output")
432
def flush_all_output(context):
433
    context.execute_steps(
434
        """
435
        Then we flush the output
436
        Then we flush the error output
437
    """
438
    )
439
440
441
@then("the output should be")
442
@then("the output should be empty")
443
@then('the output should be "{text}"')
444
def check_output(context, text=None):
445
    text = (text or context.text or "").strip().splitlines()
446
    out = context.stdout_capture.getvalue().strip().splitlines()
447
    assert len(text) == len(out), "Output has {} lines (expected: {})".format(
448
        len(out), len(text)
449
    )
450
    for line_text, line_out in zip(text, out):
451
        assert line_text.strip() == line_out.strip(), [
452
            line_text.strip(),
453
            line_out.strip(),
454
        ]
455
456
457
@then('the output should contain "{text}" in the local time')
458
def check_output_time_inline(context, text):
459
    out = context.stdout_capture.getvalue()
460
    date, flag = CALENDAR.parse(text)
461
    output_date = time.strftime("%Y-%m-%d %H:%M", date)
462
    assert output_date in out, output_date
463
464
465
@then("the output should contain pyproject.toml version")
466
def check_output_version_inline(context):
467
    out = context.stdout_capture.getvalue()
468
    pyproject = (Path(__file__) / ".." / ".." / ".." / "pyproject.toml").resolve()
469
    pyproject_contents = toml.load(pyproject)
470
    pyproject_version = pyproject_contents["tool"]["poetry"]["version"]
471
    assert pyproject_version in out, pyproject_version
472
473
474
@then("the output should contain")
475
@then('the output should contain "{text}"')
476
@then('the output should contain "{text}" or "{text2}"')
477
def check_output_inline(context, text=None, text2=None):
478
    text = text or context.text
479
    out = context.stdout_capture.getvalue()
480
    assert (text and text in out) or (text2 and text2 in out)
481
482
483
@then("the error output should contain")
484
@then('the error output should contain "{text}"')
485
@then('the error output should contain "{text}" or "{text2}"')
486
def check_error_output_inline(context, text=None, text2=None):
487
    text = text or context.text
488
    out = context.stderr_capture.getvalue()
489
    assert (text and text in out) or (text2 and text2 in out)
490
491
492
@then('the output should match "{regex}"')
493
@then('the output should match "{regex}" {num} times')
494
def matches_std_output(context, regex, num=1):
495
    out = context.stdout_capture.getvalue()
496
    matches = re.findall(regex, out)
497
    assert (
498
        matches and len(matches) == num
499
    ), f"\nRegex didn't match exactly {num} time(s):\n{regex}\n{str(out)}\n{str(matches)}"
500
501
502
@then('the error output should match "{regex}"')
503
@then('the error output should match "{regex}" {num} times')
504
def matches_err_ouput(context, regex, num=1):
505
    out = context.stderr_capture.getvalue()
506
    matches = re.findall(regex, out)
507
    assert (
508
        matches and len(matches) == num
509
    ), f"\nRegex didn't match exactly {num} time(s):\n{regex}\n{str(out)}\n{str(matches)}"
510
511
512
@then('the output should not contain "{text}"')
513
def check_output_not_inline(context, text):
514
    out = context.stdout_capture.getvalue()
515
    assert text not in out
516
517
518
@then('we should see the message "{text}"')
519
@then('the error output should be "{text}"')
520
def check_message(context, text):
521
    out = context.stderr_capture.getvalue()
522
    assert text in out, [text, out]
523
524
525
@then('we should not see the message "{text}"')
526
def check_not_message(context, text):
527
    out = context.stderr_capture.getvalue()
528
    assert text not in out, [text, out]
529
530
531
@then('the journal should contain "{text}"')
532
@then('journal "{journal_name}" should contain "{text}"')
533
def check_journal_content(context, text, journal_name="default"):
534
    journal = read_journal(journal_name)
535
    assert text in journal, journal
536
537
538
@then('the journal should not contain "{text}"')
539
@then('journal "{journal_name}" should not contain "{text}"')
540
def check_not_journal_content(context, text, journal_name="default"):
541
    journal = read_journal(journal_name)
542
    assert text not in journal, journal
543
544
545
@then("the journal should not exist")
546
@then('journal "{journal_name}" should not exist')
547
def journal_doesnt_exist(context, journal_name="default"):
548
    config = load_config(install.CONFIG_FILE_PATH)
549
550
    journal_path = config["journals"][journal_name]
551
    assert not os.path.exists(journal_path)
552
553
554
@then("the journal should exist")
555
@then('journal "{journal_name}" should exist')
556
def journal_exists(context, journal_name="default"):
557
    config = load_config(install.CONFIG_FILE_PATH)
558
559
    journal_path = config["journals"][journal_name]
560
    assert os.path.exists(journal_path)
561
562
563
@then('the config should have "{key}" set to')
564
@then('the config should have "{key}" set to "{value}"')
565
@then('the config for journal "{journal}" should have "{key}" set to "{value}"')
566
def config_var(context, key, value="", journal=None):
567
    value = read_value_from_string(value or context.text or "")
568
    config = load_config(install.CONFIG_FILE_PATH)
569
570
    if journal:
571
        config = config["journals"][journal]
572
573
    assert key in config
574
    assert config[key] == value
575
576
577
@then('the config for journal "{journal}" should not have "{key}" set')
578
def config_no_var(context, key, value="", journal=None):
579
    config = load_config(install.CONFIG_FILE_PATH)
580
581
    if journal:
582
        config = config["journals"][journal]
583
584
    assert key not in config
585
586
587
@then("the journal should have {number:d} entries")
588
@then("the journal should have {number:d} entry")
589
@then('journal "{journal_name}" should have {number:d} entries')
590
@then('journal "{journal_name}" should have {number:d} entry')
591
def check_journal_entries(context, number, journal_name="default"):
592
    journal = open_journal(journal_name)
593
    assert len(journal.entries) == number
594
595
596
@when("the journal directory is listed")
597
def list_journal_directory(context, journal="default"):
598
    with open(install.CONFIG_FILE_PATH) as config_file:
599
        config = yaml.load(config_file, Loader=yaml.FullLoader)
600
    journal_path = config["journals"][journal]
601
    for root, dirnames, f in os.walk(journal_path):
602
        for file in f:
603
            print(os.path.join(root, file))
604
605
606
@then("fail")
607
def debug_fail(context):
608
    assert False
609