implemented browse_all and browse_page , should be enough for jellyplist
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
from dataclasses import dataclass
|
||||
import os
|
||||
from app.providers.base import AccountAttributes, Album, Artist, Image, MusicProviderClient, Owner,Playlist, PlaylistTrack, Profile,Track,ExternalUrl,Category
|
||||
import requests
|
||||
@@ -12,6 +13,21 @@ import logging
|
||||
|
||||
l = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
|
||||
class BrowseCard:
|
||||
title: str
|
||||
uri: str
|
||||
background_color: str
|
||||
artwork: List[Image]
|
||||
|
||||
|
||||
@dataclass
|
||||
class BrowseSection:
|
||||
title: str
|
||||
items: List[BrowseCard]
|
||||
uri: str
|
||||
|
||||
|
||||
class SpotifyClient(MusicProviderClient):
|
||||
"""
|
||||
Spotify implementation of the MusicProviderClient.
|
||||
@@ -225,11 +241,97 @@ class SpotifyClient(MusicProviderClient):
|
||||
uri=owner_data.get("uri", ""),
|
||||
external_urls=self._parse_external_urls(owner_data.get("uri", ""), "user")
|
||||
)
|
||||
def _parse_card_artwork(self, sources: List[Dict]) -> List[Image]:
|
||||
"""
|
||||
Parse artwork for a browse card.
|
||||
|
||||
:param sources: List of artwork source dictionaries.
|
||||
:return: A list of CardArtwork instances.
|
||||
"""
|
||||
return [Image(url=source["url"], height=source.get("height"), width=source.get("width")) for source in sources]
|
||||
|
||||
|
||||
def _parse_browse_card(self, card_data: Dict) -> BrowseCard:
|
||||
"""
|
||||
Parse a single browse card.
|
||||
|
||||
:param card_data: Dictionary containing card data.
|
||||
:return: A BrowseCard instance.
|
||||
"""
|
||||
card_content = card_data["content"]["data"]["data"]["cardRepresentation"]
|
||||
artwork_sources = card_content["artwork"]["sources"]
|
||||
|
||||
return BrowseCard(
|
||||
title=card_content["title"]["transformedLabel"],
|
||||
uri=card_data["uri"],
|
||||
background_color=card_content["backgroundColor"]["hex"],
|
||||
artwork=self._parse_card_artwork(artwork_sources)
|
||||
)
|
||||
|
||||
def _parse_playlist(self, playlist_data: Dict) -> Playlist:
|
||||
"""
|
||||
Parse a playlist object from API response data.
|
||||
|
||||
:param playlist_data: Dictionary containing playlist data.
|
||||
:return: A Playlist object.
|
||||
"""
|
||||
images = self._parse_images(playlist_data.get("images", {}).get("items", []))
|
||||
|
||||
owner_data = playlist_data.get("ownerV2", {}).get("data", {})
|
||||
owner = self._parse_owner(owner_data)
|
||||
|
||||
tracks = [
|
||||
self._parse_track(item["itemV2"]["data"])
|
||||
for item in playlist_data.get("content", {}).get("items", [])
|
||||
]
|
||||
|
||||
return Playlist(
|
||||
id=playlist_data.get("uri", "").split(":")[-1],
|
||||
name=playlist_data.get("name", ""),
|
||||
uri=playlist_data.get("uri", ""),
|
||||
external_urls=self._parse_external_urls(playlist_data.get("uri", "").split(":")[-1], "playlist"),
|
||||
description=playlist_data.get("description", ""),
|
||||
public=playlist_data.get("public", None),
|
||||
collaborative=playlist_data.get("collaborative", None),
|
||||
followers=playlist_data.get("followers", 0),
|
||||
images=images,
|
||||
owner=owner,
|
||||
tracks=[
|
||||
PlaylistTrack(
|
||||
added_at=item.get("addedAt", {}).get("isoString", ""),
|
||||
added_by=None,
|
||||
is_local=False,
|
||||
track=track
|
||||
)
|
||||
for item, track in zip(
|
||||
playlist_data.get("content", {}).get("items", []),
|
||||
tracks
|
||||
)
|
||||
]
|
||||
)
|
||||
def _parse_browse_section(self, section_data: Dict) -> BrowseSection:
|
||||
"""
|
||||
Parse a single browse section.
|
||||
|
||||
:param section_data: Dictionary containing section data.
|
||||
:return: A BrowseSection instance.
|
||||
"""
|
||||
section_title = section_data["data"]["title"]["transformedLabel"]
|
||||
section_items = [
|
||||
item for item in section_data["sectionItems"]["items"]
|
||||
if not item["uri"].startswith("spotify:xlink")
|
||||
]
|
||||
return BrowseSection(
|
||||
title=section_title,
|
||||
items=[self._parse_browse_card(item) for item in section_items],
|
||||
uri=section_data["uri"]
|
||||
)
|
||||
|
||||
#endregion
|
||||
|
||||
def get_playlist(self, playlist_id: str) -> Playlist:
|
||||
"""
|
||||
Fetch a playlist by ID with all tracks, using the defined generic classes.
|
||||
Fetch a playlist by ID with all tracks.
|
||||
"""
|
||||
limit = 50
|
||||
offset = 0
|
||||
@@ -262,35 +364,8 @@ class SpotifyClient(MusicProviderClient):
|
||||
|
||||
offset += limit
|
||||
|
||||
# Use utility methods to parse tracks
|
||||
tracks = [self._parse_track(item["itemV2"]["data"]) for item in all_items]
|
||||
|
||||
images = self._parse_images(playlist_data.get("images", {}).get("items", []))
|
||||
|
||||
owner_data = playlist_data.get("ownerV2", {}).get("data", {})
|
||||
owner = self._parse_owner(owner_data)
|
||||
|
||||
return Playlist(
|
||||
id=playlist_id,
|
||||
name=playlist_data.get("name", ""),
|
||||
uri=playlist_data.get("uri", ""),
|
||||
external_urls=self._parse_external_urls(playlist_id, "playlist"),
|
||||
description=playlist_data.get("description", ""),
|
||||
public=playlist_data.get("public", None),
|
||||
collaborative=playlist_data.get("collaborative", None),
|
||||
followers=playlist_data.get("followers", 0),
|
||||
images=images,
|
||||
owner=owner,
|
||||
tracks=[
|
||||
PlaylistTrack(
|
||||
added_at=item.get("addedAt", {}).get("isoString", ""),
|
||||
added_by=None,
|
||||
is_local=False,
|
||||
track=track
|
||||
)
|
||||
for item, track in zip(all_items, tracks)
|
||||
]
|
||||
)
|
||||
playlist_data["content"]["items"] = all_items
|
||||
return self._parse_playlist(playlist_data)
|
||||
|
||||
def search_tracks(self, query: str, limit: int = 10) -> List[Track]:
|
||||
"""
|
||||
@@ -428,3 +503,83 @@ class SpotifyClient(MusicProviderClient):
|
||||
except Exception as e:
|
||||
print(f"An error occurred while fetching account attributes: {e}")
|
||||
return None
|
||||
def browse_all(self, page_limit: int = 50, section_limit: int = 99) -> List[BrowseSection]:
|
||||
"""
|
||||
Fetch all browse sections with cards.
|
||||
|
||||
:param page_limit: Maximum number of pages to fetch.
|
||||
:param section_limit: Maximum number of sections per page.
|
||||
:return: A list of BrowseSection objects.
|
||||
"""
|
||||
query_parameters = {
|
||||
"operationName": "browseAll",
|
||||
"variables": json.dumps({
|
||||
"pagePagination": {"offset": 0, "limit": page_limit},
|
||||
"sectionPagination": {"offset": 0, "limit": section_limit}
|
||||
}),
|
||||
"extensions": json.dumps({
|
||||
"persistedQuery": {
|
||||
"version": 1,
|
||||
"sha256Hash": "cd6fcd0ce9d1849477645646601a6d444597013355467e24066dad2c1dc9b740"
|
||||
}
|
||||
})
|
||||
}
|
||||
encoded_query = urlencode(query_parameters)
|
||||
url = f"pathfinder/v1/query?{encoded_query}"
|
||||
|
||||
try:
|
||||
response = self._make_request(url)
|
||||
browse_data = response.get("data", {}).get("browseStart", {}).get("sections", {})
|
||||
sections = browse_data.get("items", [])
|
||||
|
||||
return [self._parse_browse_section(section) for section in sections]
|
||||
|
||||
except Exception as e:
|
||||
print(f"An error occurred while fetching browse sections: {e}")
|
||||
return []
|
||||
|
||||
def browse_page(self, card: BrowseCard) -> List[Playlist]:
|
||||
"""
|
||||
Fetch the content of a browse page using the URI from a BrowseCard.
|
||||
|
||||
:param card: A BrowseCard instance with a URI starting with 'spotify:page'.
|
||||
:return: A list of Playlist objects from the browse page.
|
||||
"""
|
||||
if not card.uri.startswith("spotify:page"):
|
||||
raise ValueError("The BrowseCard URI must start with 'spotify:page'.")
|
||||
|
||||
query_parameters = {
|
||||
"operationName": "browsePage",
|
||||
"variables": json.dumps({
|
||||
"pagePagination": {"offset": 0, "limit": 10},
|
||||
"sectionPagination": {"offset": 0, "limit": 10},
|
||||
"uri": card.uri
|
||||
}),
|
||||
"extensions": json.dumps({
|
||||
"persistedQuery": {
|
||||
"version": 1,
|
||||
"sha256Hash": "d8346883162a16a62a5b69e73e70c66a68c27b14265091cd9e1517f48334bbb3"
|
||||
}
|
||||
})
|
||||
}
|
||||
encoded_query = urlencode(query_parameters)
|
||||
url = f"pathfinder/v1/query?{encoded_query}"
|
||||
|
||||
try:
|
||||
response = self._make_request(url)
|
||||
browse_data = response.get("data", {}).get("browse", {})
|
||||
sections = browse_data.get("sections", {}).get("items", [])
|
||||
|
||||
playlists = []
|
||||
for section in sections:
|
||||
section_items = section.get("sectionItems", {}).get("items", [])
|
||||
for item in section_items:
|
||||
content = item.get("content", {}).get("data", {})
|
||||
if content.get("__typename") == "Playlist":
|
||||
playlists.append(self._parse_playlist(content))
|
||||
|
||||
return playlists
|
||||
|
||||
except Exception as e:
|
||||
print(f"An error occurred while fetching the browse page: {e}")
|
||||
return []
|
||||
Reference in New Issue
Block a user