| Total Complexity | 138 |
| Total Lines | 449 |
| Duplicated Lines | 81.74 % |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like collexions often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # --- Imports --- |
||
| 2 | import random |
||
| 3 | import logging |
||
| 4 | import time |
||
| 5 | import json |
||
| 6 | import os |
||
| 7 | import sys |
||
| 8 | import re |
||
| 9 | import requests |
||
| 10 | from plexapi.server import PlexServer |
||
| 11 | from plexapi.exceptions import NotFound, BadRequest |
||
| 12 | from datetime import datetime, timedelta |
||
| 13 | |||
| 14 | # --- Configuration & Constants --- |
||
# config.json is looked up in the directory that contains this script.
CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'config.json')
# Log directory, log file and history file are plain relative paths.
LOG_DIR = 'logs'
LOG_FILE = os.path.join(LOG_DIR, 'collexions.log')
SELECTED_COLLECTIONS_FILE = 'selected_collections.json'

# --- Setup Logging ---
# Create the log directory when it is missing. If that fails, clear LOG_FILE
# so that only the console handler is installed below.
if not os.path.exists(LOG_DIR):
    try:
        os.makedirs(LOG_DIR)
    except OSError as e:
        sys.stderr.write(f"Error creating log dir: {e}\n")
        LOG_FILE = None

# Console output is always enabled; the file handler is best-effort.
log_handlers = [logging.StreamHandler(sys.stdout)]
if LOG_FILE:
    try:
        # mode='w' starts a fresh log file on every launch.
        log_handlers.append(
            logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8')
        )
    except Exception as e:
        sys.stderr.write(f"Error setting up file log: {e}\n")

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(funcName)s] %(message)s',
    handlers=log_handlers,
)
||
| 35 | |||
| 36 | # --- Functions --- |
||
| 37 | |||
def load_selected_collections():
    """Load the history of previously pinned collections.

    Returns:
        The dict stored in SELECTED_COLLECTIONS_FILE. An empty dict is
        returned when the file is missing, cannot be read or decoded, or
        does not contain a JSON object at the top level.
    """
    if not os.path.exists(SELECTED_COLLECTIONS_FILE):
        return {}

    try:
        with open(SELECTED_COLLECTIONS_FILE, 'r', encoding='utf-8') as history_file:
            history = json.load(history_file)
    except json.JSONDecodeError:
        logging.error(f"Error decoding {SELECTED_COLLECTIONS_FILE}. Resetting.")
        return {}
    except Exception as e:
        logging.error(f"Error loading {SELECTED_COLLECTIONS_FILE}: {e}. Resetting.")
        return {}

    if isinstance(history, dict):
        return history

    # Valid JSON, but not the expected {timestamp: [titles]} mapping.
    logging.error(f"Invalid format in {SELECTED_COLLECTIONS_FILE}. Resetting.")
    return {}
||
| 49 | |||
def save_selected_collections(selected_collections):
    """Write the pin history to SELECTED_COLLECTIONS_FILE as indented JSON.

    Args:
        selected_collections: JSON-serialisable history mapping.

    Any failure is logged and swallowed; nothing is raised to the caller.
    """
    try:
        with open(SELECTED_COLLECTIONS_FILE, 'w', encoding='utf-8') as history_file:
            json.dump(
                selected_collections,
                history_file,
                ensure_ascii=False,
                indent=4,
            )
    except Exception as e:
        logging.error(f"Error saving {SELECTED_COLLECTIONS_FILE}: {e}")
