Passed
Push — main ( 3a65c6...49fe68 )
by Jason
04:37
created

ColleXions.main()   A

Complexity

Conditions 4

Size

Total Lines 38
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 30
dl 0
loc 38
rs 9.16
c 0
b 0
f 0
cc 4
nop 0
1
# (No changes to import section)
2
import random
3
import logging
4
import time
5
import json
6
import os
7
import sys
8
import re
9
import requests
10
from plexapi.server import PlexServer
11
from datetime import datetime, timedelta
12
13
CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'config.json')
14
LOG_DIR = 'logs'
15
LOG_FILE = os.path.join(LOG_DIR, 'collexions.log')
16
17
if not os.path.exists(LOG_DIR):
18
    os.makedirs(LOG_DIR)
19
20
logging.basicConfig(
21
    level=logging.INFO,
22
    format='%(asctime)s - %(levelname)s - %(message)s',
23
    handlers=[
24
        logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8'),
25
        logging.StreamHandler(sys.stdout)
26
    ]
27
)
28
29
SELECTED_COLLECTIONS_FILE = 'selected_collections.json'
30
31
32
def load_selected_collections():
33
    if os.path.exists(SELECTED_COLLECTIONS_FILE):
34
        with open(SELECTED_COLLECTIONS_FILE, 'r', encoding='utf-8') as f:
35
            return json.load(f)
36
    return {}
37
38
39
def save_selected_collections(selected_collections):
40
    with open(SELECTED_COLLECTIONS_FILE, 'w', encoding='utf-8') as f:
41
        json.dump(selected_collections, f, ensure_ascii=False, indent=4)
42
43
44
def get_recently_pinned_collections(selected_collections, config):
45
    cutoff_time = datetime.now() - timedelta(hours=config.get('repeat_block_hours', 12))
46
    recent_titles = set()
47
48
    for timestamp_str, titles in selected_collections.items():
49
        try:
50
            timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
51
        except ValueError:
52
            try:
53
                timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d')
54
            except ValueError:
55
                logging.warning(f"Unrecognized date format in selected_collections: {timestamp_str}")
56
                continue
57
        if timestamp >= cutoff_time:
58
            recent_titles.update(titles)
59
60
    return recent_titles
61
62
63
def is_regex_excluded(title, patterns):
64
    for pattern in patterns:
65
        if re.search(pattern, title, re.IGNORECASE):
66
            logging.info(f"Excluded by regex: '{title}' matched pattern '{pattern}'")
67
            return True
68
    return False
69
70
71
def load_config():
72
    if os.path.exists(CONFIG_PATH):
73
        with open(CONFIG_PATH, 'r') as file:
74
            return json.load(file)
75
    logging.warning("Config file not found.")
76
    return {}
77
78
79
def connect_to_plex(config):
80
    logging.info("Connecting to Plex server...")
81
    plex = PlexServer(config['plex_url'], config['plex_token'])
82
    logging.info("Connected to Plex server successfully.")
83
    return plex
84
85
86
def get_collections_from_all_libraries(plex, library_names):
87
    all_collections = []
88
    for library_name in library_names:
89
        library = plex.library.section(library_name)
90
        all_collections.extend(library.collections())
91
    return all_collections
92
93
94
def pin_collections(collections, config):
95
    for collection in collections:
96
        try:
97
            logging.info(f"Attempting to pin collection: {collection.title}")
98
            hub = collection.visibility()
99
            hub.promoteHome()
100
            hub.promoteShared()
101
            message = f"INFO - Collection '**{collection.title}**' pinned successfully."
102
            logging.info(message)
103
            if config.get('discord_webhook_url'):
104
                send_discord_message(config['discord_webhook_url'], message)
105
        except Exception as e:
106
            logging.error(f"Error while pinning collection: {collection.title}. Error: {str(e)}")
107
108
109
def send_discord_message(webhook_url, message):
110
    data = {"content": message}
111
    response = requests.post(webhook_url, json=data)
112
    if response.status_code == 204:
113
        logging.info(f"Message sent to Discord: {message}")
114
    else:
115
        logging.error(f"Failed to send message to Discord. Status code: {response.status_code}, Response: {response.text}")
116
117
118
def unpin_collections(plex, library_names, exclusion_list):
119
    logging.info("Unpinning currently pinned collections...")
120
    for library_name in library_names:
121
        for collection in plex.library.section(library_name).collections():
122
            if collection.title in exclusion_list:
123
                logging.info(f"Skipping unpinning for collection: {collection.title} (in exclusion list)")
124
                continue
125
            hub = collection.visibility()
126
            if hub._promoted:
127
                hub.demoteHome()
128
                hub.demoteShared()
129
                logging.info(f"Collection '{collection.title}' unpinned successfully.")
130
131
132
def get_active_special_collections(config):
133
    current_date = datetime.now().date()
134
    active_special_collections = []
135
136
    for special in config.get('special_collections', []):
137
        start_date = datetime.strptime(special['start_date'], '%m-%d').replace(year=current_date.year).date()
138
        end_date = datetime.strptime(special['end_date'], '%m-%d').replace(year=current_date.year).date()
139
        end_date_exclusive = end_date + timedelta(days=1)
140
141
        if start_date > end_date:
