diff --git a/app/__init__.py b/app/__init__.py index 8b769b9..ff74384 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -160,10 +160,6 @@ app.logger.debug(f"Debug logging active") from app.routes import pl_bp, routes, jellyfin_routes app.register_blueprint(pl_bp) -from . import tasks -if "worker" in sys.argv: - tasks.release_lock("download_missing_tracks_lock") - from app import filters # Import the filters dictionary # Register all filters diff --git a/app/functions.py b/app/functions.py index c188087..f82805a 100644 --- a/app/functions.py +++ b/app/functions.py @@ -17,48 +17,6 @@ from spotipy.exceptions import SpotifyException import re -TASK_STATUS = { - 'update_all_playlists_track_status': None, - 'download_missing_tracks': None, - 'check_for_playlist_updates': None, - 'update_jellyfin_id_for_downloaded_tracks' : None, - - -} -if app.config['LIDARR_API_KEY']: - TASK_STATUS['request_lidarr'] = None - -LOCK_KEYS = [ - 'update_all_playlists_track_status_lock', - 'download_missing_tracks_lock', - 'check_for_playlist_updates_lock', - 'update_jellyfin_id_for_downloaded_tracks_lock' , - 'full_update_jellyfin_ids', - 'request_lidarr_lock' -] - -def manage_task(task_name): - task_id = TASK_STATUS.get(task_name) - - if task_id: - result = AsyncResult(task_id) - if result.state in ['PENDING', 'STARTED']: - return result.state, result.info if result.info else {} - if task_name == 'update_all_playlists_track_status': - result = tasks.update_all_playlists_track_status.delay() - elif task_name == 'download_missing_tracks': - result = tasks.download_missing_tracks.delay() - elif task_name == 'check_for_playlist_updates': - result = tasks.check_for_playlist_updates.delay() - elif task_name == 'update_jellyfin_id_for_downloaded_tracks': - result = tasks.update_jellyfin_id_for_downloaded_tracks.delay() - elif task_name == 'request_lidarr': - result = tasks.request_lidarr.delay() - - TASK_STATUS[task_name] = result.id - return result.state, result.info if result.info else {} - - def prepPlaylistData(playlist: base.Playlist) -> Optional[CombinedPlaylistData]: jellyfin_user = JellyfinUser.query.filter_by(jellyfin_user_id=session['jellyfin_user_id']).first() if not jellyfin_user: diff --git a/app/providers/spotify.py b/app/providers/spotify.py index 07c8da9..82d4e80 100644 --- a/app/providers/spotify.py +++ b/app/providers/spotify.py @@ -127,6 +127,13 @@ class SpotifyClient(MusicProviderClient): } l.debug(f"starting request: {self.base_url}/{endpoint}") response = requests.get(f"{self.base_url}/{endpoint}", headers=headers, params=params, cookies=self.cookies) + # if the response is unauthorized, we need to reauthenticate + if response.status_code == 401: + l.debug("reauthenticating") + self.authenticate() + headers['authorization'] = f'Bearer {self.session_data.get("accessToken", "")}' + headers['client-token'] = self.client_token.get('token','') + response = requests.get(f"{self.base_url}/{endpoint}", headers=headers, params=params, cookies=self.cookies) response.raise_for_status() return response.json() diff --git a/app/routes/routes.py b/app/routes/routes.py index ef9da3d..c45252c 100644 --- a/app/routes/routes.py +++ b/app/routes/routes.py @@ -68,14 +68,11 @@ def save_lidarr_config(): @functions.jellyfin_admin_required def task_manager(): statuses = {} - for task_name, task_id in functions.TASK_STATUS.items(): - if task_id: - result = AsyncResult(task_id) - statuses[task_name] = {'state': result.state, 'info': result.info if result.info else {}} - else: - statuses[task_name] = {'state': 'NOT STARTED', 'info': {}} + for task_name, task_id in tasks.task_manager.tasks.items(): + statuses[task_name] = tasks.task_manager.get_task_status(task_name) + - return render_template('admin/tasks.html', tasks=statuses,lock_keys = functions.LOCK_KEYS) + return render_template('admin/tasks.html', tasks=statuses) @app.route('/admin') @app.route('/admin/link_issues') @@ -115,7 +112,7 @@ def link_issues(): @app.route('/run_task/', methods=['POST']) @functions.jellyfin_admin_required def run_task(task_name): - status, info = functions.manage_task(task_name) + status, info = tasks.task_manager.start_task(task_name) # Rendere nur die aktualisierte Zeile der Task task_info = {task_name: {'state': status, 'info': info}} @@ -126,12 +123,9 @@ def run_task(task_name): @functions.jellyfin_admin_required def task_status(): statuses = {} - for task_name, task_id in functions.TASK_STATUS.items(): - if task_id: - result = AsyncResult(task_id) - statuses[task_name] = {'state': result.state, 'info': result.info if result.info else {}} - else: - statuses[task_name] = {'state': 'NOT STARTED', 'info': {}} + for task_name, task_id in tasks.task_manager.tasks.items(): + statuses[task_name] = tasks.task_manager.get_task_status(task_name) + # Render the HTML partial template instead of returning JSON return render_template('partials/_task_status.html', tasks=statuses) @@ -396,5 +390,6 @@ def unlock_key(): @pl_bp.route('/test') def test(): + tasks.update_all_playlists_track_status() return '' \ No newline at end of file diff --git a/app/tasks.py b/app/tasks.py index 22e26f7..4baddb3 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -11,19 +11,13 @@ from app.models import JellyfinUser,Playlist,Track, user_playlists, playlist_tra import os import redis from celery import current_task,signals +from celery.result import AsyncResult from app.providers import base from app.registry.music_provider_registry import MusicProviderRegistry from lidarr.classes import Artist -def acquire_lock(lock_name, expiration=60): - return redis_client.set(lock_name, "locked", ex=expiration, nx=True) -def release_lock(lock_name): - redis_client.delete(lock_name) -def prepare_logger(): - FORMAT = "[%(asctime)s][%(filename)18s:%(lineno)4s - %(funcName)20s() ] %(message)s" - logging.basicConfig(format=FORMAT) @signals.celeryd_init.connect def setup_log_format(sender, conf, **kwargs): @@ -36,7 +30,7 @@ def setup_log_format(sender, conf, **kwargs): def update_all_playlists_track_status(self): lock_key = "update_all_playlists_track_status_lock" - if acquire_lock(lock_key, expiration=600): + if task_manager.acquire_lock(lock_key, expiration=600): try: with app.app_context(): playlists = Playlist.query.all() @@ -51,19 +45,26 @@ def update_all_playlists_track_status(self): for playlist in playlists: total_tracks = 0 available_tracks = 0 - app.logger.debug(f"Current Playlist: {playlist.name} [{playlist.id}:{playlist.provider_playlist_id}]" ) + app.logger.info(f"Current Playlist: {playlist.name} [{playlist.id}:{playlist.provider_playlist_id}]" ) for track in playlist.tracks: total_tracks += 1 + app.logger.debug(f"Processing track: {track.name} [{track.provider_track_id}]") + app.logger.debug(f"\tPath = {track.filesystem_path}") + if track.filesystem_path: + app.logger.debug(f"\tPath exists = {os.path.exists(track.filesystem_path)}") + app.logger.debug(f"\tJellyfinID = {track.jellyfin_id}") if track.filesystem_path and os.path.exists(track.filesystem_path): - app.logger.debug(f"Track {track.name} is already downloaded at {track.filesystem_path}.") + app.logger.info(f"Track {track.name} is already downloaded at {track.filesystem_path}.") available_tracks += 1 track.downloaded = True db.session.commit() #If not found in filesystem, but a jellyfin_id is set, query the jellyfin server for the track and populate the filesystem_path from the response with the path elif track.jellyfin_id: jellyfin_track = jellyfin.get_item(jellyfin_admin_token, track.jellyfin_id) + app.logger.debug(f"\tJellyfin Path: {jellyfin_track['Path']}") + app.logger.debug(f"\tJellyfin Path exists: {os.path.exists(jellyfin_track['Path'])}") if jellyfin_track and os.path.exists(jellyfin_track['Path']): - app.logger.debug(f"Track {track.name} found in Jellyfin at {jellyfin_track['Path']}.") + app.logger.info(f"Track {track.name} found in Jellyfin at {jellyfin_track['Path']}.") track.filesystem_path = jellyfin_track['Path'] track.downloaded = True db.session.commit() @@ -94,7 +95,7 @@ def update_all_playlists_track_status(self): app.logger.info("All playlists' track statuses updated.") return {'status': 'All playlists updated', 'total': total_playlists, 'processed': processed_playlists} finally: - release_lock(lock_key) + task_manager.release_lock(lock_key) else: app.logger.info("Skipping task. Another instance is already running.") return {'status': 'Task skipped, another instance is running'} @@ -104,7 +105,7 @@ def update_all_playlists_track_status(self): def download_missing_tracks(self): lock_key = "download_missing_tracks_lock" - if acquire_lock(lock_key, expiration=1800): + if task_manager.acquire_lock(lock_key, expiration=1800): try: app.logger.info("Starting track download job...") @@ -243,7 +244,7 @@ def download_missing_tracks(self): 'failed': failed_downloads } finally: - release_lock(lock_key) + task_manager.release_lock(lock_key) if app.config['REFRESH_LIBRARIES_AFTER_DOWNLOAD_TASK']: libraries = jellyfin.get_libraries(jellyfin_admin_token) for lib in libraries: @@ -257,7 +258,7 @@ def download_missing_tracks(self): def check_for_playlist_updates(self): lock_key = "check_for_playlist_updates_lock" - if acquire_lock(lock_key, expiration=600): + if task_manager.acquire_lock(lock_key, expiration=600): try: app.logger.info('Starting playlist update check...') with app.app_context(): @@ -355,7 +356,7 @@ def check_for_playlist_updates(self): return {'status': 'Playlist update check completed', 'total': total_playlists, 'processed': processed_playlists} finally: - release_lock(lock_key) + task_manager.release_lock(lock_key) else: app.logger.info("Skipping task. Another instance is already running.") return {'status': 'Task skipped, another instance is running'} @@ -363,15 +364,15 @@ def check_for_playlist_updates(self): @celery.task(bind=True) def update_jellyfin_id_for_downloaded_tracks(self): lock_key = "update_jellyfin_id_for_downloaded_tracks_lock" - full_update_key = 'full_update_jellyfin_ids' - if acquire_lock(lock_key, expiration=600): # Lock for 10 minutes + full_update_key = 'full_update_jellyfin_ids_lock' + if task_manager.acquire_lock(lock_key, expiration=600): # Lock for 10 minutes try: app.logger.info("Starting Jellyfin ID update for tracks...") with app.app_context(): downloaded_tracks = Track.query.filter_by(downloaded=True, jellyfin_id=None).all() - if acquire_lock(full_update_key, expiration=60*60*24): + if task_manager.acquire_lock(full_update_key, expiration=60*60*24): app.logger.info(f"performing full update on jellyfin track ids. (Update tracks and playlists if better quality will be found)") downloaded_tracks = Track.query.all() else: @@ -415,7 +416,7 @@ def update_jellyfin_id_for_downloaded_tracks(self): return {'status': 'All tracks updated', 'total': total_tracks, 'processed': processed_tracks} finally: - release_lock(lock_key) + task_manager.release_lock(lock_key) else: app.logger.info("Skipping task. Another instance is already running.") return {'status': 'Task skipped, another instance is running'} @@ -424,7 +425,7 @@ def update_jellyfin_id_for_downloaded_tracks(self): def request_lidarr(self): lock_key = "request_lidarr_lock" - if acquire_lock(lock_key, expiration=600): + if task_manager.acquire_lock(lock_key, expiration=600): with app.app_context(): if app.config['LIDARR_API_KEY'] and app.config['LIDARR_URL']: from app import lidarr_client @@ -497,11 +498,11 @@ def request_lidarr(self): app.logger.info(f'Requests sent to Lidarr. Total items: {total_items}') return {'status': 'Request sent to Lidarr'} finally: - release_lock(lock_key) + task_manager.release_lock(lock_key) else: app.logger.info('Lidarr API key or URL not set. Skipping request.') - release_lock(lock_key) + task_manager.release_lock(lock_key) else: @@ -594,3 +595,48 @@ def compute_quality_score(result, use_ffprobe=False) -> float: app.logger.warning(f"No valid file path for track {result.get('Name')} - Skipping ffprobe analysis.") return score + + + +class TaskManager: + def __init__(self): + self.tasks = { + 'update_all_playlists_track_status': None, + 'download_missing_tracks': None, + 'check_for_playlist_updates': None, + 'update_jellyfin_id_for_downloaded_tracks': None + } + if app.config['LIDARR_API_KEY']: + self.tasks['request_lidarr'] = None + + def start_task(self, task_name, *args, **kwargs): + if task_name not in self.tasks: + raise ValueError(f"Task {task_name} is not defined.") + task = globals()[task_name].delay(*args, **kwargs) + self.tasks[task_name] = task.id + return task.id + + def get_task_status(self, task_name): + if task_name not in self.tasks: + raise ValueError(f"Task {task_name} is not defined.") + task_id = self.tasks[task_name] + if not task_id: + return {'state': 'NOT STARTED', 'info': {}, 'lock_status': False} + result = AsyncResult(task_id) + lock_status = True if self.get_lock(f"{task_name}_lock") else False + return {'state': result.state, 'info': result.info if result.info else {}, 'lock_status': lock_status} + + def acquire_lock(self, lock_name, expiration=60): + return redis_client.set(lock_name, "locked", ex=expiration, nx=True) + + def release_lock(self, lock_name): + redis_client.delete(lock_name) + + def get_lock(self, lock_name): + return redis_client.get(lock_name) + + def prepare_logger(self): + FORMAT = "[%(asctime)s][%(filename)18s:%(lineno)4s - %(funcName)20s() ] %(message)s" + logging.basicConfig(format=FORMAT) + +task_manager = TaskManager() \ No newline at end of file diff --git a/jellyfin/client.py b/jellyfin/client.py index ae4ab0a..2eb84d7 100644 --- a/jellyfin/client.py +++ b/jellyfin/client.py @@ -310,6 +310,7 @@ class JellyfinClient: response = requests.delete(url, headers=self._get_headers(session_token=session_token), timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") + logging.getLogger('requests').setLevel(logging.WARNING) if response.status_code == 204: # 204 No Content indicates successful deletion return {"status": "success", "message": "Playlist removed successfully"} @@ -318,11 +319,8 @@ class JellyfinClient: def get_item(self, session_token: str, item_id: str): url = f'{self.base_url}/Items/{item_id}' - self.logger.debug(f"Url={url}") - + logging.getLogger('requests').setLevel(logging.WARNING) response = requests.get(url, headers=self._get_headers(session_token=session_token), timeout = self.timeout) - self.logger.debug(f"Response = {response.status_code}") - if response.status_code == 200: return response.json() else: diff --git a/templates/admin/tasks.html b/templates/admin/tasks.html index d0296be..ee75fdb 100644 --- a/templates/admin/tasks.html +++ b/templates/admin/tasks.html @@ -4,6 +4,7 @@ + diff --git a/templates/partials/_task_status.html b/templates/partials/_task_status.html index 53a02ad..da36087 100644 --- a/templates/partials/_task_status.html +++ b/templates/partials/_task_status.html @@ -1,5 +1,12 @@ {% for task_name, task in tasks.items() %} +
Locked Task Name Status Progress
+ {% if task.lock_status %} + + {% else %} + + {% endif %} + {{ task_name }} {{ task.state }}