||
| 56 | |||
def get_recently_pinned_collections(selected_collections, config):
    """Return titles pinned within the ``repeat_block_hours`` window.

    The history dict is pruned in place while it is scanned: entries whose
    value is not a list, whose key is not a parseable timestamp, or whose
    timestamp is older than the cutoff are removed. Whenever anything was
    removed, the cleaned history is written back with
    save_selected_collections().

    Args:
        selected_collections: History mapping of timestamp string
            ('%Y-%m-%d %H:%M:%S' or '%Y-%m-%d') to a list of titles.
            Mutated in place.
        config: Configuration dict; 'repeat_block_hours' (default 12) sets
            the window. Non-numeric or non-positive values fall back to 12.

    Returns:
        A set of the string titles found in entries inside the window.
    """
    repeat_block_hours = config.get('repeat_block_hours', 12)
    if not isinstance(repeat_block_hours, (int, float)) or repeat_block_hours <= 0:
        logging.warning("Invalid 'repeat_block_hours', defaulting 12.")
        repeat_block_hours = 12

    cutoff_time = datetime.now() - timedelta(hours=repeat_block_hours)
    logging.info(f"Checking history since {cutoff_time.strftime('%Y-%m-%d %H:%M:%S')} for recently pinned non-special items")

    recent_titles = set()
    invalid_removed = False  # True once any malformed entry has been dropped
    expired_keys = []

    # Iterate over a snapshot because entries are deleted during the scan.
    for timestamp_str, titles in list(selected_collections.items()):
        if not isinstance(titles, list):
            logging.warning(f"Cleaning invalid history: {timestamp_str}")
            selected_collections.pop(timestamp_str, None)
            invalid_removed = True
            continue
        try:
            try:
                timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
            except ValueError:
                # Fall back to date-only keys.
                timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d')
        except ValueError:
            logging.warning(f"Cleaning invalid date format: '{timestamp_str}'.")
            selected_collections.pop(timestamp_str, None)
            invalid_removed = True
            continue
        except Exception as e:
            logging.error(f"Cleaning problematic history '{timestamp_str}': {e}.")
            selected_collections.pop(timestamp_str, None)
            invalid_removed = True
            continue

        if timestamp >= cutoff_time:
            recent_titles.update(t for t in titles if isinstance(t, str))
        else:
            expired_keys.append(timestamp_str)

    if expired_keys:
        logging.info(f"Removing {len(expired_keys)} old entries from history file.")
        for key in expired_keys:
            selected_collections.pop(key, None)

    # Persist after ANY cleanup, so malformed entries do not reappear (and get
    # re-reported) on the next load when no expired entries happened to exist.
    if expired_keys or invalid_removed:
        save_selected_collections(selected_collections)

    if recent_titles:
        logging.info(f"Recently pinned non-special collections (excluded): {', '.join(sorted(recent_titles))}")
    return recent_titles
||
| 89 | |||
def is_regex_excluded(title, patterns):
    """Check whether ``title`` matches any of the regex exclusion patterns.

    Matching uses ``re.search`` with ``re.IGNORECASE``. Entries that are not
    non-empty strings are ignored. An invalid regex is logged and skipped so
    that the remaining patterns are still evaluated.

    Args:
        title: Collection title to test.
        patterns: List of regex pattern strings; any other type means no
            exclusion.

    Returns:
        True if at least one pattern matches, otherwise False.
    """
    if not patterns or not isinstance(patterns, list):
        return False

    for pattern in patterns:
        if not isinstance(pattern, str) or not pattern:
            continue
        try:
            matched = re.search(pattern, title, re.IGNORECASE)
        except re.error as e:
            # One broken pattern must not disable the patterns after it.
            logging.error(f"Invalid regex '{pattern}': {e}")
            continue
        except Exception as e:
            # Non-regex failures (e.g. a non-string title) would repeat for
            # every pattern, so give up on this title.
            logging.error(f"Regex error for '{title}', pattern '{pattern}': {e}")
            return False
        if matched:
            logging.info(f"Excluding '{title}' (regex: '{pattern}')")
            return True
    return False
