diff --git a/app/providers/spotify.py b/app/providers/spotify.py index 03ad022..cefa375 100644 --- a/app/providers/spotify.py +++ b/app/providers/spotify.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass import os from app.providers.base import AccountAttributes, Album, Artist, Image, MusicProviderClient, Owner,Playlist, PlaylistTrack, Profile,Track,ExternalUrl,Category import requests @@ -12,6 +13,21 @@ import logging l = logging.getLogger(__name__) +@dataclass +class BrowseCard: + title: str + uri: str + background_color: str + artwork: List[Image] + + +@dataclass +class BrowseSection: + title: str + items: List[BrowseCard] + uri: str + + class SpotifyClient(MusicProviderClient): """ Spotify implementation of the MusicProviderClient. @@ -225,11 +241,97 @@ class SpotifyClient(MusicProviderClient): uri=owner_data.get("uri", ""), external_urls=self._parse_external_urls(owner_data.get("uri", ""), "user") ) + def _parse_card_artwork(self, sources: List[Dict]) -> List[Image]: + """ + Parse artwork for a browse card. + + :param sources: List of artwork source dictionaries. + :return: A list of CardArtwork instances. + """ + return [Image(url=source["url"], height=source.get("height"), width=source.get("width")) for source in sources] + + + def _parse_browse_card(self, card_data: Dict) -> BrowseCard: + """ + Parse a single browse card. + + :param card_data: Dictionary containing card data. + :return: A BrowseCard instance. + """ + card_content = card_data["content"]["data"]["data"]["cardRepresentation"] + artwork_sources = card_content["artwork"]["sources"] + + return BrowseCard( + title=card_content["title"]["transformedLabel"], + uri=card_data["uri"], + background_color=card_content["backgroundColor"]["hex"], + artwork=self._parse_card_artwork(artwork_sources) + ) + + def _parse_playlist(self, playlist_data: Dict) -> Playlist: + """ + Parse a playlist object from API response data. + + :param playlist_data: Dictionary containing playlist data. + :return: A Playlist object. + """ + images = self._parse_images(playlist_data.get("images", {}).get("items", [])) + + owner_data = playlist_data.get("ownerV2", {}).get("data", {}) + owner = self._parse_owner(owner_data) + + tracks = [ + self._parse_track(item["itemV2"]["data"]) + for item in playlist_data.get("content", {}).get("items", []) + ] + + return Playlist( + id=playlist_data.get("uri", "").split(":")[-1], + name=playlist_data.get("name", ""), + uri=playlist_data.get("uri", ""), + external_urls=self._parse_external_urls(playlist_data.get("uri", "").split(":")[-1], "playlist"), + description=playlist_data.get("description", ""), + public=playlist_data.get("public", None), + collaborative=playlist_data.get("collaborative", None), + followers=playlist_data.get("followers", 0), + images=images, + owner=owner, + tracks=[ + PlaylistTrack( + added_at=item.get("addedAt", {}).get("isoString", ""), + added_by=None, + is_local=False, + track=track + ) + for item, track in zip( + playlist_data.get("content", {}).get("items", []), + tracks + ) + ] + ) + def _parse_browse_section(self, section_data: Dict) -> BrowseSection: + """ + Parse a single browse section. + + :param section_data: Dictionary containing section data. + :return: A BrowseSection instance. + """ + section_title = section_data["data"]["title"]["transformedLabel"] + section_items = [ + item for item in section_data["sectionItems"]["items"] + if not item["uri"].startswith("spotify:xlink") + ] + return BrowseSection( + title=section_title, + items=[self._parse_browse_card(item) for item in section_items], + uri=section_data["uri"] + ) + #endregion def get_playlist(self, playlist_id: str) -> Playlist: """ - Fetch a playlist by ID with all tracks, using the defined generic classes. + Fetch a playlist by ID with all tracks. """ limit = 50 offset = 0 @@ -262,35 +364,8 @@ class SpotifyClient(MusicProviderClient): offset += limit - # Use utility methods to parse tracks - tracks = [self._parse_track(item["itemV2"]["data"]) for item in all_items] - - images = self._parse_images(playlist_data.get("images", {}).get("items", [])) - - owner_data = playlist_data.get("ownerV2", {}).get("data", {}) - owner = self._parse_owner(owner_data) - - return Playlist( - id=playlist_id, - name=playlist_data.get("name", ""), - uri=playlist_data.get("uri", ""), - external_urls=self._parse_external_urls(playlist_id, "playlist"), - description=playlist_data.get("description", ""), - public=playlist_data.get("public", None), - collaborative=playlist_data.get("collaborative", None), - followers=playlist_data.get("followers", 0), - images=images, - owner=owner, - tracks=[ - PlaylistTrack( - added_at=item.get("addedAt", {}).get("isoString", ""), - added_by=None, - is_local=False, - track=track - ) - for item, track in zip(all_items, tracks) - ] - ) + playlist_data["content"]["items"] = all_items + return self._parse_playlist(playlist_data) def search_tracks(self, query: str, limit: int = 10) -> List[Track]: """ @@ -427,4 +502,84 @@ class SpotifyClient(MusicProviderClient): except Exception as e: print(f"An error occurred while fetching account attributes: {e}") - return None \ No newline at end of file + return None + def browse_all(self, page_limit: int = 50, section_limit: int = 99) -> List[BrowseSection]: + """ + Fetch all browse sections with cards. + + :param page_limit: Maximum number of pages to fetch. + :param section_limit: Maximum number of sections per page. + :return: A list of BrowseSection objects. + """ + query_parameters = { + "operationName": "browseAll", + "variables": json.dumps({ + "pagePagination": {"offset": 0, "limit": page_limit}, + "sectionPagination": {"offset": 0, "limit": section_limit} + }), + "extensions": json.dumps({ + "persistedQuery": { + "version": 1, + "sha256Hash": "cd6fcd0ce9d1849477645646601a6d444597013355467e24066dad2c1dc9b740" + } + }) + } + encoded_query = urlencode(query_parameters) + url = f"pathfinder/v1/query?{encoded_query}" + + try: + response = self._make_request(url) + browse_data = response.get("data", {}).get("browseStart", {}).get("sections", {}) + sections = browse_data.get("items", []) + + return [self._parse_browse_section(section) for section in sections] + + except Exception as e: + print(f"An error occurred while fetching browse sections: {e}") + return [] + + def browse_page(self, card: BrowseCard) -> List[Playlist]: + """ + Fetch the content of a browse page using the URI from a BrowseCard. + + :param card: A BrowseCard instance with a URI starting with 'spotify:page'. + :return: A list of Playlist objects from the browse page. + """ + if not card.uri.startswith("spotify:page"): + raise ValueError("The BrowseCard URI must start with 'spotify:page'.") + + query_parameters = { + "operationName": "browsePage", + "variables": json.dumps({ + "pagePagination": {"offset": 0, "limit": 10}, + "sectionPagination": {"offset": 0, "limit": 10}, + "uri": card.uri + }), + "extensions": json.dumps({ + "persistedQuery": { + "version": 1, + "sha256Hash": "d8346883162a16a62a5b69e73e70c66a68c27b14265091cd9e1517f48334bbb3" + } + }) + } + encoded_query = urlencode(query_parameters) + url = f"pathfinder/v1/query?{encoded_query}" + + try: + response = self._make_request(url) + browse_data = response.get("data", {}).get("browse", {}) + sections = browse_data.get("sections", {}).get("items", []) + + playlists = [] + for section in sections: + section_items = section.get("sectionItems", {}).get("items", []) + for item in section_items: + content = item.get("content", {}).get("data", {}) + if content.get("__typename") == "Playlist": + playlists.append(self._parse_playlist(content)) + + return playlists + + except Exception as e: + print(f"An error occurred while fetching the browse page: {e}") + return [] \ No newline at end of file