from flask import flash, redirect, session, url_for from app.models import JellyfinUser, Playlist,Track from app import sp, cache, app, jellyfin ,jellyfin_admin_token, jellyfin_admin_id,device_id, cache from functools import wraps from celery.result import AsyncResult from app.tasks import download_missing_tracks,check_for_playlist_updates, update_all_playlists_track_status, update_jellyfin_id_for_downloaded_tracks from jellyfin.objects import PlaylistMetadata import re TASK_STATUS = { 'update_all_playlists_track_status': None, 'download_missing_tracks': None, 'check_for_playlist_updates': None, 'update_jellyfin_id_for_downloaded_tracks' : None } LOCK_KEYS = [ 'update_all_playlists_track_status_lock', 'download_missing_tracks_lock', 'check_for_playlist_updates_lock', 'update_jellyfin_id_for_downloaded_tracks_lock' , 'full_update_jellyfin_ids' ] def manage_task(task_name): task_id = TASK_STATUS.get(task_name) if task_id: result = AsyncResult(task_id) if result.state in ['PENDING', 'STARTED']: return result.state, result.info if result.info else {} if task_name == 'update_all_playlists_track_status': result = update_all_playlists_track_status.delay() elif task_name == 'download_missing_tracks': result = download_missing_tracks.delay() elif task_name == 'check_for_playlist_updates': result = check_for_playlist_updates.delay() elif task_name == 'update_jellyfin_id_for_downloaded_tracks': result = update_jellyfin_id_for_downloaded_tracks.delay() TASK_STATUS[task_name] = result.id return result.state, result.info if result.info else {} def prepPlaylistData(data): playlists = [] jellyfin_user = JellyfinUser.query.filter_by(jellyfin_user_id=session['jellyfin_user_id']).first() if not jellyfin_user: app.logger.error(f"jellyfin_user not set: session user id: {session['jellyfin_user_id']}. Logout and Login again") return None if not data.get('playlists'): data['playlists']= {} data['playlists']['items'] = [data] for playlist_data in data['playlists']['items']: # Fetch the playlist from the database if it exists db_playlist = Playlist.query.filter_by(spotify_playlist_id=playlist_data['id']).first() if db_playlist: # If the playlist is in the database, use the stored values if isinstance(playlist_data['tracks'],list): track_count = len(playlist_data['tracks'] ) else: track_count = playlist_data['tracks']['total'] or 0 tracks_available = db_playlist.tracks_available or 0 tracks_linked = len([track for track in db_playlist.tracks if track.jellyfin_id]) or 0 percent_available = (tracks_available / track_count * 100) if track_count > 0 else 0 # Determine playlist status if tracks_available == track_count and track_count > 0: status = 'green' # Fully available elif tracks_available > 0: status = 'yellow' # Partially available else: status = 'red' # Not available else: # If the playlist is not in the database, initialize with 0 track_count = 0 tracks_available = 0 tracks_linked = 0 percent_available = 0 status = 'red' # Not requested yet # Append playlist data to the list playlists.append({ 'name': playlist_data['name'], 'description': playlist_data['description'], 'image': playlist_data['images'][0]['url'] if playlist_data['images'] else 'default-image.jpg', 'url': playlist_data['external_urls']['spotify'], 'id': playlist_data['id'], 'jellyfin_id': db_playlist.jellyfin_id if db_playlist else '', 'can_add': (db_playlist not in jellyfin_user.playlists) if db_playlist else True, 'can_remove' : (db_playlist in jellyfin_user.playlists) if db_playlist else False, 'last_updated':db_playlist.last_updated if db_playlist else '', 'last_changed':db_playlist.last_changed if db_playlist else '', 'tracks_available': tracks_available, 'track_count': track_count, 'tracks_linked': tracks_linked, 'percent_available': percent_available, 'status': status # Red, yellow, or green based on availability }) return playlists def get_cached_spotify_playlists(playlist_ids): """ Fetches multiple Spotify playlists by their IDs, utilizing individual caching. :param playlist_ids: A list of Spotify playlist IDs. :return: A dictionary containing the fetched playlists. """ spotify_data = {'playlists': {'items': []}} for playlist_id in playlist_ids: playlist_data = get_cached_spotify_playlist(playlist_id) if playlist_data: spotify_data['playlists']['items'].append(playlist_data) else: app.logger.warning(f"Playlist data for ID {playlist_id} could not be retrieved.") return spotify_data @cache.memoize(timeout=3600) def get_cached_spotify_playlist(playlist_id): """ Fetches a Spotify playlist by its ID, utilizing caching to minimize API calls. :param playlist_id: The Spotify playlist ID. :return: Playlist data as a dictionary, or None if an error occurs. """ try: playlist_data = sp.playlist(playlist_id) # Fetch data from Spotify API return playlist_data except Exception as e: app.logger.error(f"Error fetching playlist {playlist_id} from Spotify: {str(e)}") return None @cache.memoize(timeout=3600*24*10) def get_cached_spotify_track(track_id): """ Fetches a Spotify track by its ID, utilizing caching to minimize API calls. :param track_id: The Spotify playlist ID. :return: Track data as a dictionary, or None if an error occurs. """ try: track_data = sp.track(track_id=track_id) # Fetch data from Spotify API return track_data except Exception as e: app.logger.error(f"Error fetching track {track_id} from Spotify: {str(e)}") return None def prepAlbumData(data): items = [] for item in data['albums']['items']: items.append({ 'name': item['name'], 'description': f"Released: {item['release_date']}", 'image': item['images'][0]['url'] if item['images'] else 'default-image.jpg', 'url': item['external_urls']['spotify'], 'id' : item['id'], 'can_add' : False }) return items def prepArtistData(data): items = [] for item in data['artists']['items']: items.append({ 'name': item['name'], 'description': f"Popularity: {item['popularity']}", 'image': item['images'][0]['url'] if item['images'] else 'default-image.jpg', 'url': item['external_urls']['spotify'], 'id' : item['id'], 'can_add' : False }) return items def getFeaturedPlaylists(country,offset): playlists_data = sp.featured_playlists(country=country, limit=16, offset=offset) return prepPlaylistData(playlists_data), playlists_data['playlists']['total'],'Featured Playlists' def getCategoryPlaylists(category,offset): playlists_data = sp.category_playlists(category_id=category, limit=16, offset=offset) return prepPlaylistData(playlists_data), playlists_data['playlists']['total'],f"Category {playlists_data['message']}" def getCategories(country,offset): categories_data = sp.categories(limit=16, offset= offset) categories = [] for cat in categories_data['categories']['items']: categories.append({ 'name': cat['name'], 'description': '', 'image': cat['icons'][0]['url'] if cat['icons'] else 'default-image.jpg', 'url': f"/playlists?cat={cat['id']}", 'id' : cat['id'], 'type':'category' }) return categories, categories_data['categories']['total'],'Browse Categories' def get_tracks_for_playlist(data): results = data tracks = [] is_admin = session.get('is_admin', False) for idx, item in enumerate(results['tracks']): track_data = item['track'] if track_data: duration_ms = track_data['duration_ms'] minutes = duration_ms // 60000 seconds = (duration_ms % 60000) // 1000 track_db = Track.query.filter_by(spotify_track_id=track_data['id']).first() if track_db: downloaded = track_db.downloaded filesystem_path = track_db.filesystem_path if is_admin else None jellyfin_id = track_db.jellyfin_id download_status = track_db.download_status else: downloaded = False filesystem_path = None jellyfin_id = None download_status = None tracks.append({ 'title': track_data['name'], 'artist': ', '.join([artist['name'] for artist in track_data['artists']]), 'url': track_data['external_urls']['spotify'], 'duration': f'{minutes}:{seconds:02d}', 'preview_url': track_data['preview_url'], 'downloaded': downloaded, 'filesystem_path': filesystem_path, 'jellyfin_id': jellyfin_id, 'spotify_id': track_data['id'], 'duration_ms': duration_ms, 'download_status' : download_status }) return tracks def get_full_playlist_data(playlist_id): playlist_data = get_cached_spotify_playlist(playlist_id) all_tracks = [] offset = 0 while True: response = sp.playlist_items(playlist_id, offset=offset, limit=100) items = response['items'] all_tracks.extend(items) if len(items) < 100: break offset += 100 playlist_data['tracks'] = all_tracks playlist_data['prepped_data'] = prepPlaylistData(playlist_data) return playlist_data def jellyfin_login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if 'jellyfin_user_name' not in session: flash('You need to log in using your Jellyfin Credentials to access this page.', 'warning') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function def jellyfin_admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not session['is_admin']: flash('You need to be a Jellyfin admin.', 'warning') return 404 # Redirect to your login route return f(*args, **kwargs) return decorated_function def update_playlist_metadata(playlist,spotify_playlist_data): metadata = PlaylistMetadata() metadata.Tags = [f'jellyplist:playlist:{playlist.id}',f'{playlist.tracks_available} of {playlist.track_count} Tracks available'] metadata.Overview = spotify_playlist_data['description'] jellyfin.update_playlist_metadata(session_token=_get_api_token(),playlist_id=playlist.jellyfin_id,updates= metadata , user_id= _get_admin_id()) if spotify_playlist_data['images'] != None: jellyfin.set_playlist_cover_image(session_token= _get_api_token(),playlist_id= playlist.jellyfin_id,spotify_image_url= spotify_playlist_data['images'][0]['url']) def _get_token_from_sessioncookie() -> str: return session['jellyfin_access_token'] def _get_api_token() -> str: #return app.config['JELLYFIN_ACCESS_TOKEN'] return jellyfin_admin_token def _get_logged_in_user(): return JellyfinUser.query.filter_by(jellyfin_user_id=session['jellyfin_user_id']).first() def _get_admin_id(): #return JellyfinUser.query.filter_by(is_admin=True).first().jellyfin_user_id return jellyfin_admin_id def get_longest_substring(input_string): special_chars = ["'", "’", "‘", "‛", "`", "´", "‘"] pattern = "[" + re.escape("".join(special_chars)) + "]" substrings = re.split(pattern, input_string) longest_substring = max(substrings, key=len, default="") return longest_substring