||
| 100 | |||
def load_config():
    """Read and return the configuration dict stored at CONFIG_PATH.

    The process is terminated with exit status 1 (after a CRITICAL log entry)
    when the file is missing, cannot be read or parsed, or does not hold a
    JSON object at the top level.
    """
    if not os.path.exists(CONFIG_PATH):
        logging.critical(f"CRITICAL: Config not found {CONFIG_PATH}. Exit.")
        sys.exit(1)

    try:
        with open(CONFIG_PATH, 'r', encoding='utf-8') as config_file:
            parsed = json.load(config_file)
        if not isinstance(parsed, dict):
            raise ValueError("Config not JSON object.")
        return parsed
    except Exception as e:
        logging.critical(f"CRITICAL: Error load/parse {CONFIG_PATH}: {e}. Exit.")
        sys.exit(1)
||
| 111 | |||
def connect_to_plex(config):
    """Open a connection to the Plex server described by ``config``.

    Args:
        config: Configuration dict providing non-empty string values for
            'plex_url' and 'plex_token'.

    Returns:
        The connected PlexServer instance, or None when the settings are
        missing/invalid or the connection attempt fails (the reason is
        logged).
    """
    try:
        logging.info("Connecting to Plex server...")
        plex_url = config.get('plex_url')
        plex_token = config.get('plex_token')

        url_ok = isinstance(plex_url, str) and bool(plex_url)
        token_ok = isinstance(plex_token, str) and bool(plex_token)
        if not (url_ok and token_ok):
            raise ValueError("Missing/invalid 'plex_url'/'plex_token'")

        plex = PlexServer(plex_url, plex_token, timeout=60)
        logging.info(f"Connected to Plex server '{plex.friendlyName}' successfully.")
        return plex
    except ValueError as e:
        logging.error(f"Config error for Plex: {e}")
        return None
    except Exception as e:
        logging.error(f"Failed to connect to Plex: {e}")
        return None
||
| 124 | |||
def get_collections_from_all_libraries(plex, library_names):
    """Gather the collection objects of every named library section.

    Args:
        plex: Connected server object; ``plex.library.section(name)`` is used
            to look up each section.
        library_names: Iterable of section names. Non-string entries are
            skipped with a warning.

    Returns:
        A single list with the collections of all sections that could be
        read. Sections that are missing or fail to load are logged and
        contribute nothing.
    """
    gathered = []
    if not plex or not library_names:
        return gathered

    for library_name in library_names:
        if not isinstance(library_name, str):
            logging.warning(f"Invalid lib name: {library_name}")
            continue
        try:
            section_collections = plex.library.section(library_name).collections()
            logging.info(f"Found {len(section_collections)} collections in '{library_name}'.")
            gathered.extend(section_collections)
        except NotFound:
            logging.error(f"Library '{library_name}' not found.")
        except Exception as e:
            logging.error(f"Error fetching from '{library_name}': {e}")
    return gathered
||
| 139 | |||
def pin_collections(collections, config):
    """Promote each collection and report every success individually.

    For each collection, the hub returned by ``collection.visibility()`` is
    promoted via ``promoteHome()`` and ``promoteShared()``. When the config
    holds a 'discord_webhook_url', the success message is also sent there.
    Objects without a ``visibility`` attribute are skipped; errors for one
    collection are logged and do not stop the others.

    Args:
        collections: Collection objects to pin.
        config: Configuration dict (only read when ``collections`` is
            non-empty).
    """
    if not collections:
        logging.info("Pin list is empty.")
        return

    webhook_url = config.get('discord_webhook_url')
    for collection in collections:
        coll_title = getattr(collection, 'title', 'Untitled')
        try:
            if not hasattr(collection, 'visibility'):
                logging.warning(f"Skip invalid: '{coll_title}'.")
                continue

            logging.info(f"Attempting to pin: '{coll_title}'")
            hub = collection.visibility()
            hub.promoteHome()
            hub.promoteShared()

            message = f"INFO - Collection '{coll_title}' pinned successfully."
            logging.info(message)
            if webhook_url:
                send_discord_message(webhook_url, message)
        except Exception as e:
            logging.error(f"Error pinning '{coll_title}': {e}")
