|
1
|
|
|
import sys |
|
2
|
|
|
import os |
|
3
|
|
|
import json |
|
4
|
|
|
import argparse |
|
5
|
|
|
import getpass |
|
6
|
|
|
import logging |
|
7
|
|
|
from dotenv import load_dotenv |
|
8
|
|
|
from LORIS_query import number_extraction, getCNBP, putCNBP, login |
|
9
|
|
|
from LORIS_candidates import check_DCCID, checkDCCIDExist |
|
10
|
|
|
|
|
11
|
|
|
logging.basicConfig(stream=sys.stdout, level=logging.INFO) |
|
12
|
|
|
|
|
13
|
|
|
|
|
14
|
|
|
def visit_number_extraction(string):
    """
    Extract the numbers from a timepoint label and return the last one.

    A thin wrapper around number_extraction, used for visit labels such as
    "V2" or "T3" where the trailing number is the visit index.

    :param string: the timepoint label to parse.
    :return: the last element of the sequence produced by number_extraction(string).
    :raises IndexError: if number_extraction finds no number in the label.
    """
    number_extracted = number_extraction(string)

    # Fail with a message that names the offending label instead of a bare
    # "list index out of range".
    if not number_extracted:
        raise IndexError("No number found in timepoint label: %r" % (string,))

    # Return the last number from the timepoint string; a negative index covers
    # both the single-number and the multi-number case.
    return number_extracted[-1]
|
30
|
|
|
|
|
31
|
|
|
|
|
32
|
|
|
def findLatestTimePoint(token, DCCID):
    """
    Fetch a candidate's record and return its latest timepoint.

    "Latest" means the last entry of the "Visits" list in the candidate JSON
    returned by getCNBP; no sorting is performed here.

    NOTE(review): the original author states that, since the DCCID exists, the
    record must already exist within the local SQLite database. Nothing in this
    function verifies that.

    :param token: auth token passed through to getCNBP.
    :param DCCID: the DCCID of the subject; it must pass check_DCCID.
    :return: the last visit label, or None when the candidate has no visits.
        If the request itself fails, the tuple (response_success, None) is
        returned instead, so callers must tell that apart from a visit label.
    """
    assert check_DCCID(DCCID)  # The ID must pass validation before we query.

    # Should exist, as the check above passed.
    response_success, candidate_json = getCNBP(token, r"candidates/" + str(DCCID))

    # Preliminary exit condition: the request failed.
    if not response_success:
        return response_success, None

    # "Visits" may be missing or null in the payload; treat both as "no visits"
    # rather than crashing on len(None).
    candidate_visits_list = candidate_json.get("Visits") or []

    if candidate_visits_list:
        return candidate_visits_list[-1]  # return the LAST TIME POINT!
    return None
|
52
|
|
|
|
|
53
|
|
|
def increaseTimepoint(token, DCCID):
    """
    Create the next timepoint for an existing subject.

    Checks that the DCCID exists, looks up the subject's latest timepoint and
    creates a new one whose number is one higher. When the subject has no
    timepoint yet, "V1" is created. Otherwise the label is the value of the
    "timepoint_prefix" environment variable (loaded via dotenv) followed by the
    incremented number.

    :param token: auth token
    :param DCCID: the DCCID of the existing subject.
    :return: tuple (success, label): whether the creation request succeeded and
        which label was requested. (False, None) is returned when the subject
        cannot be confirmed, the latest timepoint cannot be retrieved, or the
        prefix is not configured.
    """
    # todo: must handle the special edge case where timepoint reach V10 etc where two or three more digits are there!
    logger = logging.getLogger('increaseTimepoint')

    # Ensure valid input and that the subject actually exists.
    assert check_DCCID(DCCID)
    success, subject_exist = checkDCCIDExist(token, DCCID)
    if not subject_exist or not success:
        return False, None

    latest_timepoint = findLatestTimePoint(token, DCCID)

    # findLatestTimePoint reports a failed request as a (success, None) tuple
    # rather than a label; do not try to parse that as a visit name.
    if isinstance(latest_timepoint, tuple):
        logger.error("Could not retrieve the latest timepoint for DCCID %s", DCCID)
        return False, None

    if latest_timepoint is None:
        # No visit yet: start at V1. The label must be bound here as well,
        # otherwise the final return raises UnboundLocalError.
        timepoint_label = "V1"
    else:
        visit_number = visit_number_extraction(latest_timepoint)
        new_visit_number = int(visit_number) + 1

        load_dotenv()
        prefix = os.getenv("timepoint_prefix")
        if prefix is None:
            # Without a prefix the label cannot be built; report failure
            # instead of crashing on None + str.
            logger.error("Environment variable 'timepoint_prefix' is not set")
            return False, None

        timepoint_label = prefix + str(new_visit_number)

    success = createTimepoint(token, DCCID, timepoint_label)

    return success, timepoint_label
|
|
|
|
|
|
84
|
|
|
|
|
85
|
|
|
|
|
86
|
|
|
def createTimepoint(token, DCCID, time_point):
    """
    Send the PUT request that creates visit `time_point` for candidate `DCCID`.

    :param token: auth token forwarded to putCNBP.
    :param DCCID: ID of the candidate; stringified for the endpoint and sent
        as given in the payload.
    :param time_point: the visit label (a string) to create.
    :return: the success flag reported by putCNBP.
    """
    # Endpoint has the form /candidates/<DCCID>/<time_point>.
    endpoint = "".join([r"/candidates/", str(DCCID), r"/", time_point])

    # The battery is hard-coded to CNBP for now.
    payload = json.dumps(
        {"Meta": {"CandID": DCCID, "Visit": time_point, "Battery": "CNBP"}}
    )

    # The response body is expected to be null, so only the flag is kept.
    success, _ = putCNBP(token, endpoint, payload)
    return success
|
101
|
|
|
|
|
102
|
|
|
|
|
103
|
|
|
# Manual smoke test: runs only when this file is executed as a script.
if __name__ == '__main__':
    # Log in, then request the next timepoint for a hard-coded test subject.
    login_ok, token = login()
    outcome = increaseTimepoint(token, 635425)

    # Show whether the request succeeded and which label was requested.
    success, latest_timepoint = outcome
    print(success)
    print(latest_timepoint)