142
            if (start_date <= current_date <= datetime(current_date.year, 12, 31).date()) or \
143
               (datetime(current_date.year + 1, 1, 1).date() <= current_date < end_date_exclusive):
144
                active_special_collections.extend(special['collection_names'])
145
        else:
146
            if start_date <= current_date < end_date_exclusive:
147
                active_special_collections.extend(special['collection_names'])
148
149
    return active_special_collections
150
151
152
def get_fully_excluded_collections(config, active_special_collections):
153
    exclusion_set = set(config.get('exclusion_list', []))
154
    all_special_collections = set(
155
        col for special in config.get('special_collections', []) for col in special['collection_names']
156
    )
157
    return exclusion_set.union(all_special_collections - set(active_special_collections))
158
159
160
def select_from_categories(categories_config, all_collections, exclusion_set, remaining_slots, regex_patterns):
161
    collections_to_pin = []
162
    always_call = categories_config.pop('always_call', True)
163
    for category, collection_names in categories_config.items():
164
        category_collections = [
165
            c for c in all_collections
166
            if c.title in collection_names
167
            and c.title not in exclusion_set
168
            and not is_regex_excluded(c.title, regex_patterns)
169
        ]
170
        if category_collections and remaining_slots > 0:
171
            if always_call or random.choice([True, False]):
172
                selected_collection = random.choice(category_collections)
173
                collections_to_pin.append(selected_collection)
174
                logging.info(f"Added '{selected_collection.title}' from category '{category}' to pinning list")
175
                remaining_slots -= 1
176
    return collections_to_pin, remaining_slots
177
178
179
def fill_with_random_collections(random_collections, remaining_slots):
180
    collections_to_pin = []
181
    random_collections = random_collections[:]
182
    while remaining_slots > 0 and random_collections:
183
        selected_collection = random.choice(random_collections)
184
        collections_to_pin.append(selected_collection)
185
        logging.info(f"Added random collection '{selected_collection.title}' to pinning list")
186
        remaining_slots -= 1
187
        random_collections.remove(selected_collection)
188
    return collections_to_pin
189
190
191
def filter_collections(config, all_collections, active_special_collections, collection_limit, library_name, selected_collections):
192
    fully_excluded_collections = get_fully_excluded_collections(config, active_special_collections)
193
    recently_pinned = get_recently_pinned_collections(selected_collections, config)
194
    regex_patterns = config.get('regex_exclusion_patterns', [])
195
196
    total_exclusion_set = fully_excluded_collections.union(recently_pinned)
197
198
    collections_to_pin = []
199
200
    # Step 1: Special collections
201
    collections_to_pin.extend([
202
        c for c in all_collections
203
        if c.title in active_special_collections
204
        and c.title not in total_exclusion_set
205
        and not is_regex_excluded(c.title, regex_patterns)
206
    ])
207
    remaining_slots = collection_limit - len(collections_to_pin)
208
209
    # Step 2: Categories
210
    if remaining_slots > 0:
211
        categories_config = config.get('categories', {}).get(library_name, {})
212
        category_pins, remaining_slots = select_from_categories(categories_config, all_collections, total_exclusion_set, remaining_slots, regex_patterns)
213
        collections_to_pin.extend(category_pins)
214
215
    # Step 3: Random fill
216
    random_collections = [
217
        c for c in all_collections
218
        if c.title not in total_exclusion_set
219
        and c.title not in [c.title for c in collections_to_pin]
220
        and not is_regex_excluded(c.title, regex_patterns)
221
    ]
222
    collections_to_pin.extend(fill_with_random_collections(random_collections, remaining_slots))
223
224
    return collections_to_pin
225
226
227
def main():
228
    while True:
229
        config = load_config()
230
        plex = connect_to_plex(config)
231
        exclusion_list = config.get('exclusion_list', [])
232
        library_names = config.get('library_names', ['Movies', 'TV Shows'])
233
        pinning_interval_seconds = config['pinning_interval'] * 60
234
235
        selected_collections = load_selected_collections()
236
        current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
237
        selected_collections[current_timestamp] = []
238
239
        for library_name in library_names:
240
            collections_to_pin_for_library = config['number_of_collections_to_pin'].get(library_name, 0)
241
            logging.info(f"Processing library: {library_name} with {collections_to_pin_for_library} collections to pin.")
242
243
            unpin_collections(plex, [library_name], exclusion_list)
244
            active_special_collections = get_active_special_collections(config)
245
            all_collections = get_collections_from_all_libraries(plex, [library_name])
246
247
            collections_to_pin = filter_collections(
248
                config,
249
                all_collections,
250
                active_special_collections,
251
                collections_to_pin_for_library,
252
                library_name,
253
                selected_collections
254
            )
255
256
            if collections_to_pin:
257
                pin_collections(collections_to_pin, config)
258
                selected_collections[current_timestamp].extend([c.title for c in collections_to_pin])
259
                save_selected_collections(selected_collections)
260
            else:
261
                logging.info(f"No collections available to pin for library: {library_name}.")
262
263
        logging.info(f"Sleeping for {config['pinning_interval']} minutes before next run.")
264
        time.sleep(pinning_interval_seconds)
265
266
267
if __name__ == "__main__":
268
    main()
269