||
| 155 | |||
def send_discord_message(webhook_url, message):
    """POST ``message`` as JSON ``{"content": message}`` to a Discord webhook.

    Args:
        webhook_url: Target URL; nothing is sent unless it is a non-empty
            string.
        message: Text placed in the 'content' field.

    Request failures and non-success HTTP statuses are logged, not raised.
    """
    if not (webhook_url and isinstance(webhook_url, str)):
        return

    payload = {"content": message}
    try:
        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()
        logging.info(f"Discord msg sent (Status: {response.status_code})")
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed send to Discord: {e}")
    except Exception as e:
        logging.error(f"Discord message error: {e}")
||
| 166 | |||
def unpin_collections(plex, library_names, exclusion_list):
    """Demote promoted collections in the given libraries.

    Every collection whose title is not in ``exclusion_list`` is inspected;
    when the hub returned by ``visibility()`` has a truthy ``_promoted``
    flag, ``demoteHome()`` and ``demoteShared()`` are called on it. Failures
    are logged per collection / per library and processing continues.

    Args:
        plex: Connected server object; nothing happens when it is falsy.
        library_names: Iterable of library section names.
        exclusion_list: List of titles that must stay pinned. Any non-list
            value is treated as an empty list.
    """
    if not plex:
        return

    logging.info(f"Starting unpin for: {library_names} (Excluding: {exclusion_list})")
    protected_titles = set(exclusion_list) if isinstance(exclusion_list, list) else set()
    unpinned_count = 0

    for library_name in library_names:
        try:
            section_collections = plex.library.section(library_name).collections()
            logging.info(f"Checking {len(section_collections)} collections in '{library_name}'.")
            for collection in section_collections:
                coll_title = getattr(collection, 'title', 'Untitled')
                if coll_title in protected_titles:
                    logging.info(f"Skip unpin excluded: '{coll_title}'")
                    continue
                try:
                    hub = collection.visibility()
                    if not hub._promoted:
                        continue
                    logging.info(f"Attempting unpin: '{coll_title}'")
                    try:
                        hub.demoteHome()
                        hub.demoteShared()
                        logging.info(f"Unpinned '{coll_title}' OK.")
                        unpinned_count += 1
                    except Exception as demote_error:
                        logging.error(f"Failed demote for '{coll_title}': {demote_error}")
                except Exception as vis_error:
                    logging.error(f"Error check visibility for '{coll_title}': {vis_error}")
        except NotFound:
            logging.error(f"Lib '{library_name}' not found for unpin.")
        except Exception as e:
            logging.error(f"Error during unpin for '{library_name}': {e}")

    logging.info(f"Unpinning complete. Unpinned {unpinned_count} collections.")
||
| 194 | |||
def get_active_special_collections(config, today=None):
    """Return the titles of 'special' collections whose date window is active.

    Each entry of ``config['special_collections']`` must be a dict with
    'start_date' and 'end_date' in MM-DD form and a 'collection_names' list.
    Both dates are placed in the year of ``today``; the end date is
    inclusive. When start is after end, the window is treated as wrapping
    over the year boundary (e.g. 12-20 .. 01-05).

    '02-29' is accepted; in a non-leap year it is clamped to February 28.

    Args:
        config: Configuration dict.
        today: Optional date (or datetime) to evaluate against. Defaults to
            the current local date.

    Returns:
        A list of unique title strings, in first-seen config order. Invalid
        entries are skipped (date format problems are logged).
    """
    if today is None:
        current_date = datetime.now().date()
    elif isinstance(today, datetime):
        current_date = today.date()
    else:
        current_date = today

    special_configs = config.get('special_collections', [])
    if not isinstance(special_configs, list):
        logging.warning("'special_collections' not list.")
        return []

    def _in_current_year(month_day):
        # Parse against a leap year: strptime with a bare '%m-%d' assumes
        # 1900, which is not a leap year, and therefore rejects '02-29'.
        parsed = datetime.strptime(f"2000-{month_day}", '%Y-%m-%d')
        try:
            return current_date.replace(month=parsed.month, day=parsed.day)
        except ValueError:
            # Only reachable for Feb 29 in a non-leap current year.
            return current_date.replace(month=2, day=28)

    required_keys = ('start_date', 'end_date', 'collection_names')
    active_titles = []
    for special in special_configs:
        if not isinstance(special, dict) or not all(k in special for k in required_keys):
            continue
        s_date = special.get('start_date')
        e_date = special.get('end_date')
        names = special.get('collection_names')
        if not isinstance(names, list) or not s_date or not e_date:
            continue
        try:
            start = _in_current_year(s_date)
            end = _in_current_year(e_date)
            end_excl = end + timedelta(days=1)  # make the end date inclusive
            if start <= end:
                is_active = start <= current_date < end_excl
            else:
                # Window wraps around New Year.
                is_active = start <= current_date or current_date < end_excl
            if is_active:
                active_titles.extend(n for n in names if isinstance(n, str))
        except ValueError:
            logging.error(f"Invalid date format in special: {special}. Use MM-DD.")
        except Exception as e:
            logging.error(f"Error process special {names}: {e}")

    # dict.fromkeys de-duplicates while keeping a deterministic order.
    unique_active = list(dict.fromkeys(active_titles))
    if unique_active:
        logging.info(f"Active special collections: {unique_active}")
    return unique_active
