Passed
Push — main ( fa329c...3a65c6 )
by Jason
01:06 queued 15s
created

ColleXions.get_fully_excluded_collections()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
import random
2
import logging
3
import time
4
import json
5
import os
6
import sys
7
import requests
8
from plexapi.server import PlexServer
9
from datetime import datetime, timedelta
10
11
# Define log file path
12
LOG_DIR = 'logs'
13
LOG_FILE = os.path.join(LOG_DIR, 'collexions.log')
14
15
# Ensure the logs directory exists
16
if not os.path.exists(LOG_DIR):
17
    os.makedirs(LOG_DIR)
18
    logging.info(f"Created log directory: {LOG_DIR}")
19
20
# Configure logging to file with UTF-8 encoding for console output
21
logging.basicConfig(
22
    level=logging.INFO,
23
    format='%(asctime)s - %(levelname)s - %(message)s',
24
    handlers=[
25
        logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8'),
26
        logging.StreamHandler(sys.stdout)
27
    ]
28
)
29
30
# Configuration file path
31
CONFIG_FILE = 'config.json'
32
SELECTED_COLLECTIONS_FILE = 'selected_collections.json'
33
34
def load_selected_collections():
35
    if os.path.exists(SELECTED_COLLECTIONS_FILE):
36
        with open(SELECTED_COLLECTIONS_FILE, 'r', encoding='utf-8') as f:
37
            selected_collections = json.load(f)
38
    else:
39
        selected_collections = {}
40
41
    # Clean up entries older than 3 days
42
    current_date = datetime.now().date()
43
    cutoff_date = current_date - timedelta(days=3)
44
    selected_collections = {
45
        day: collections for day, collections in selected_collections.items()
46
        if datetime.strptime(day, '%Y-%m-%d').date() >= cutoff_date
47
    }
48
    return selected_collections
49
50
def save_selected_collections(selected_collections):
51
    current_date = datetime.now().date()
52
    cutoff_date = current_date - timedelta(days=3)
53
    selected_collections = {
54
        day: collections for day, collections in selected_collections.items()
55
        if datetime.strptime(day, '%Y-%m-%d').date() >= cutoff_date
56
    }
57
    with open(SELECTED_COLLECTIONS_FILE, 'w', encoding='utf-8') as f:
58
        json.dump(selected_collections, f, ensure_ascii=False, indent=4)
59
60
def load_config():
61
    if not os.path.exists(CONFIG_FILE):
62
        logging.error(f"Configuration file '{CONFIG_FILE}' not found.")
63
        raise FileNotFoundError(f"Configuration file '{CONFIG_FILE}' not found.")
64
    with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
65
        config = json.load(f)
66
    return config
67
68
def connect_to_plex(config):
69
    logging.info("Connecting to Plex server...")
70
    plex = PlexServer(config['plex_url'], config['plex_token'])
71
    logging.info("Connected to Plex server successfully.")
72
    return plex
73
74
def get_collections_from_all_libraries(plex, library_names):
75
    all_collections = []
76
    for library_name in library_names:
77
        library = plex.library.section(library_name)
78
        collections = library.collections()
79
        all_collections.extend(collections)
80
    return all_collections
81
82
def pin_collections(collections, config):
83
    for collection in collections:
84
        try:
85
            logging.info(f"Attempting to pin collection: {collection.title}")
86
            hub = collection.visibility()
87
            hub.promoteHome()
88
            hub.promoteShared()
89
            message = f"INFO - Collection '**{collection.title}**' pinned successfully."
90
            logging.info(message)
91
            if 'discord_webhook_url' in config and config['discord_webhook_url']:
92
                send_discord_message(config['discord_webhook_url'], message)
93
        except Exception as e:
94
            logging.error(f"Error while pinning collection: {collection.title}. Error: {str(e)}")
95
96
def send_discord_message(webhook_url, message):
97
    data = {"content": message}
98
    response = requests.post(webhook_url, json=data)
99
    if response.status_code == 204:
100
        logging.info(f"Message sent to Discord: {message}")
101
    else:
102
        logging.error(f"Failed to send message to Discord. Status code: {response.status_code}")
103
104
def unpin_collections(plex, library_names, exclusion_list):
105
    logging.info("Unpinning currently pinned collections...")
106
    for library_name in library_names:
107
        for collection in plex.library.section(library_name).collections():
108
            if collection.title in exclusion_list:
109
                logging.info(f"Skipping unpinning for collection: {collection.title} (in exclusion list)")
110
                continue
111
            hub = collection.visibility()
112
            if hub._promoted:
113
                hub.demoteHome()
114
                hub.demoteShared()
115
                logging.info(f"Collection '{collection.title}' unpinned successfully.")
116
117
from datetime import datetime, timedelta
118
119
def get_active_special_collections(config):
120
    current_date = datetime.now().date()  # Today's date without time component
121
    active_special_collections = []
122
123
    for special in config.get('special_collections', []):
124
        # Parse start and end dates as date objects for the current year
125
        start_date = datetime.strptime(special['start_date'], '%m-%d').replace(year=current_date.year).date()
126
        end_date = datetime.strptime(special['end_date'], '%m-%d').replace(year=current_date.year).date()
127
128
        # Adjust end date to be exclusive, so it includes only up to but not including the end date
129
        end_date_exclusive = end_date + timedelta(days=1)
130
131
        # Handle cross-year range by dividing it into two segments if needed
132
        if start_date > end_date:
133
            # Cross-year case: Collection is active if today is in the segment from start_date to Dec 31
134
            # or in the segment from Jan 1 to end_date in the next year
135
            if (start_date <= current_date <= datetime(current_date.year, 12, 31).date()) or \
136
               (datetime(current_date.year + 1, 1, 1).date() <= current_date < end_date_exclusive):
137
                active_special_collections.extend(special['collection_names'])
138
                logging.info(f"Collection '{special['collection_names']}' is active due to cross-year range.")
139
            else:
140
                logging.info(f"Collection '{special['collection_names']}' is NOT active (cross-year case).")
141
        else:
142
            # Standard date range within the same year
143
            if start_date <= current_date < end_date_exclusive:
144
                active_special_collections.extend(special['collection_names'])
145
                logging.info(f"Collection '{special['collection_names']}' is active for pinning.")
146
            else:
147
                logging.info(f"Collection '{special['collection_names']}' is NOT active (standard case).")
148
149
    logging.info(f"Final active special collections: {active_special_collections}")
150
    return active_special_collections
151
152
153
def get_fully_excluded_collections(config, active_special_collections):
154
    exclusion_set = set(config.get('exclusion_list', []))
155
    all_special_collections = set(
156
        col for special in config.get('special_collections', []) for col in special['collection_names']
157
    )
158
    return exclusion_set.union(all_special_collections - set(active_special_collections))
159
160
def select_from_special_collections(active_special_collections, all_collections, exclusion_set):
161
    return [
162
        c for special in active_special_collections
163
        for c in all_collections if c.title == special and c.title not in exclusion_set
164
    ]
165
166
def select_from_categories(categories_config, all_collections, exclusion_set, remaining_slots):
167
    collections_to_pin = []
168
    always_call = categories_config.pop('always_call', True)
169
    for category, collection_names in categories_config.items():
170
        category_collections = [
171
            c for c in all_collections if c.title in collection_names and c.title not in exclusion_set
172
        ]
173
        if category_collections and remaining_slots > 0:
174
            if always_call or random.choice([True, False]):
175
                selected_collection = random.choice(category_collections)
176
                collections_to_pin.append(selected_collection)
177
                logging.info(f"Added '{selected_collection.title}' from category '{category}' to pinning list")
178
                remaining_slots -= 1
179
    return collections_to_pin, remaining_slots
180
181
def fill_with_random_collections(random_collections, remaining_slots):
182
    collections_to_pin = []
183
    while remaining_slots > 0 and random_collections:
184
        selected_collection = random.choice(random_collections)
185
        collections_to_pin.append(selected_collection)
186
        logging.info(f"Added random collection '{selected_collection.title}' to pinning list")
187
        remaining_slots -= 1
188
        random_collections.remove(selected_collection)
189
    return collections_to_pin
190
191
def filter_collections(config, all_collections, active_special_collections, collection_limit, library_name):
192
    fully_excluded_collections = get_fully_excluded_collections(config, active_special_collections)
193
    collections_to_pin = []
194
195
    # Step 1: Pin only active special collections
196
    collections_to_pin.extend(select_from_special_collections(active_special_collections, all_collections, fully_excluded_collections))
197
    remaining_slots = collection_limit - len(collections_to_pin)
198
199
    # Step 2: Pin collections from categories if slots remain
200
    if remaining_slots > 0:
201
        categories_config = config.get('categories', {}).get(library_name, {})
202
        category_pins, remaining_slots = select_from_categories(categories_config, all_collections, fully_excluded_collections, remaining_slots)
203
        collections_to_pin.extend(category_pins)
204
205
    # Step 3: Fill remaining slots with random collections
206
    random_collections = [c for c in all_collections if c.title not in fully_excluded_collections]
207
    collections_to_pin.extend(fill_with_random_collections(random_collections, remaining_slots))
208
209
    return collections_to_pin
210
211
212
213
def main():
214
    config = load_config()
215
    plex = connect_to_plex(config)
216
    exclusion_list = config.get('exclusion_list', [])
217
    library_names = config.get('library_names', ['Movies', 'TV Shows'])
218
    pinning_interval_seconds = config['pinning_interval'] * 60
219
220
    selected_collections = load_selected_collections()
221
    current_day = datetime.now().strftime('%Y-%m-%d')
222
    if current_day not in selected_collections:
223
        selected_collections[current_day] = []
224
225
    while True:
226
        for library_name in library_names:
227
            collections_to_pin_for_library = config['number_of_collections_to_pin'].get(library_name, 0)
228
            
229
            logging.info(f"Processing library: {library_name} with {collections_to_pin_for_library} collections to pin.")
230
231
            unpin_collections(plex, [library_name], exclusion_list)
232
233
            active_special_collections = get_active_special_collections(config)
234
            all_collections = get_collections_from_all_libraries(plex, [library_name])
235
236
            collections_to_pin = filter_collections(config, all_collections, active_special_collections, collections_to_pin_for_library, library_name)
237
238
            if collections_to_pin:
239
                pin_collections(collections_to_pin, config)
240
                selected_collections[current_day].extend([c.title for c in collections_to_pin])
241
                save_selected_collections(selected_collections)
242
            else:
243
                logging.info(f"No collections available to pin for library: {library_name}.")
244
245
        logging.info(f"Scheduler set to change pinned collections every {config['pinning_interval']} minutes.")
246
        time.sleep(pinning_interval_seconds)
247
248
if __name__ == "__main__":
249
    main()
250