From 7232b3223d600b21dc55a000438973514977f46c Mon Sep 17 00:00:00 2001 From: Kamil Date: Fri, 29 Nov 2024 20:49:36 +0000 Subject: [PATCH] further implementations --- app/providers/base.py | 22 +++ app/providers/spotify.py | 353 ++++++++++++++++++++++++++++++++------- 2 files changed, 310 insertions(+), 65 deletions(-) diff --git a/app/providers/base.py b/app/providers/base.py index a49d322..9fa0956 100644 --- a/app/providers/base.py +++ b/app/providers/base.py @@ -2,6 +2,8 @@ from dataclasses import dataclass, field from typing import List, Optional from abc import ABC, abstractmethod + + @dataclass class ExternalUrl: url: str @@ -12,6 +14,26 @@ class ItemBase: name: str uri: str external_urls: Optional[List[ExternalUrl]] + +@dataclass +class Profile: + avatar: Optional[str] # Avatar URL or None + avatar_background_color: Optional[int] + name: str + uri: str + username: str + +@dataclass +class AccountAttributes: + catalogue: str + dsa_mode_available: bool + dsa_mode_enabled: bool + multi_user_plan_current_size: Optional[int] + multi_user_plan_member_type: Optional[str] + on_demand: bool + opt_in_trial_premium_only_market: bool + country: str + product: str @dataclass class Image: diff --git a/app/providers/spotify.py b/app/providers/spotify.py index 6fcf145..03ad022 100644 --- a/app/providers/spotify.py +++ b/app/providers/spotify.py @@ -1,5 +1,5 @@ import os -from app.providers.base import Album, MusicProviderClient,Playlist,Track,ExternalUrl,Category +from app.providers.base import AccountAttributes, Album, Artist, Image, MusicProviderClient, Owner,Playlist, PlaylistTrack, Profile,Track,ExternalUrl,Category import requests import json @@ -8,6 +8,9 @@ from bs4 import BeautifulSoup from urllib.parse import urlencode from typing import List, Dict, Optional from http.cookiejar import MozillaCookieJar +import logging + +l = logging.getLogger(__name__) class SpotifyClient(MusicProviderClient): """ @@ -30,62 +33,83 @@ class SpotifyClient(MusicProviderClient): :param cookie_file: Path to the cookie file. """ if not os.path.exists(cookie_file): + l.error(f"Cookie file not found: {cookie_file}") raise FileNotFoundError(f"Cookie file not found: {cookie_file}") cookie_jar = MozillaCookieJar(cookie_file) cookie_jar.load(ignore_discard=True, ignore_expires=True) self.cookies = requests.utils.dict_from_cookiejar(cookie_jar) - def authenticate(self, credentials: dict) -> None: + def authenticate(self, credentials: Optional[dict] = None) -> None: """ - Authenticate with Spotify by fetching session data and client token. + Authenticate with Spotify using cookies if available, or fetch session and config data. + + :param credentials: Optional dictionary of credentials. """ - def fetch_session_data(): - url = f'https://open.spotify.com/' - headers = { - 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', - } - response = requests.get(url, headers=headers) - response.raise_for_status() - soup = BeautifulSoup(response.text, 'html.parser') - session_script = soup.find('script', {'id': 'session'}) - config_script = soup.find('script', {'id': 'config'}) - if session_script and config_script: - return json.loads(session_script.string), json.loads(config_script.string) - else: - raise ValueError("Failed to fetch session or config data.") + if self.cookies: + l.debug("Authenticating using cookies.") + self.session_data, self.config_data = self._fetch_session_data() + self.client_token = self._fetch_client_token() + else: + l.debug("Authenticating without cookies.") + self.session_data, self.config_data = self._fetch_session_data(fetch_with_cookies=False) + self.client_token = self._fetch_client_token() - - self.session_data, self.config_data = fetch_session_data() + def _fetch_session_data(self, fetch_with_cookies: bool = True): + """ + Fetch session data from Spotify. - def fetch_client_token(): - url = f'https://clienttoken.spotify.com/v1/clienttoken' - headers = { - 'accept': 'application/json', - 'content-type': 'application/json', - 'origin': 'https://open.spotify.com', - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', - } - payload = { - "client_data": { - "client_version": "1.2.52.404.gcb99a997", - "client_id": self.session_data.get("clientId", ""), - "js_sdk_data": { - "device_brand": "unknown", - "device_model": "unknown", - "os": "windows", - "os_version": "NT 10.0", - "device_id": self.config_data.get("correlationId", ""), - "device_type": "computer" - } + :param fetch_with_cookies: Whether to include cookies in the request. + :return: Tuple containing session and config data. + """ + url = 'https://open.spotify.com/' + headers = { + 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', + } + cookies = self.cookies if fetch_with_cookies else None + response = requests.get(url, headers=headers, cookies=cookies) + response.raise_for_status() + soup = BeautifulSoup(response.text, 'html.parser') + session_script = soup.find('script', {'id': 'session'}) + config_script = soup.find('script', {'id': 'config'}) + if session_script and config_script: + l.debug("fetched session and config scripts") + return json.loads(session_script.string), json.loads(config_script.string) + else: + raise ValueError("Failed to fetch session or config data.") + + def _fetch_client_token(self): + """ + Fetch the client token using session data and cookies. + + :return: The client token as a string. + """ + url = f'https://clienttoken.spotify.com/v1/clienttoken' + headers = { + 'accept': 'application/json', + 'content-type': 'application/json', + 'origin': 'https://open.spotify.com', + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', + } + payload = { + "client_data": { + "client_version": "1.2.52.404.gcb99a997", + "client_id": self.session_data.get("clientId", ""), + "js_sdk_data": { + "device_brand": "unknown", + "device_model": "unknown", + "os": "windows", + "os_version": "NT 10.0", + "device_id": self.config_data.get("correlationId", ""), + "device_type": "computer" } } - response = requests.post(url, headers=headers, json=payload) - response.raise_for_status() - return response.json().get("granted_token", "") - - self.client_token = fetch_client_token() + } + response = requests.post(url, headers=headers, json=payload, cookies=self.cookies) + response.raise_for_status() + l.debug("fetched granted_token") + return response.json().get("granted_token", "") def _make_request(self, endpoint: str, params: dict = None) -> dict: """ @@ -97,13 +121,115 @@ class SpotifyClient(MusicProviderClient): 'authorization': f'Bearer {self.session_data.get("accessToken", "")}', 'client-token': self.client_token.get('token',''), } - response = requests.get(f"{self.base_url}/{endpoint}", headers=headers, params=params) + l.debug(f"starting request: {self.base_url}/{endpoint}") + response = requests.get(f"{self.base_url}/{endpoint}", headers=headers, params=params, cookies=self.cookies) + response.raise_for_status() return response.json() + + #region utility functions to help parsing objects + def _parse_external_urls(self, uri: str, entity_type: str) -> List[ExternalUrl]: + """ + Create ExternalUrl instances for an entity. + + :param uri: The URI of the entity. + :param entity_type: The type of entity ('track', 'album', 'artist', 'playlist', etc.). + :return: A list of ExternalUrl instances. + """ + return [ExternalUrl(url=f"https://open.spotify.com/{entity_type}/{uri.split(':')[-1]}")] + + + def _parse_images(self, image_data: List[Dict]) -> List[Image]: + """ + Parse images from the API response. + + :param image_data: List of dictionaries containing image data. + :return: A list of Image objects. + """ + images = [] + for img in image_data: + # Extract the first source if available + sources = img.get("sources", []) + if sources: + source = sources[0] # Take the first source as the default + images.append(Image( + url=source.get("url"), + height=source.get("height"), + width=source.get("width") + )) + return images + + + def _parse_artist(self, artist_data: Dict) -> Artist: + """ + Parse an artist object from API data. + + :param artist_data: Dictionary representing an artist. + :return: An Artist instance. + """ + return Artist( + id=artist_data["uri"].split(":")[-1], + name=artist_data["profile"]["name"], + uri=artist_data["uri"], + external_urls=self._parse_external_urls(artist_data["uri"], "artist") + ) + + + def _parse_album(self, album_data: Dict) -> Album: + """ + Parse an album object from API data. + + :param album_data: Dictionary representing an album. + :return: An Album instance. + """ + return Album( + id=album_data["uri"].split(":")[-1], + name=album_data["name"], + uri=album_data["uri"], + external_urls=self._parse_external_urls(album_data["uri"], "album"), + artists=[self._parse_artist(artist) for artist in album_data["artists"]["items"]], + images=self._parse_images(album_data["coverArt"]["sources"]) + ) + + + def _parse_track(self, track_data: Dict) -> Track: + """ + Parse a track object from API data. + + :param track_data: Dictionary representing a track. + :return: A Track instance. + """ + return Track( + id=track_data["uri"].split(":")[-1], + name=track_data["name"], + uri=track_data["uri"], + external_urls=self._parse_external_urls(track_data["uri"], "track"), + duration_ms=track_data["trackDuration"]["totalMilliseconds"], + explicit=track_data.get("explicit", False), + album=self._parse_album(track_data["albumOfTrack"]), + artists=[self._parse_artist(artist) for artist in track_data["artists"]["items"]] + ) + def _parse_owner(self, owner_data: Dict) -> Optional[Owner]: + """ + Parse an owner object from API data. + + :param owner_data: Dictionary representing an owner. + :return: An Owner instance or None if the owner data is empty. + """ + if not owner_data: + return None + + return Owner( + id=owner_data.get("uri", "").split(":")[-1], + name=owner_data.get("name", ""), + uri=owner_data.get("uri", ""), + external_urls=self._parse_external_urls(owner_data.get("uri", ""), "user") + ) + #endregion def get_playlist(self, playlist_id: str) -> Playlist: """ - Fetch a playlist by ID with all tracks. + Fetch a playlist by ID with all tracks, using the defined generic classes. """ limit = 50 offset = 0 @@ -136,27 +262,34 @@ class SpotifyClient(MusicProviderClient): offset += limit - tracks = [ - Track( - id=item["itemV2"]["data"]["uri"].split(":")[-1], - name=item["itemV2"]["data"]["name"], - uri=item["itemV2"]["data"]["uri"], - duration_ms=item["itemV2"]["data"]["trackDuration"]["totalMilliseconds"], - explicit=False, # Default as Spotify API doesn't provide explicit info here - album=item["itemV2"]["data"]["albumOfTrack"]["name"], - artists=[ - artist["profile"]["name"] - for artist in item["itemV2"]["data"]["albumOfTrack"]["artists"]["items"] - ] - ) - for item in all_items - ] + # Use utility methods to parse tracks + tracks = [self._parse_track(item["itemV2"]["data"]) for item in all_items] + + images = self._parse_images(playlist_data.get("images", {}).get("items", [])) + + owner_data = playlist_data.get("ownerV2", {}).get("data", {}) + owner = self._parse_owner(owner_data) + return Playlist( id=playlist_id, name=playlist_data.get("name", ""), - description=playlist_data.get("description", ""), uri=playlist_data.get("uri", ""), - tracks=tracks, + external_urls=self._parse_external_urls(playlist_id, "playlist"), + description=playlist_data.get("description", ""), + public=playlist_data.get("public", None), + collaborative=playlist_data.get("collaborative", None), + followers=playlist_data.get("followers", 0), + images=images, + owner=owner, + tracks=[ + PlaylistTrack( + added_at=item.get("addedAt", {}).get("isoString", ""), + added_by=None, + is_local=False, + track=track + ) + for item, track in zip(all_items, tracks) + ] ) def search_tracks(self, query: str, limit: int = 10) -> List[Track]: @@ -204,4 +337,94 @@ class SpotifyClient(MusicProviderClient): :return: A list of Category objects. """ print(f"get_categories: Placeholder for categories with limit {limit}.") - return [] \ No newline at end of file + return [] + + + # non generic method implementations: + def get_profile(self) -> Optional[Profile]: + """ + Fetch the profile attributes of the authenticated Spotify user. + + :return: A Profile object containing the user's profile information or None if an error occurs. + """ + query_parameters = { + "operationName": "profileAttributes", + "variables": json.dumps({}), + "extensions": json.dumps({ + "persistedQuery": { + "version": 1, + "sha256Hash": "53bcb064f6cd18c23f752bc324a791194d20df612d8e1239c735144ab0399ced" + } + }) + } + + encoded_query = urlencode(query_parameters) + + url = f"pathfinder/v1/query?{encoded_query}" + + try: + response = self._make_request(url) + profile_data = response.get('data', {}).get('me', {}).get('profile', {}) + if not profile_data: + raise ValueError("Invalid profile data received.") + return Profile( + avatar=profile_data.get("avatar"), + avatar_background_color=profile_data.get("avatarBackgroundColor"), + name=profile_data.get("name", ""), + uri=profile_data.get("uri", ""), + username=profile_data.get("username", "") + ) + + except Exception as e: + print(f"An error occurred while fetching profile attributes: {e}") + return None + def get_account_attributes(self) -> Optional[AccountAttributes]: + """ + Fetch the account attributes of the authenticated Spotify user. + + :return: An AccountAttributes object containing the user's account information or None if an error occurs. + """ + # Define the query parameters + query_parameters = { + "operationName": "accountAttributes", + "variables": json.dumps({}), # Empty variables for this query + "extensions": json.dumps({ + "persistedQuery": { + "version": 1, + "sha256Hash": "4fbd57be3c6ec2157adcc5b8573ec571f61412de23bbb798d8f6a156b7d34cdf" + } + }) + } + + # Encode the query parameters + encoded_query = urlencode(query_parameters) + + # API endpoint + url = f"pathfinder/v1/query?{encoded_query}" + + try: + # Perform the request + response = self._make_request(url) + + # Extract and validate the account data + account_data = response.get('data', {}).get('me', {}).get('account', {}) + attributes = account_data.get("attributes", {}) + if not attributes or not account_data.get("country") or not account_data.get("product"): + raise ValueError("Invalid account data received.") + + # Map the response to the AccountAttributes class + return AccountAttributes( + catalogue=attributes.get("catalogue", ""), + dsa_mode_available=attributes.get("dsaModeAvailable", False), + dsa_mode_enabled=attributes.get("dsaModeEnabled", False), + multi_user_plan_current_size=attributes.get("multiUserPlanCurrentSize"), + multi_user_plan_member_type=attributes.get("multiUserPlanMemberType"), + on_demand=attributes.get("onDemand", False), + opt_in_trial_premium_only_market=attributes.get("optInTrialPremiumOnlyMarket", False), + country=account_data.get("country", ""), + product=account_data.get("product", "") + ) + + except Exception as e: + print(f"An error occurred while fetching account attributes: {e}") + return None \ No newline at end of file