||
| 216 | |||
def get_fully_excluded_collections(config, active_special_collections):
    """Combine explicit exclusions with currently inactive special titles.

    Args:
        config: Configuration dict. 'exclusion_list' supplies the explicit
            titles; non-string entries are ignored. A value that is not a
            list/tuple/set is ignored with a warning instead of crashing
            (null) or being iterated character by character (string).
        active_special_collections: Iterable of special titles that are
            active right now.

    Returns:
        A set holding the explicit exclusions plus every special collection
        title (from get_all_special_collection_names) that is not active.
    """
    exclusion_raw = config.get('exclusion_list', [])
    if not isinstance(exclusion_raw, (list, tuple, set, frozenset)):
        logging.warning("'exclusion_list' in config is not a list. Ignoring it.")
        exclusion_raw = []
    exclusion_set = {n for n in exclusion_raw if isinstance(n, str)}

    all_special = get_all_special_collection_names(config)
    inactive = all_special - set(active_special_collections)
    if inactive:
        logging.info(f"Excluding inactive special collections by title: {inactive}")

    combined = exclusion_set.union(inactive)
    logging.info(f"Total title exclusions (explicit + inactive special): {combined or 'None'}")
    return combined
||
| 226 | |||
| 227 | # --- NEW HELPER FUNCTION --- |
||
def get_all_special_collection_names(config):
    """Collect every title listed under any 'special_collections' entry.

    Args:
        config: Configuration dict.

    Returns:
        A set of all string names found in the 'collection_names' lists.
        Entries that are not dicts, or whose 'collection_names' is missing
        or not a list, are skipped with a warning. An empty set is returned
        when 'special_collections' itself is not a list.
    """
    special_configs = config.get('special_collections', [])
    if not isinstance(special_configs, list):
        logging.warning("'special_collections' in config is not a list. Cannot identify all special titles.")
        return set()

    all_special_titles = set()
    for special in special_configs:
        names = special.get('collection_names') if isinstance(special, dict) else None
        if not isinstance(names, list):
            logging.warning(f"Skipping invalid entry when getting all special names: {special}")
            continue
        for name in names:
            if isinstance(name, str):
                all_special_titles.add(name)

    if all_special_titles:
        logging.info(f"Identified {len(all_special_titles)} unique titles defined across all special_collections entries.")
    return all_special_titles
