import os import re import subprocess import tempfile from typing import Optional import numpy as np import requests import base64 import acoustid import chromaprint import logging from jellyfin.objects import PlaylistMetadata def _clean_query(query): # Regex to match any word containing problematic characters: ', `, or ´ pattern = re.compile(r"[`´'’]") # Split the query into words and filter out words with problematic characters cleaned_words = [word for word in query.split() if not pattern.search(word)] # Join the cleaned words back into a query string cleaned_query = " ".join(cleaned_words) return cleaned_query class JellyfinClient: def __init__(self, base_url, timeout = 10): """ Initialize the Jellyfin client with the base URL of the server. :param base_url: The base URL of the Jellyfin server (e.g., 'http://localhost:8096') """ self.base_url = base_url self.timeout = timeout self.logger = logging.getLogger(self.__class__.__name__) self.logger.setLevel(os.getenv('LOG_LEVEL', 'INFO').upper()) FORMAT = "[%(asctime)s][%(filename)18s:%(lineno)4s - %(funcName)23s() ] %(levelname)7s - %(message)s" logging.basicConfig(format=FORMAT) self.logger.debug(f"Initialized Jellyfin API Client. Base = '{self.base_url}', timeout = {timeout}") def _get_headers(self, session_token: str): """ Get the authentication headers for requests. :return: A dictionary of headers """ return { 'X-Emby-Token': session_token, } def login_with_password(self, username: str, password: str, device_id = 'JellyPlist'): """ Log in to Jellyfin using a username and password. :param username: The username of the user. :param password: The password of the user. :return: Access token and user ID """ url = f'{self.base_url}/Users/AuthenticateByName' headers = { 'Content-Type': 'application/json', 'X-Emby-Authorization': f'MediaBrowser Client="JellyPlist", Device="Web", DeviceId="{device_id}", Version="1.0"' } data = { 'Username': username, 'Pw': password } self.logger.debug(f"Url={url}") response = requests.post(url, json=data, headers=headers) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 200: result = response.json() return result['AccessToken'], result['User']['Id'], result['User']['Name'],result['User']['Policy']['IsAdministrator'] else: raise Exception(f"Login failed: {response.content}") def create_music_playlist(self, session_token: str, name: str, song_ids, user_id : str): """ Create a new music playlist. :param access_token: The access token of the authenticated user. :param user_id: The user ID. :param name: The name of the playlist. :param song_ids: A list of song IDs to include in the playlist. :return: The newly created playlist object """ url = f'{self.base_url}/Playlists' data = { 'Name': name, 'UserId': user_id, 'MediaType': 'Audio', 'Ids': ','.join(song_ids), # Join song IDs with commas 'IsPublic' : False } self.logger.debug(f"Url={url}") response = requests.post(url, json=data, headers=self._get_headers(session_token=session_token), timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 200: return response.json() else: raise Exception(f"Failed to create playlist: {response.content}") def update_music_playlist(self, session_token: str, playlist_id: str, song_ids): """ Update an existing music playlist by adding or removing songs. :param playlist_id: The ID of the playlist to update. :param song_ids: A list of song IDs to include in the playlist. :return: The updated playlist object """ url = f'{self.base_url}/Playlists/{playlist_id}/Items' data = { 'Ids': ','.join(song_ids) # Join song IDs with commas } self.logger.debug(f"Url={url}") response = requests.post(url, json=data, headers=self._get_headers(session_token=session_token), timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 204: # 204 No Content indicates success for updating return {"status": "success", "message": "Playlist updated successfully"} else: raise Exception(f"Failed to update playlist: {response.content}") def get_playlist_metadata(self, session_token: str, user_id: str,playlist_id: str) -> PlaylistMetadata: url = f'{self.base_url}/Items/{playlist_id}' params = { 'UserId' : user_id } self.logger.debug(f"Url={url}") response = requests.get(url, headers=self._get_headers(session_token=session_token), timeout = self.timeout, params = params) self.logger.debug(f"Response = {response.status_code}") if response.status_code != 200: raise Exception(f"Failed to fetch playlist metadata: {response.content}") return PlaylistMetadata( response.json()) def update_playlist_metadata(self, session_token: str, user_id: str, playlist_id: str, updates: PlaylistMetadata): """ Update the metadata of an existing playlist using a PlaylistMetadata object. :param session: The user's session containing the Jellyfin access token. :param user_id: the user id, since we are updating the playlist using an api key, we do it with the user id of the first logged in admin :param playlist_id: The ID of the playlist to update. :param updates: A PlaylistMetadata object containing the metadata to update. :return: Response data indicating the result of the update operation. """ # Fetch the existing metadata for the playlist params = { 'UserId' : user_id } # Initialize PlaylistMetadata with current data and apply updates metadata_obj = self.get_playlist_metadata(session_token=session_token, user_id= user_id, playlist_id= playlist_id) # Update only the provided fields in the updates object for key, value in updates.to_dict().items(): if value is not None: setattr(metadata_obj, key, value) # Send the updated metadata to Jellyfin url = f'{self.base_url}/Items/{playlist_id}' self.logger.debug(f"Url={url}") response = requests.post(url, json=metadata_obj.to_dict(), headers=self._get_headers(session_token= session_token), timeout = self.timeout, params = params) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 204: return {"status": "success", "message": "Playlist metadata updated successfully"} else: raise Exception(f"Failed to update playlist metadata: {response.content} \nReason: {response.reason}") def get_playlists(self, session_token: str): """ Get all music playlists for the currently authenticated user. :return: A list of the user's music playlists """ url = f'{self.base_url}/Items' params = { 'IncludeItemTypes': 'Playlist', # Retrieve only playlists 'Recursive': 'true', # Include nested playlists 'Fields': 'OpenAccess' # Fields we want } self.logger.debug(f"Url={url}") response = requests.get(url, headers=self._get_headers(session_token=session_token), params=params , timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 200: return response.json()['Items'] else: raise Exception(f"Failed to get playlists: {response.content}") def get_libraries(self, session_token: str): url = f'{self.base_url}/Library/VirtualFolders' params = { } self.logger.debug(f"Url={url}") response = requests.get(url, headers=self._get_headers(session_token=session_token), params=params , timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 200: return response.json() else: raise Exception(f"Failed to get playlists: {response.content}") def refresh_library(self, session_token: str, library_id: str) -> bool: url = f'{self.base_url}/Items/{library_id}/Refresh' params = { "Recursive": "true", "ImageRefreshMode": "Default", "MetadataRefreshMode": "Default", "ReplaceAllImages": "false", "RegenerateTrickplay": "false", "ReplaceAllMetadata": "false" } self.logger.debug(f"Url={url}") response = requests.post(url, headers=self._get_headers(session_token=session_token), params=params , timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 204: return True else: raise Exception(f"Failed to update library: {response.content}") def search_music_tracks(self, session_token: str, search_query: str): """ Search for music tracks by title, song name, and optionally Spotify-ID. :param search_query: The search term (title or song name). :return: A list of matching songs. """ url = f'{self.base_url}/Items' params = { 'SearchTerm': search_query.replace('\'',"´").replace('’','´'), 'IncludeItemTypes': 'Audio', # Search only for audio items 'Recursive': 'true', # Search within all folders 'Fields': 'Name,Id,Album,Artists,Path' # Retrieve the name and ID of the song } self.logger.debug(f"Url={url}") response = requests.get(url, headers=self._get_headers(session_token=session_token), params=params, timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 200: return response.json()['Items'] else: raise Exception(f"Failed to search music tracks: {response.content}") def add_songs_to_playlist(self, session_token: str, user_id: str, playlist_id: str, song_ids: list[str]): """ Add songs to an existing playlist. :param playlist_id: The ID of the playlist to update. :param song_ids: A list of song IDs to add. :return: A success message. """ # Construct the API URL with query parameters url = f'{self.base_url}/Playlists/{playlist_id}/Items' params = { 'ids': ','.join(song_ids), # Comma-separated song IDs 'userId': user_id } self.logger.debug(f"Url={url}") # Send the request to Jellyfin API with query parameters response = requests.post(url, headers=self._get_headers(session_token=session_token), params=params, timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") # Check for success if response.status_code == 204: # 204 No Content indicates success return {"status": "success", "message": "Songs added to playlist successfully"} else: raise Exception(f"Failed to add songs to playlist: {response.status_code} - {response.content}") def remove_songs_from_playlist(self, session_token: str, playlist_id: str, song_ids): """ Remove songs from an existing playlist. :param playlist_id: The ID of the playlist to update. :param song_ids: A list of song IDs to remove. :return: A success message. """ url = f'{self.base_url}/Playlists/{playlist_id}/Items' params = { 'EntryIds': ','.join(song_ids) # Join song IDs with commas } self.logger.debug(f"Url={url}") response = requests.delete(url, headers=self._get_headers(session_token=session_token), params=params, timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 204: # 204 No Content indicates success for updating return {"status": "success", "message": "Songs removed from playlist successfully"} else: raise Exception(f"Failed to remove songs from playlist: {response.content}") def remove_item(self, session_token: str, playlist_id: str): """ Remove an existing playlist by its ID. :param playlist_id: The ID of the playlist to remove. :return: A success message upon successful deletion. """ url = f'{self.base_url}/Items/{playlist_id}' self.logger.debug(f"Url={url}") response = requests.delete(url, headers=self._get_headers(session_token=session_token), timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") logging.getLogger('requests').setLevel(logging.WARNING) if response.status_code == 204: # 204 No Content indicates successful deletion return {"status": "success", "message": "Playlist removed successfully"} else: raise Exception(f"Failed to remove playlist: {response.content}") def get_item(self, session_token: str, item_id: str): url = f'{self.base_url}/Items/{item_id}' logging.getLogger('requests').setLevel(logging.WARNING) response = requests.get(url, headers=self._get_headers(session_token=session_token), timeout = self.timeout) if response.status_code == 200: return response.json() else: raise Exception(f"Failed to get item: {response.content}") def remove_user_from_playlist(self, session_token: str, playlist_id: str, user_id: str): """ Remove a user from a playlist. :param session: The user's session containing the Jellyfin access token. :param playlist_id: The ID of the playlist from which to remove the user. :param user_id: The ID of the user to be removed from the playlist. :return: Success message or raises an exception on failure. """ # Construct the API endpoint URL url = f'{self.base_url}/Playlists/{playlist_id}/Users/{user_id}' self.logger.debug(f"Url={url}") # Send the DELETE request to remove the user from the playlist response = requests.delete(url, headers=self._get_headers(session_token= session_token), timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") if response.status_code == 204: # 204 No Content indicates the user was successfully removed return {"status": "success", "message": f"User {user_id} removed from playlist {playlist_id}"} else: # Raise an exception if the request failed raise Exception(f"Failed to remove user from playlist: {response.content}") def remove_user_from_playlist2(self, session_token: str, playlist_id: str, user_id: str, admin_user_id : str): #TODO: This is a workaround for the issue where the above method does not work metadata = self.get_playlist_metadata(session_token= session_token, user_id= admin_user_id, playlist_id= playlist_id) # Construct the API URL url = f'{self.base_url}/Playlists/{playlist_id}' users_data = [] current_users = self.get_playlist_users(session_token=session_token, playlist_id= playlist_id) for cu in current_users: # This way we remove the user if cu['UserId'] != user_id: users_data.append({'UserId': cu['UserId'], 'CanEdit': cu['CanEdit']}) data = { 'Users' : users_data } # Prepare the headers headers = self._get_headers(session_token=session_token) # Send the request to Jellyfin API response = requests.post(url, headers=headers, json=data,timeout = self.timeout) # Check for success if response.status_code == 204: self.update_playlist_metadata(session_token= session_token, user_id= admin_user_id, playlist_id= playlist_id , updates= metadata) return {"status": "success", "message": f"Users added to playlist {playlist_id}."} else: raise Exception(f"Failed to add users to playlist: {response.status_code} - {response.content}") def set_playlist_cover_image(self, session_token: str, playlist_id: str, provider_image_url: str): """ Set the cover image of a playlist in Jellyfin using an image URL from Spotify. :param session: The user's session containing the Jellyfin access token. :param playlist_id: The ID of the playlist in Jellyfin. :param spotify_image_url: The URL of the image from Spotify. :return: Success message or raises an exception on failure. """ # Step 1: Download the image from the Spotify URL response = requests.get(provider_image_url, timeout = self.timeout) if response.status_code != 200: raise Exception(f"Failed to download image from Spotify: {response.content}") # Step 2: Check the image content type (assume it's JPEG or PNG based on the content type from the response) content_type = response.headers.get('Content-Type').lower() if content_type not in ['image/jpeg', 'image/png', 'image/webp', 'application/octet-stream']: raise Exception(f"Unsupported image format: {content_type}") # Todo: if content_type == 'application/octet-stream': content_type = 'image/jpeg' # Step 3: Encode the image content as Base64 image_base64 = base64.b64encode(response.content).decode('utf-8') # Step 4: Prepare the headers for the Jellyfin API request headers = self._get_headers(session_token= session_token) headers['Content-Type'] = content_type # Set to the correct image type headers['Accept'] = '*/*' # url 5: Upload the Base64-encoded image to Jellyfin as a plain string in the request body url = f'{self.base_url}/Items/{playlist_id}/Images/Primary' self.logger.debug(f"Url={url}") # Send the Base64-encoded image data upload_response = requests.post(url, headers=headers, data=image_base64, timeout = self.timeout) self.logger.debug(f"Response = {response.status_code}") if upload_response.status_code == 204: # 204 No Content indicates success return {"status": "success", "message": "Playlist cover image updated successfully"} else: raise Exception(f"Failed to upload image to Jellyfin: {upload_response.status_code} - {upload_response.content}") def add_users_to_playlist(self, session_token: str,user_id: str, playlist_id: str, user_ids: list[str], can_edit: bool = False): """ Add users to a Jellyfin playlist with no editing rights by default. :param session: The user's session containing the Jellyfin access token. :param playlist_id: The ID of the playlist in Jellyfin. :param user_ids: List of user IDs to add to the playlist. :param can_edit: Set to True if users should have editing rights (default is False). :return: Success message or raises an exception on failure. """ # FOr some reason when updating the users, all metadata gets wiped metadata = self.get_playlist_metadata(session_token= session_token, user_id= user_id, playlist_id= playlist_id) # Construct the API URL url = f'{self.base_url}/Playlists/{playlist_id}' users_data = [{'UserId': user_id, 'CanEdit': can_edit} for user_id in user_ids] # get current users: current_users = self.get_playlist_users(session_token=session_token, playlist_id= playlist_id) for cu in current_users: users_data.append({'UserId': cu['UserId'], 'CanEdit': cu['CanEdit']}) data = { 'Users' : users_data } # Prepare the headers headers = self._get_headers(session_token=session_token) # Send the request to Jellyfin API response = requests.post(url, headers=headers, json=data,timeout = self.timeout) # Check for success if response.status_code == 204: self.update_playlist_metadata(session_token= session_token, user_id= user_id, playlist_id= playlist_id , updates= metadata) return {"status": "success", "message": f"Users added to playlist {playlist_id}."} else: raise Exception(f"Failed to add users to playlist: {response.status_code} - {response.content}") def get_me(self, session_token: str): """ """ me_url = f'{self.base_url}/Users/Me' response = requests.get(me_url, headers=self._get_headers(session_token=session_token), timeout = self.timeout) if response.status_code == 200: return response.json() else: raise Exception(f"Failed to get playlists: {response.content}") def get_playlist_users(self, session_token: str, playlist_id: str): url = f'{self.base_url}/Playlists/{playlist_id}/Users' response = requests.get(url, headers=self._get_headers(session_token=session_token), timeout = self.timeout) if response.status_code != 200: raise Exception(f"Failed to fetch playlist metadata: {response.content}") return response.json() def get_users(self, session_token: str, user_id: Optional[str] = None): url = f'{self.base_url}/Users' if user_id: url = f'{url}/{user_id}' response = requests.get(url, headers=self._get_headers(session_token=session_token), timeout = self.timeout) if response.status_code != 200: raise Exception(f"Failed to fetch users: {response.content}") return response.json() def search_track_in_jellyfin(self, session_token: str, preview_url: str, song_name: str, artist_names: list): """ Search for a track in Jellyfin by comparing the preview audio to tracks in the library. :param session_token: The session token for Jellyfin API access. :param preview_url: The URL to the Spotify preview audio. :param song_name: The name of the song to search for. :param artist_names: A list of artist names. :return: Tuple (match_found: bool, jellyfin_file_path: Optional[str]) """ try: # Download the Spotify preview audio self.logger.debug(f"Downloading preview {preview_url} to tmp file") tmp = self.download_preview_to_tempfile(preview_url=preview_url) if tmp is None: self.logger.error(f"Downloading preview {preview_url} to tmp file failed, not continuing") return False, None # Convert the preview file to a normalized WAV file self.logger.debug(f"Converting preview to WAV file") tmp_wav = self.convert_to_wav(tmp) if tmp_wav is None: self.logger.error(f"Converting preview to WAV failed, not continuing") os.remove(tmp) return False, None # Fingerprint the normalized preview WAV file self.logger.debug(f"Performing fingerprinting on preview {tmp_wav}") _, tmp_fp = acoustid.fingerprint_file(tmp_wav) tmp_fp_dec, version = chromaprint.decode_fingerprint(tmp_fp) tmp_fp_dec = np.array(tmp_fp_dec, dtype=np.uint32) self.logger.debug(f"decoded fingerprint for preview: {tmp_fp_dec[:5]}") # Search for matching tracks in Jellyfin using only the song name search_query = song_name # Only use the song name in the search query jellyfin_results = self.search_music_tracks(session_token, search_query) matches = [] # Prepare the list of Spotify artists in lowercase spotify_artists = [artist.lower() for artist in artist_names] for result in jellyfin_results: jellyfin_artists = [artist.lower() for artist in result.get("Artists", [])] # Check for matching artists u artist_match = any(artist in spotify_artists for artist in jellyfin_artists) if not artist_match: continue # Skip if no artist matches jellyfin_file_path = result.get("Path") if not jellyfin_file_path: continue # Convert the full Jellyfin track to a normalized WAV file jellyfin_wav = self.convert_to_wav(jellyfin_file_path) if jellyfin_wav is None: continue # Fingerprint the normalized Jellyfin WAV file _, full_fp = acoustid.fingerprint_file(jellyfin_wav) full_fp_dec, version2 = chromaprint.decode_fingerprint(full_fp) full_fp_dec = np.array(full_fp_dec, dtype=np.uint32) # Compare fingerprints using the sliding similarity function sim, best_offset = self.sliding_fingerprint_similarity(full_fp_dec, tmp_fp_dec) # Clean up temporary files os.remove(jellyfin_wav) # Store the match data matches.append({ 'jellyfin_file_path': jellyfin_file_path, 'similarity': sim, 'best_offset': best_offset, 'track_name': result.get('Name'), 'artists': jellyfin_artists, }) # Clean up the preview files os.remove(tmp_wav) os.remove(tmp) # After processing all tracks, select the best match if matches: best_match = max(matches, key=lambda x: x['similarity']) if best_match['similarity'] > 60: # Adjust the threshold as needed return True, best_match['jellyfin_file_path'] else: return False, None else: return False, None except Exception as e: # Log the error (assuming you have a logging mechanism) print(f"Error in search_track_in_jellyfin: {str(e)}") return False, None # Helper methods used in search_track_in_jellyfin def download_preview_to_tempfile(self, preview_url): try: response = requests.get(preview_url, timeout = self.timeout) if response.status_code != 200: return None # Save to a temporary file tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") tmp_file.write(response.content) tmp_file.close() return tmp_file.name except Exception as e: print(f"Error downloading preview: {str(e)}") return None def convert_to_wav(self, input_file_path): try: output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") output_file.close() # Use ffmpeg to convert to WAV and normalize audio command = [ "ffmpeg", "-y", "-i", input_file_path, "-acodec", "pcm_s16le", "-ar", "44100", "-ac", "2", output_file.name ] result = subprocess.run(command, capture_output=True, text=True) if result.returncode != 0: self.logger.error(f"Error converting to WAV, subprocess exitcode: {result.returncode} , input_file_path = {input_file_path}") self.logger.error(f"\tprocess stdout: {result.stdout}") self.logger.error(f"\tprocess stderr: {result.stderr}") os.remove(output_file.name) return None return output_file.name except Exception as e: self.logger.error(f"Error converting to WAV: {str(e)}") return None def sliding_fingerprint_similarity(self, full_fp, preview_fp): len_full = len(full_fp) len_preview = len(preview_fp) best_score = float('inf') best_offset = 0 max_offset = len_full - len_preview if max_offset < 0: return 0, 0 total_bits = len_preview * 32 # Total bits in the preview fingerprint for offset in range(max_offset + 1): segment = full_fp[offset:offset + len_preview] xored = np.bitwise_xor(segment, preview_fp) diff_bits = np.unpackbits(xored.view(np.uint8)).sum() score = diff_bits / total_bits # Lower score is better if score < best_score: best_score = score best_offset = offset similarity = (1 - best_score) * 100 # Convert to percentage return similarity, best_offset