v0.1.0
This commit is contained in:
295
app/functions.py
Normal file
295
app/functions.py
Normal file
@@ -0,0 +1,295 @@
|
||||
from flask import flash, redirect, session, url_for
|
||||
from app.models import JellyfinUser, Playlist,Track
|
||||
from app import sp, cache, app, jellyfin ,jellyfin_admin_token, jellyfin_admin_id,device_id, cache
|
||||
from functools import wraps
|
||||
from celery.result import AsyncResult
|
||||
from app.tasks import download_missing_tracks,check_for_playlist_updates, update_all_playlists_track_status, update_jellyfin_id_for_downloaded_tracks
|
||||
from jellyfin.objects import PlaylistMetadata
|
||||
|
||||
TASK_STATUS = {
|
||||
'update_all_playlists_track_status': None,
|
||||
'download_missing_tracks': None,
|
||||
'check_for_playlist_updates': None,
|
||||
'update_jellyfin_id_for_downloaded_tracks' : None
|
||||
}
|
||||
|
||||
def manage_task(task_name):
|
||||
task_id = TASK_STATUS.get(task_name)
|
||||
|
||||
if task_id:
|
||||
result = AsyncResult(task_id)
|
||||
if result.state in ['PENDING', 'STARTED']:
|
||||
return result.state, result.info if result.info else {}
|
||||
if task_name == 'update_all_playlists_track_status':
|
||||
result = update_all_playlists_track_status.delay()
|
||||
elif task_name == 'download_missing_tracks':
|
||||
result = download_missing_tracks.delay()
|
||||
elif task_name == 'check_for_playlist_updates':
|
||||
result = check_for_playlist_updates.delay()
|
||||
elif task_name == 'update_jellyfin_id_for_downloaded_tracks':
|
||||
result = update_jellyfin_id_for_downloaded_tracks.delay()
|
||||
|
||||
TASK_STATUS[task_name] = result.id
|
||||
return result.state, result.info if result.info else {}
|
||||
|
||||
|
||||
def prepPlaylistData(data):
|
||||
playlists = []
|
||||
jellyfin_user = JellyfinUser.query.filter_by(jellyfin_user_id=session['jellyfin_user_id']).first()
|
||||
if not data.get('playlists'):
|
||||
|
||||
data['playlists']= {}
|
||||
data['playlists']['items'] = [data]
|
||||
|
||||
for playlist_data in data['playlists']['items']:
|
||||
# Fetch the playlist from the database if it exists
|
||||
db_playlist = Playlist.query.filter_by(spotify_playlist_id=playlist_data['id']).first()
|
||||
|
||||
if db_playlist:
|
||||
# If the playlist is in the database, use the stored values
|
||||
if isinstance(playlist_data['tracks'],list):
|
||||
track_count = len(playlist_data['tracks'] )
|
||||
else:
|
||||
track_count = playlist_data['tracks']['total'] or 0
|
||||
tracks_available = db_playlist.tracks_available or 0
|
||||
tracks_linked = len([track for track in db_playlist.tracks if track.jellyfin_id]) or 0
|
||||
percent_available = (tracks_available / track_count * 100) if track_count > 0 else 0
|
||||
|
||||
# Determine playlist status
|
||||
if tracks_available == track_count and track_count > 0:
|
||||
status = 'green' # Fully available
|
||||
elif tracks_available > 0:
|
||||
status = 'yellow' # Partially available
|
||||
else:
|
||||
status = 'red' # Not available
|
||||
else:
|
||||
# If the playlist is not in the database, initialize with 0
|
||||
track_count = 0
|
||||
tracks_available = 0
|
||||
tracks_linked = 0
|
||||
percent_available = 0
|
||||
status = 'red' # Not requested yet
|
||||
|
||||
# Append playlist data to the list
|
||||
playlists.append({
|
||||
'name': playlist_data['name'],
|
||||
'description': playlist_data['description'],
|
||||
'image': playlist_data['images'][0]['url'] if playlist_data['images'] else 'default-image.jpg',
|
||||
'url': playlist_data['external_urls']['spotify'],
|
||||
'id': playlist_data['id'],
|
||||
'jellyfin_id': db_playlist.jellyfin_id if db_playlist else '',
|
||||
'can_add': (db_playlist not in jellyfin_user.playlists) if db_playlist else True,
|
||||
'can_remove' : (db_playlist in jellyfin_user.playlists) if db_playlist else False,
|
||||
'last_updated':db_playlist.last_updated if db_playlist else '',
|
||||
'last_changed':db_playlist.last_changed if db_playlist else '',
|
||||
'tracks_available': tracks_available,
|
||||
'track_count': track_count,
|
||||
'tracks_linked': tracks_linked,
|
||||
'percent_available': percent_available,
|
||||
'status': status # Red, yellow, or green based on availability
|
||||
})
|
||||
|
||||
return playlists
|
||||
|
||||
def get_cached_spotify_playlists(playlist_ids):
|
||||
"""
|
||||
Fetches multiple Spotify playlists by their IDs, utilizing individual caching.
|
||||
|
||||
:param playlist_ids: A list of Spotify playlist IDs.
|
||||
:return: A dictionary containing the fetched playlists.
|
||||
"""
|
||||
spotify_data = {'playlists': {'items': []}}
|
||||
|
||||
for playlist_id in playlist_ids:
|
||||
playlist_data = get_cached_spotify_playlist(playlist_id)
|
||||
if playlist_data:
|
||||
spotify_data['playlists']['items'].append(playlist_data)
|
||||
else:
|
||||
app.logger.warning(f"Playlist data for ID {playlist_id} could not be retrieved.")
|
||||
|
||||
return spotify_data
|
||||
|
||||
@cache.memoize(timeout=3600)
|
||||
def get_cached_spotify_playlist(playlist_id):
|
||||
"""
|
||||
Fetches a Spotify playlist by its ID, utilizing caching to minimize API calls.
|
||||
|
||||
:param playlist_id: The Spotify playlist ID.
|
||||
:return: Playlist data as a dictionary, or None if an error occurs.
|
||||
"""
|
||||
try:
|
||||
playlist_data = sp.playlist(playlist_id) # Fetch data from Spotify API
|
||||
return playlist_data
|
||||
except Exception as e:
|
||||
app.logger.error(f"Error fetching playlist {playlist_id} from Spotify: {str(e)}")
|
||||
return None
|
||||
@cache.memoize(timeout=3600*24*10)
|
||||
def get_cached_spotify_track(track_id):
|
||||
"""
|
||||
Fetches a Spotify track by its ID, utilizing caching to minimize API calls.
|
||||
|
||||
:param track_id: The Spotify playlist ID.
|
||||
:return: Track data as a dictionary, or None if an error occurs.
|
||||
"""
|
||||
try:
|
||||
track_data = sp.track(track_id=track_id) # Fetch data from Spotify API
|
||||
return track_data
|
||||
except Exception as e:
|
||||
app.logger.error(f"Error fetching track {track_id} from Spotify: {str(e)}")
|
||||
return None
|
||||
|
||||
|
||||
def prepAlbumData(data):
|
||||
items = []
|
||||
for item in data['albums']['items']:
|
||||
items.append({
|
||||
'name': item['name'],
|
||||
'description': f"Released: {item['release_date']}",
|
||||
'image': item['images'][0]['url'] if item['images'] else 'default-image.jpg',
|
||||
'url': item['external_urls']['spotify'],
|
||||
'id' : item['id'],
|
||||
'can_add' : False
|
||||
})
|
||||
return items
|
||||
|
||||
def prepArtistData(data):
|
||||
items = []
|
||||
for item in data['artists']['items']:
|
||||
items.append({
|
||||
'name': item['name'],
|
||||
'description': f"Popularity: {item['popularity']}",
|
||||
'image': item['images'][0]['url'] if item['images'] else 'default-image.jpg',
|
||||
'url': item['external_urls']['spotify'],
|
||||
'id' : item['id'],
|
||||
'can_add' : False
|
||||
|
||||
})
|
||||
return items
|
||||
|
||||
|
||||
|
||||
def getFeaturedPlaylists(country,offset):
|
||||
playlists_data = sp.featured_playlists(country=country, limit=16, offset=offset)
|
||||
|
||||
return prepPlaylistData(playlists_data), playlists_data['playlists']['total'],'Featured Playlists'
|
||||
|
||||
def getCategoryPlaylists(category,offset):
|
||||
playlists_data = sp.category_playlists(category_id=category, limit=16, offset=offset)
|
||||
|
||||
return prepPlaylistData(playlists_data), playlists_data['playlists']['total'],f"Category {playlists_data['message']}"
|
||||
|
||||
def getCategories(country,offset):
|
||||
categories_data = sp.categories(limit=16, offset= offset)
|
||||
categories = []
|
||||
|
||||
for cat in categories_data['categories']['items']:
|
||||
categories.append({
|
||||
'name': cat['name'],
|
||||
'description': '',
|
||||
'image': cat['icons'][0]['url'] if cat['icons'] else 'default-image.jpg',
|
||||
'url': f"/playlists?cat={cat['id']}",
|
||||
'id' : cat['id'],
|
||||
'type':'category'
|
||||
})
|
||||
return categories, categories_data['categories']['total'],'Browse Categories'
|
||||
|
||||
def get_tracks_for_playlist(data):
|
||||
results = data
|
||||
tracks = []
|
||||
is_admin = session.get('is_admin', False)
|
||||
|
||||
for idx, item in enumerate(results['tracks']):
|
||||
track_data = item['track']
|
||||
if track_data:
|
||||
duration_ms = track_data['duration_ms']
|
||||
minutes = duration_ms // 60000
|
||||
seconds = (duration_ms % 60000) // 1000
|
||||
|
||||
track_db = Track.query.filter_by(spotify_track_id=track_data['id']).first()
|
||||
|
||||
if track_db:
|
||||
downloaded = track_db.downloaded
|
||||
filesystem_path = track_db.filesystem_path if is_admin else None
|
||||
jellyfin_id = track_db.jellyfin_id
|
||||
download_status = track_db.download_status
|
||||
else:
|
||||
downloaded = False
|
||||
filesystem_path = None
|
||||
jellyfin_id = None
|
||||
download_status = None
|
||||
|
||||
tracks.append({
|
||||
'title': track_data['name'],
|
||||
'artist': ', '.join([artist['name'] for artist in track_data['artists']]),
|
||||
'url': track_data['external_urls']['spotify'],
|
||||
'duration': f'{minutes}:{seconds:02d}',
|
||||
'preview_url': track_data['preview_url'],
|
||||
'downloaded': downloaded,
|
||||
'filesystem_path': filesystem_path,
|
||||
'jellyfin_id': jellyfin_id,
|
||||
'spotify_id': track_data['id'],
|
||||
'duration_ms': duration_ms,
|
||||
'download_status' : download_status
|
||||
})
|
||||
|
||||
return tracks
|
||||
|
||||
def get_full_playlist_data(playlist_id):
|
||||
playlist_data = get_cached_spotify_playlist(playlist_id)
|
||||
all_tracks = []
|
||||
|
||||
offset = 0
|
||||
while True:
|
||||
response = sp.playlist_items(playlist_id, offset=offset, limit=100)
|
||||
items = response['items']
|
||||
all_tracks.extend(items)
|
||||
|
||||
if len(items) < 100:
|
||||
break
|
||||
offset += 100
|
||||
|
||||
playlist_data['tracks'] = all_tracks
|
||||
playlist_data['prepped_data'] = prepPlaylistData(playlist_data)
|
||||
return playlist_data
|
||||
|
||||
def jellyfin_login_required(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if 'jellyfin_user_name' not in session:
|
||||
flash('You need to log in using your Jellyfin Credentials to access this page.', 'warning')
|
||||
return redirect(url_for('login'))
|
||||
return f(*args, **kwargs)
|
||||
return decorated_function
|
||||
|
||||
def jellyfin_admin_required(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if not session['is_admin']:
|
||||
flash('You need to be a Jellyfin admin.', 'warning')
|
||||
return 404 # Redirect to your login route
|
||||
return f(*args, **kwargs)
|
||||
return decorated_function
|
||||
|
||||
|
||||
|
||||
|
||||
def update_playlist_metadata(playlist,spotify_playlist_data):
|
||||
metadata = PlaylistMetadata()
|
||||
metadata.Tags = [f'jellyplist:playlist:{playlist.id}',f'{playlist.tracks_available} of {playlist.track_count} Tracks available']
|
||||
metadata.Overview = spotify_playlist_data['description']
|
||||
jellyfin.update_playlist_metadata(session_token=_get_api_token(),playlist_id=playlist.jellyfin_id,updates= metadata , user_id= _get_admin_id())
|
||||
if spotify_playlist_data['images'] != None:
|
||||
jellyfin.set_playlist_cover_image(session_token= _get_api_token(),playlist_id= playlist.jellyfin_id,spotify_image_url= spotify_playlist_data['images'][0]['url'])
|
||||
|
||||
|
||||
|
||||
def _get_token_from_sessioncookie() -> str:
|
||||
return session['jellyfin_access_token']
|
||||
def _get_api_token() -> str:
|
||||
#return app.config['JELLYFIN_ACCESS_TOKEN']
|
||||
return jellyfin_admin_token
|
||||
def _get_logged_in_user():
|
||||
return JellyfinUser.query.filter_by(jellyfin_user_id=session['jellyfin_user_id']).first()
|
||||
def _get_admin_id():
|
||||
#return JellyfinUser.query.filter_by(is_admin=True).first().jellyfin_user_id
|
||||
return jellyfin_admin_id
|
||||
Reference in New Issue
Block a user