||
| 247 | # --- END NEW HELPER FUNCTION --- |
||
| 248 | |||
| 249 | |||
def select_from_categories(categories_config, all_collections, exclusion_set, remaining_slots, regex_patterns):
    """Pick at most one random collection per configured category.

    Args:
        categories_config: Dict mapping category name to a list of collection
            titles. The optional 'always_call' key (default True) is a flag,
            not a category: when it is falsy, each category with candidates
            is only used on a coin flip. Non-dict values mean "no
            categories". This dict is read only and never modified.
        all_collections: Candidate collection objects (read via ``.title``).
        exclusion_set: Set of titles that must not be chosen. Titles selected
            here are added to it, so callers should pass a copy if they need
            the original unchanged.
        remaining_slots: Maximum number of collections to select.
        regex_patterns: Patterns forwarded to is_regex_excluded().

    Returns:
        A tuple ``(collections_to_pin, remaining_slots)`` with the selected
        objects and the number of slots still free.
    """
    collections_to_pin = []
    config_dict = categories_config if isinstance(categories_config, dict) else {}
    # Read the flag without pop()/re-insert so the caller's config keeps its
    # keys and ordering, even if an exception interrupts the selection.
    always_call = config_dict.get('always_call', True)
    processed_titles_in_this_step = set()

    for category, collection_names in config_dict.items():
        if category == 'always_call':
            continue
        if remaining_slots <= 0:
            break
        if not isinstance(collection_names, list):
            continue

        potential_pins = []
        for c in all_collections:
            title = getattr(c, 'title', None)
            if title not in collection_names or title in exclusion_set:
                continue
            if is_regex_excluded(getattr(c, 'title', ''), regex_patterns):
                continue
            if title in processed_titles_in_this_step:
                continue
            potential_pins.append(c)

        if not potential_pins:
            continue
        if always_call or random.choice([True, False]):
            selected = random.choice(potential_pins)
            collections_to_pin.append(selected)
            processed_titles_in_this_step.add(selected.title)
            exclusion_set.add(selected.title)  # block re-selection this cycle
            logging.info(f"Added '{selected.title}' from category '{category}'")
            remaining_slots -= 1

    return collections_to_pin, remaining_slots
||
| 278 | |||
| 279 | |||
def fill_with_random_collections(random_collections_pool, remaining_slots):
    """Choose up to ``remaining_slots`` collections at random from the pool.

    The pool is assumed to be filtered already; it is copied before
    shuffling, so the caller's sequence is left untouched.

    Args:
        random_collections_pool: Iterable of candidate collection objects.
        remaining_slots: Number of free slots. Zero or negative selects
            nothing.

    Returns:
        A list of the randomly selected collections (possibly empty).
    """
    available = list(random_collections_pool)
    if not available:
        logging.info("No items left for random.")
        return []

    random.shuffle(available)
    # Clamp at zero: a negative slice bound would select items, not none.
    num = max(0, min(remaining_slots, len(available)))
    logging.info(f"Selecting up to {num} random collections from {len(available)}.")

    selected = available[:num]
    for c in selected:
        logging.info(f"Added random collection '{getattr(c, 'title', 'Untitled')}'")
    return selected
