From b010c8950ee545cb3f3a6aeefd89579402c79ed0 Mon Sep 17 00:00:00 2001 From: Kamil Date: Mon, 25 Nov 2024 17:39:22 +0000 Subject: [PATCH] tasks rework: - more verbose logging - better handling of already downloaded files - perform upgrades if file with better quality is found in jellyfin --- app/tasks.py | 332 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 228 insertions(+), 104 deletions(-) diff --git a/app/tasks.py b/app/tasks.py index 906f2ad..f8c4bb5 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -1,4 +1,5 @@ from datetime import datetime,timezone +import logging import subprocess from sqlalchemy import insert @@ -7,7 +8,7 @@ from app import celery, app, db, functions, sp, jellyfin, jellyfin_admin_token, from app.models import JellyfinUser,Playlist,Track, user_playlists, playlist_tracks import os import redis -from celery import current_task +from celery import current_task,signals import asyncio import requests @@ -17,6 +18,16 @@ def acquire_lock(lock_name, expiration=60): def release_lock(lock_name): redis_client.delete(lock_name) +def prepare_logger(): + FORMAT = "[%(asctime)s][%(filename)18s:%(lineno)4s - %(funcName)20s() ] %(message)s" + logging.basicConfig(format=FORMAT) + +@signals.celeryd_init.connect +def setup_log_format(sender, conf, **kwargs): + FORMAT = "[%(asctime)s][%(filename)18s:%(lineno)4s - %(funcName)23s() ] %(levelname)7s - %(message)s" + + conf.worker_log_format = FORMAT.strip().format(sender) + conf.worker_task_log_format = FORMAT.format(sender) @celery.task(bind=True) def update_all_playlists_track_status(self): @@ -37,7 +48,7 @@ def update_all_playlists_track_status(self): for playlist in playlists: total_tracks = 0 available_tracks = 0 - + app.logger.debug(f"Current Playlist: {playlist.name} [{playlist.id}:{playlist.spotify_playlist_id}]" ) for track in playlist.tracks: total_tracks += 1 if track.filesystem_path and os.path.exists(track.filesystem_path): @@ -92,23 +103,28 @@ def download_missing_tracks(self): processed_tracks = 0 failed_downloads = 0 for track in undownloaded_tracks: - app.logger.info(f"Processing track: {track.name} ({track.spotify_track_id})") + app.logger.info(f"Processing track: {track.name} [{track.spotify_track_id}]") # Check if the track already exists in the output directory file_path = f"{output_dir.replace('{track-id}', track.spotify_track_id)}.mp3" - - if os.path.exists(file_path): - app.logger.info(f"Track {track.name} is already downloaded at {file_path}. Marking as downloaded.") - track.downloaded = True - track.filesystem_path = file_path - db.session.commit() - continue - - # If search_before_download is enabled, perform matching + # region search before download if search_before_download: app.logger.info(f"Searching for track in Jellyfin: {track.name}") - # Retrieve the Spotify track and preview URL spotify_track = functions.get_cached_spotify_track(track.spotify_track_id) + # at first try to find the track without fingerprinting it + best_match = find_best_match_from_jellyfin(track) + if best_match: + track.downloaded = True + if track.jellyfin_id != best_match['Id']: + track.jellyfin_id = best_match['Id'] + app.logger.info(f"Updated Jellyfin ID for track: {track.name} ({track.spotify_track_id})") + if track.filesystem_path != best_match['Path']: + track.filesystem_path = best_match['Path'] + + db.session.commit() + continue + + # region search with fingerprinting preview_url = spotify_track.get('preview_url') if not preview_url: app.logger.error(f"Preview URL not found for track {track.name}.") @@ -133,6 +149,18 @@ def download_missing_tracks(self): continue else: app.logger.info(f"No match found in Jellyfin for track {track.name}. Proceeding to download.") + #endregion + + #endregion + + if os.path.exists(file_path): + app.logger.info(f"Track {track.name} is already downloaded at {file_path}. Marking as downloaded.") + track.downloaded = True + track.filesystem_path = file_path + db.session.commit() + continue + + # Attempt to download the track using spotdl try: @@ -157,6 +185,10 @@ def download_missing_tracks(self): app.logger.info(f"Track {track.name} downloaded successfully to {file_path}.") else: app.logger.error(f"Download failed for track {track.name}.") + if result.stdout: + app.logger.error(f"\t stdout: {result.stdout}") + if result.stderr: + app.logger.error(f"\t stderr: {result.stderr} ") failed_downloads += 1 track.download_status = result.stdout[:2048] except Exception as e: @@ -213,72 +245,73 @@ def check_for_playlist_updates(self): for playlist in playlists: playlist.last_updated = datetime.now( timezone.utc) sp_playlist = sp.playlist(playlist.spotify_playlist_id) - + full_update = True app.logger.info(f'Checking updates for playlist: {playlist.name}, s_snapshot = {sp_playlist['snapshot_id']}') db.session.commit() if sp_playlist['snapshot_id'] == playlist.snapshot_id: app.logger.info(f'playlist: {playlist.name} , no changes detected, snapshot_id {sp_playlist['snapshot_id']}') - continue + full_update = False try: #region Check for updates # Fetch all playlist data from Spotify - spotify_tracks = {} - offset = 0 - playlist.snapshot_id = sp_playlist['snapshot_id'] - while True: - playlist_data = sp.playlist_items(playlist.spotify_playlist_id, offset=offset, limit=100) - items = playlist_data['items'] - spotify_tracks.update({offset + idx: track['track'] for idx, track in enumerate(items) if track['track']}) + if full_update: + spotify_tracks = {} + offset = 0 + playlist.snapshot_id = sp_playlist['snapshot_id'] + while True: + playlist_data = sp.playlist_items(playlist.spotify_playlist_id, offset=offset, limit=100) + items = playlist_data['items'] + spotify_tracks.update({offset + idx: track['track'] for idx, track in enumerate(items) if track['track']}) + + if len(items) < 100: # No more tracks to fetch + break + offset += 100 # Move to the next batch - if len(items) < 100: # No more tracks to fetch - break - offset += 100 # Move to the next batch - - existing_tracks = {track.spotify_track_id: track for track in playlist.tracks} + existing_tracks = {track.spotify_track_id: track for track in playlist.tracks} - # Determine tracks to add and remove - tracks_to_add = [] - for idx, track_info in spotify_tracks.items(): - if track_info: - track_id = track_info['id'] - if track_id not in existing_tracks: - track = Track.query.filter_by(spotify_track_id=track_id).first() - if not track: - track = Track(name=track_info['name'], spotify_track_id=track_id, spotify_uri=track_info['uri'], downloaded=False) - db.session.add(track) - db.session.commit() - app.logger.info(f'Added new track: {track.name}') - tracks_to_add.append((track, idx)) + # Determine tracks to add and remove + tracks_to_add = [] + for idx, track_info in spotify_tracks.items(): + if track_info: + track_id = track_info['id'] + if track_id not in existing_tracks: + track = Track.query.filter_by(spotify_track_id=track_id).first() + if not track: + track = Track(name=track_info['name'], spotify_track_id=track_id, spotify_uri=track_info['uri'], downloaded=False) + db.session.add(track) + db.session.commit() + app.logger.info(f'Added new track: {track.name}') + tracks_to_add.append((track, idx)) - tracks_to_remove = [ - existing_tracks[track_id] - for track_id in existing_tracks - if track_id not in {track['id'] for track in spotify_tracks.values() if track} - ] + tracks_to_remove = [ + existing_tracks[track_id] + for track_id in existing_tracks + if track_id not in {track['id'] for track in spotify_tracks.values() if track} + ] - if tracks_to_add or tracks_to_remove: - playlist.last_changed = datetime.now( timezone.utc) + if tracks_to_add or tracks_to_remove: + playlist.last_changed = datetime.now( timezone.utc) - # Add and remove tracks while maintaining order - - if tracks_to_add: + # Add and remove tracks while maintaining order - for track, track_order in tracks_to_add: - stmt = insert(playlist_tracks).values( - playlist_id=playlist.id, - track_id=track.id, - track_order=track_order - ) - db.session.execute(stmt) - db.session.commit() - app.logger.info(f'Added {len(tracks_to_add)} tracks to playlist: {playlist.name}') + if tracks_to_add: + + for track, track_order in tracks_to_add: + stmt = insert(playlist_tracks).values( + playlist_id=playlist.id, + track_id=track.id, + track_order=track_order + ) + db.session.execute(stmt) + db.session.commit() + app.logger.info(f'Added {len(tracks_to_add)} tracks to playlist: {playlist.name}') - if tracks_to_remove: - for track in tracks_to_remove: - playlist.tracks.remove(track) - db.session.commit() - app.logger.info(f'Removed {len(tracks_to_remove)} tracks from playlist: {playlist.name}') - #endregion + if tracks_to_remove: + for track in tracks_to_remove: + playlist.tracks.remove(track) + db.session.commit() + app.logger.info(f'Removed {len(tracks_to_remove)} tracks from playlist: {playlist.name}') + #endregion #region Update Playlist Items and Metadata functions.update_playlist_metadata(playlist, sp_playlist) @@ -314,62 +347,41 @@ def check_for_playlist_updates(self): @celery.task(bind=True) def update_jellyfin_id_for_downloaded_tracks(self): lock_key = "update_jellyfin_id_for_downloaded_tracks_lock" - + full_update_key = 'full_update_jellyfin_ids' if acquire_lock(lock_key, expiration=600): # Lock for 10 minutes try: - app.logger.info("Starting Jellyfin ID update for downloaded tracks...") + app.logger.info("Starting Jellyfin ID update for tracks...") with app.app_context(): downloaded_tracks = Track.query.filter_by(downloaded=True, jellyfin_id=None).all() + + if acquire_lock(full_update_key, expiration=60*60*24): + app.logger.info(f"performing full update on jellyfin track ids. (Update tracks and playlists if better quality will be found)") + downloaded_tracks = Track.query.all() + else: + app.logger.debug(f"doing update on tracks with downloaded = True and jellyfin_id = None") total_tracks = len(downloaded_tracks) if not downloaded_tracks: app.logger.info("No downloaded tracks without Jellyfin ID found.") return {'status': 'No tracks to update'} - app.logger.info(f"Found {total_tracks} tracks to update with Jellyfin IDs.") + app.logger.info(f"Found {total_tracks} tracks to update ") processed_tracks = 0 for track in downloaded_tracks: - app.logger.info(f"Fetching track details from Spotify: {track.name} ({track.spotify_track_id})") - search_results = jellyfin.search_music_tracks(jellyfin_admin_token,track.name) - spotify_track = None - try: - best_match = None - for result in search_results: - # if there is only one result , assume it´s the right track. - if len(search_results) == 1: - best_match = result - break - # Ensure the result is structured as expected - jellyfin_track_name = result.get('Name', '').lower() - jellyfin_artists = [artist.lower() for artist in result.get('Artists', [])] - jellyfin_path = result.get('Path','') - if jellyfin_path == track.filesystem_path: - best_match = result - break - elif not spotify_track: - try: - spotify_track = functions.get_cached_spotify_track(track.spotify_track_id) - spotify_track_name = spotify_track['name'] - spotify_artists = [artist['name'] for artist in spotify_track['artists']] - spotify_album = spotify_track['album']['name'] - except Exception as e: - app.logger.error(f"Error fetching track details from Spotify for {track.name}: {str(e)}") - continue - # Compare name, artists, and album (case-insensitive comparison) - if (spotify_track_name.lower() == jellyfin_track_name and - set(artist.lower() for artist in spotify_artists) == set(jellyfin_artists) ): - best_match = result - break # Stop when a match is found - - # Step 4: If a match is found, update jellyfin_id + best_match = find_best_match_from_jellyfin(track) if best_match: - track.jellyfin_id = best_match['Id'] + track.downloaded = True + if track.jellyfin_id != best_match['Id']: + track.jellyfin_id = best_match['Id'] + app.logger.info(f"Updated Jellyfin ID for track: {track.name} ({track.spotify_track_id})") + if track.filesystem_path != best_match['Path']: + track.filesystem_path = best_match['Path'] + db.session.commit() - app.logger.info(f"Updated Jellyfin ID for track: {track.name} ({track.spotify_track_id})") else: - app.logger.info(f"No matching track found in Jellyfin for {track.name}.") + app.logger.warning(f"No matching track found in Jellyfin for {track.name}.") spotify_track = None @@ -388,3 +400,115 @@ def update_jellyfin_id_for_downloaded_tracks(self): else: app.logger.info("Skipping task. Another instance is already running.") return {'status': 'Task skipped, another instance is running'} + +def find_best_match_from_jellyfin(track: Track): + app.logger.debug(f"Trying to find best match from Jellyfin server for track: {track.name}") + search_results = jellyfin.search_music_tracks(jellyfin_admin_token, functions.get_longest_substring(track.name)) + spotify_track = None + try: + best_match = None + best_quality_score = -1 # Initialize with the lowest possible score + + + for result in search_results: + if len(search_results) == 1: + app.logger.debug(f"Only 1 search_result, assuming best match: {result['Id']} ({app.config['JELLYFIN_SERVER_URL']}/web/#/details?id={result['Id']})") + best_match = result + break + + jellyfin_path = result.get('Path', '') + # if jellyfin_path == track.filesystem_path: + # app.logger.debug(f"Best match found through equal file-system paths: {result['Id']} ({app.config['JELLYFIN_SERVER_URL']}/web/#/details?id={result['Id']})") + # best_match = result + # break + + if not spotify_track: + try: + spotify_track = functions.get_cached_spotify_track(track.spotify_track_id) + spotify_track_name = spotify_track['name'] + spotify_artists = [artist['name'] for artist in spotify_track['artists']] + except Exception as e: + app.logger.error(f"Error fetching track details from Spotify for {track.name}: {str(e)}") + continue + + jellyfin_track_name = result.get('Name', '').lower() + jellyfin_artists = [artist.lower() for artist in result.get('Artists', [])] + if (spotify_track_name.lower() == jellyfin_track_name and + set(artist.lower() for artist in spotify_artists) == set(jellyfin_artists)): + quality_score = compute_quality_score(result, app.config['FIND_BEST_MATCH_USE_FFPROBE']) + app.logger.debug(f"Quality score for track {result['Name']}: {quality_score} [{result['Path']}]") + + if quality_score > best_quality_score: + best_match = result + best_quality_score = quality_score + + return best_match + except Exception as e: + app.logger.error(f"Error searching Jellyfin for track {track.name}: {str(e)}") + return None + +def compute_quality_score(result, use_ffprobe=False): + """ + Compute a quality score for a track based on its metadata or detailed analysis using ffprobe. + """ + score = 0 + container = result.get('Container', '').lower() + if container == 'flac': + score += 100 + elif container == 'wav': + score += 50 + elif container == 'mp3': + score += 10 + elif container == 'aac': + score += 5 + + if result.get('HasLyrics'): + score += 10 + + runtime_ticks = result.get('RunTimeTicks', 0) + score += runtime_ticks / 1e6 + + if use_ffprobe: + path = result.get('Path') + if path: + ffprobe_score = analyze_audio_quality_with_ffprobe(path) + score += ffprobe_score + else: + app.logger.warning(f"No valid file path for track {result.get('Name')} - Skipping ffprobe analysis.") + + return score + +def analyze_audio_quality_with_ffprobe(filepath): + """ + Use ffprobe to extract quality attributes from an audio file and compute a score. + """ + try: + # ffprobe command to extract bitrate, sample rate, and channel count + cmd = [ + 'ffprobe', '-v', 'error', '-select_streams', 'a:0', + '-show_entries', 'stream=bit_rate,sample_rate,channels', + '-show_format', + '-of', 'json', filepath + ] + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if result.returncode != 0: + app.logger.error(f"ffprobe error for file {filepath}: {result.stderr}") + return 0 + + # Parse ffprobe output + import json + data = json.loads(result.stdout) + stream = data.get('streams', [{}])[0] + bitrate = int(stream.get('bit_rate', 0)) // 1000 # Convert to kbps + if bitrate == 0: + bitrate = int(data.get('format')['bit_rate']) // 1000 + sample_rate = int(stream.get('sample_rate', 0)) # Hz + + channels = int(stream.get('channels', 0)) + + # Compute score based on extracted quality parameters + score = bitrate + (sample_rate // 1000) + (channels * 10) # Example scoring formula + return score + except Exception as e: + app.logger.error(f"Error analyzing audio quality with ffprobe: {str(e)}") + return 0 \ No newline at end of file