Passed
Pull Request — 2.x (#1933)
by Ramon
05:34
created

senaite.core.scripts   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 88
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 11
eloc 65
dl 0
loc 88
rs 10
c 0
b 0
f 0

5 Functions

Rating   Name   Duplication   Size   Complexity  
B run_it() 0 43 6
A reindex() 0 2 1
A resolve_module() 0 7 2
A upgrade_sites() 0 2 1
A zope_passwd() 0 2 1
1
# -*- coding: utf-8 -*-
2
3
import argparse
4
import logging
5
import os
6
import time
7
import types
8
9
from senaite.core import logger
10
11
# Command line options shared by all script entry points of this module
parser = argparse.ArgumentParser(description="Run a SENAITE script")
parser.add_argument(
    "-s", "--site-id",
    dest="site_id",
    default=None,
    help="ID of the SENAITE instance")
parser.add_argument(
    "-c", "--config",
    dest="zope_conf",
    help="Path to ZOPE configuration file")
parser.add_argument(
    "-v", "--verbose",
    dest="verbose",
    action="store_true",
    help="Verbose logging")

# Directory that contains this file (symlinks resolved)
this_dir = os.path.split(os.path.realpath(__file__))[0]
22
23
24
def resolve_module(module):
    """Return the script module for the given module or module name

    Module objects are handed back unchanged. Any other value is appended to
    the "senaite.core.scripts." prefix and resolved as a dotted name.
    """
    if not isinstance(module, types.ModuleType):
        # imported lazily, it is only needed for lookups by name
        from zope.dottedname.resolve import resolve
        dotted_name = "senaite.core.scripts." + module
        return resolve(dotted_name)
    return module
31
32
33
def _find_zope_conf(candidates):
    """Return the first existing path of `candidates`, or None
    """
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return None


def run_it(module):
    """Run a script module against a configured Zope application

    Locates a zope.conf, configures Zope with it, opens the application
    root with a fake request, switches to the Zope system user, sets up
    the logger and finally calls `module.run(app)`.

    :param module: module object or name of a module below
        `senaite.core.scripts`; `run(app)` is called on the resolved module
    :raises Exception: if none of the zope.conf lookup paths exists
    """
    module = resolve_module(module)
    # unknown command line arguments are ignored here
    args, _ = parser.parse_known_args()

    cwd = os.getcwd()
    lookup_paths = [
        os.path.join(cwd, "parts/client_reserved/etc/zope.conf"),
        os.path.join(cwd, "parts/client1/etc/zope.conf"),
        os.path.join(cwd, "parts/instance/etc/zope.conf"),
    ]
    if args.zope_conf:
        # an explicitly passed config takes precedence over the defaults
        # NOTE(review): if this path does not exist, the lookup silently
        # falls back to the default locations - confirm this is intended
        lookup_paths.insert(0, args.zope_conf)

    conf_path = _find_zope_conf(lookup_paths)
    if conf_path is None:
        raise Exception("Could not find zope.conf in {}".format(lookup_paths))

    # Zope imports are done lazily; `configure` runs before the app is opened
    from Zope2 import configure
    configure(conf_path)
    import Zope2
    app = Zope2.app()

    # wrap the application root in a fake request and register it globally
    from Testing.ZopeTestCase.utils import makerequest
    app = makerequest(app)
    app.REQUEST["PARENTS"] = [app]
    from zope.globalrequest import setRequest
    setRequest(app.REQUEST)

    # run the script as the Zope system user
    from AccessControl.SpecialUsers import system as user
    from AccessControl.SecurityManagement import newSecurityManager
    newSecurityManager(None, user)

    # verbose logging
    logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)
    logger.addHandler(logging.StreamHandler())

    start = time.time()
    module.run(app)
    end = time.time()
    # pass the value as logging argument, it is formatted only when emitted
    logger.info("Script execution took: %.2f seconds", end - start)
76
77
78
def zope_passwd():
    """Entry point: run the `_zope_passwd` script module via `run_it`
    """
    script_name = "_zope_passwd"
    return run_it(script_name)
80
81
82
def upgrade_sites():
    """Entry point: run the `_upgrade_sites` script module via `run_it`
    """
    script_name = "_upgrade_sites"
    return run_it(script_name)
84
85
86
def reindex():
    """Entry point: run the `_reindex` script module via `run_it`
    """
    script_name = "_reindex"
    return run_it(script_name)
88