||
| 293 | |||
| 294 | |||
def filter_collections(config, all_collections, active_special_collections, collection_limit, library_name, selected_collections):
    """Filter a library's collections and choose which ones to pin.

    Eligibility: a collection needs a title, must not be in the explicit /
    inactive-special exclusions, must not match a regex exclusion, and must
    have ``childCount`` of at least 'min_items_for_pinning'. Collections
    that are not active specials are also dropped when they were pinned
    recently.

    Selection priority, up to ``collection_limit``:
        1. active special collections,
        2. one pick per category configured for ``library_name``,
        3. random picks from whatever is still eligible.

    Args:
        config: Configuration dict.
        all_collections: Collection objects of the library.
        active_special_collections: Titles of currently active specials.
        collection_limit: Maximum number of collections to return.
        library_name: Library section name (used for the category lookup and
            for logging).
        selected_collections: Pin history dict; it is pruned in place by
            get_recently_pinned_collections().

    Returns:
        The list of collection objects to pin.
    """
    min_items_threshold = config.get('min_items_for_pinning', 10)
    if not isinstance(min_items_threshold, (int, float)):
        # A non-numeric threshold would make every size comparison fail and
        # silently exclude the whole library.
        logging.warning("Invalid 'min_items_for_pinning', defaulting 10.")
        min_items_threshold = 10
    logging.info(f"Filtering: Min items required = {min_items_threshold}")

    fully_excluded_collections = get_fully_excluded_collections(config, active_special_collections)
    recently_pinned_non_special = get_recently_pinned_collections(selected_collections, config)
    regex_patterns = config.get('regex_exclusion_patterns', [])

    eligible_collections = []
    logging.info(f"Starting with {len(all_collections)} collections in '{library_name}'.")
    for c in all_collections:
        coll_title = getattr(c, 'title', None)
        if not coll_title:
            continue
        # Exclusions that apply to every collection, specials included.
        if coll_title in fully_excluded_collections:
            continue
        if is_regex_excluded(coll_title, regex_patterns):
            continue
        try:
            if c.childCount < min_items_threshold:
                logging.info(f"Excluding '{coll_title}' (low count: {c.childCount})")
                continue
        except Exception as e:
            logging.warning(f"Excluding '{coll_title}' (count error: {e})")
            continue

        # The recency block is skipped for active special collections.
        if coll_title not in active_special_collections and coll_title in recently_pinned_non_special:
            logging.info(f"Excluding '{coll_title}' (recently pinned non-special item).")
            continue

        eligible_collections.append(c)

    logging.info(f"Found {len(eligible_collections)} eligible collections for selection priority.")

    collections_to_pin = []
    pinned_titles = set()
    remaining = collection_limit

    # Step 1: active special collections take the first slots.
    specials = [c for c in eligible_collections if c.title in active_special_collections][:remaining]
    collections_to_pin.extend(specials)
    pinned_titles.update(c.title for c in specials)
    remaining -= len(specials)
    if specials:
        logging.info(f"Added {len(specials)} special: {[c.title for c in specials]}. Left: {remaining}")

    # Step 2: category picks.
    if remaining > 0:
        categories = config.get('categories', {})
        # Tolerate a malformed 'categories' value instead of raising.
        cat_conf = categories.get(library_name, {}) if isinstance(categories, dict) else {}
        eligible_cat = [c for c in eligible_collections if c.title not in pinned_titles]
        # A copy is passed because select_from_categories adds to the set.
        cat_pins, remaining = select_from_categories(
            cat_conf, eligible_cat, pinned_titles.copy(), remaining, regex_patterns
        )
        collections_to_pin.extend(cat_pins)
        pinned_titles.update(c.title for c in cat_pins)
        if cat_pins:
            logging.info(f"Added {len(cat_pins)} from categories. Left: {remaining}")

    # Step 3: fill what is left with random eligible collections.
    if remaining > 0:
        eligible_rand = [c for c in eligible_collections if c.title not in pinned_titles]
        collections_to_pin.extend(fill_with_random_collections(eligible_rand, remaining))

    logging.info(f"Final list for '{library_name}': {[c.title for c in collections_to_pin]}")
    return collections_to_pin
||
| 358 | |||
| 359 | |||
| 360 | # --- Main Function (UPDATED) --- |
||
def main():
    """Run the pin/unpin cycle forever, sleeping between cycles.

    Each cycle reloads the configuration, connects to Plex and, for every
    configured library with a positive pin limit, unpins the current
    promotions, selects new collections and pins them. Titles pinned in the
    cycle that are not defined as special collections are recorded in the
    history file under the cycle's timestamp. The loop ends when the sleep
    is interrupted with Ctrl-C; missing essential config keys exit with
    status 1.
    """
    logging.info("Starting Collexions Script")
    required_keys = ('plex_url', 'plex_token', 'pinning_interval')

    while True:
        run_start = time.time()
        config = load_config()
        if any(key not in config for key in required_keys):
            logging.critical("Config essentials missing. Exit.")
            sys.exit(1)

        pin_interval = config.get('pinning_interval', 60)
        if not isinstance(pin_interval, (int, float)) or pin_interval <= 0:
            pin_interval = 60
        sleep_sec = pin_interval * 60  # interval is expressed in minutes

        plex = connect_to_plex(config)
        if plex:
            exclusion_list = config.get('exclusion_list', [])
            if not isinstance(exclusion_list, list):
                exclusion_list = []
            library_names = config.get('library_names', [])
            if not isinstance(library_names, list):
                library_names = []
            collections_per_library_config = config.get('number_of_collections_to_pin', {})
            if not isinstance(collections_per_library_config, dict):
                collections_per_library_config = {}

            selected_collections = load_selected_collections()
            current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            newly_pinned_titles_this_run = []

            # Every title defined as special, fetched once per cycle.
            all_special_titles = get_all_special_collection_names(config)

            for library_name in library_names:
                library_process_start = time.time()
                if not isinstance(library_name, str):
                    logging.warning(f"Skipping invalid library name: {library_name}")
                    continue

                pin_limit = collections_per_library_config.get(library_name, 0)
                if not isinstance(pin_limit, int) or pin_limit < 0:
                    pin_limit = 0
                if pin_limit == 0:
                    logging.info(f"Skip '{library_name}': 0 limit.")
                    continue

                logging.info(f"Processing '{library_name}' (Limit: {pin_limit})")
                unpin_collections(plex, [library_name], exclusion_list)
                active_specials = get_active_special_collections(config)
                all_colls = get_collections_from_all_libraries(plex, [library_name])
                if not all_colls:
                    logging.info(f"No collections in '{library_name}'.")
                    continue

                colls_to_pin = filter_collections(
                    config, all_colls, active_specials, pin_limit, library_name, selected_collections
                )
                if colls_to_pin:
                    pin_collections(colls_to_pin, config)
                    # Track everything pinned in this run, specials included.
                    newly_pinned_titles_this_run.extend(
                        [c.title for c in colls_to_pin if hasattr(c, 'title')]
                    )
                else:
                    logging.info(f"No collections selected for '{library_name}'.")
                logging.info(f"Finished '{library_name}' in {time.time() - library_process_start:.2f}s.")

            # History only records non-special pins, so special collections
            # are never subject to the recency block.
            if not newly_pinned_titles_this_run:
                logging.info("Nothing pinned this cycle, history not updated.")
            else:
                unique_new_pins_all = set(newly_pinned_titles_this_run)
                non_special_pins_for_history = unique_new_pins_all - all_special_titles
                if non_special_pins_for_history:
                    history_entry = sorted(non_special_pins_for_history)
                    selected_collections[current_timestamp] = history_entry
                    save_selected_collections(selected_collections)
                    logging.info(f"Updated history for {current_timestamp} with {len(history_entry)} non-special items.")
                    if len(unique_new_pins_all) > len(non_special_pins_for_history):
                        logging.info(f"Note: {len(unique_new_pins_all) - len(non_special_pins_for_history)} special collection(s) were pinned but not added to recency history.")
                else:
                    logging.info("Only special collections were pinned this cycle. History not updated for recency blocking.")
        else:
            logging.error(f"Plex connection failed. Retrying in {pin_interval} min.")

        run_end = time.time()
        logging.info(f"Cycle finished in {run_end - run_start:.2f} seconds.")
        logging.info(f"Sleeping for {pin_interval} minutes...")
        try:
            time.sleep(sleep_sec)
        except KeyboardInterrupt:
            logging.info("Script interrupted. Exiting.")
            break
||
| 443 | |||
| 444 | # --- Script Entry Point --- |
||
if __name__ == "__main__":
    # Top-level boundary: Ctrl-C ends quietly, anything else is logged with
    # its traceback and reported through a non-zero exit status.
    try:
        main()
    except KeyboardInterrupt:
        logging.info("Script terminated by user.")
    except Exception as e:
        logging.critical(f"UNHANDLED EXCEPTION: {e}", exc_info=True)
        sys.exit(1)
||
| 449 |