Reworked the Celery task management

Author: Kamil
Date:   2024-12-04 22:22:04 +00:00
Parent: e2d37b77b0
Commit: 1ee0087b8f

8 changed files with 95 additions and 87 deletions


@@ -11,19 +11,13 @@ from app.models import JellyfinUser,Playlist,Track, user_playlists, playlist_tra
 import os
 import redis
 from celery import current_task,signals
+from celery.result import AsyncResult
 from app.providers import base
 from app.registry.music_provider_registry import MusicProviderRegistry
 from lidarr.classes import Artist
-def acquire_lock(lock_name, expiration=60):
-    return redis_client.set(lock_name, "locked", ex=expiration, nx=True)
-def release_lock(lock_name):
-    redis_client.delete(lock_name)
-def prepare_logger():
-    FORMAT = "[%(asctime)s][%(filename)18s:%(lineno)4s - %(funcName)20s() ] %(message)s"
-    logging.basicConfig(format=FORMAT)
 @signals.celeryd_init.connect
 def setup_log_format(sender, conf, **kwargs):
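
Note: the module-level acquire_lock / release_lock / prepare_logger helpers deleted above are not gone; they reappear as methods of the new TaskManager class added at the end of this file, and AsyncResult is imported for its get_task_status method. The locking itself is unchanged: Redis SET with nx=True (only set if the key is absent) and ex=<seconds> (auto-expiry) gives an atomic, self-expiring lock, so a crashed worker cannot hold it forever. A minimal standalone sketch of that pattern, assuming a redis_client pointed at a local Redis instance (the app builds its own client elsewhere):

import redis

# Assumed connection details, for this sketch only.
redis_client = redis.Redis(host="localhost", port=6379, db=0)

def acquire_lock(lock_name, expiration=60):
    # SET key value NX EX <seconds>: returns True only if the key did not exist,
    # so two workers can never both acquire the same lock.
    return redis_client.set(lock_name, "locked", ex=expiration, nx=True)

def release_lock(lock_name):
    # Free the lock early; otherwise it simply expires after `expiration` seconds.
    redis_client.delete(lock_name)

if acquire_lock("demo_task_lock", expiration=600):
    try:
        pass  # work that must not run in two workers at once
    finally:
        release_lock("demo_task_lock")
else:
    print("Another instance already holds the lock; skipping.")
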
@@ -36,7 +30,7 @@ def setup_log_format(sender, conf, **kwargs):
 def update_all_playlists_track_status(self):
     lock_key = "update_all_playlists_track_status_lock"
-    if acquire_lock(lock_key, expiration=600):
+    if task_manager.acquire_lock(lock_key, expiration=600):
         try:
             with app.app_context():
                 playlists = Playlist.query.all()
@@ -51,19 +45,26 @@ def update_all_playlists_track_status(self):
                 for playlist in playlists:
                     total_tracks = 0
                     available_tracks = 0
-                    app.logger.debug(f"Current Playlist: {playlist.name} [{playlist.id}:{playlist.provider_playlist_id}]" )
+                    app.logger.info(f"Current Playlist: {playlist.name} [{playlist.id}:{playlist.provider_playlist_id}]" )
                     for track in playlist.tracks:
                         total_tracks += 1
+                        app.logger.debug(f"Processing track: {track.name} [{track.provider_track_id}]")
+                        app.logger.debug(f"\tPath = {track.filesystem_path}")
+                        if track.filesystem_path:
+                            app.logger.debug(f"\tPath exists = {os.path.exists(track.filesystem_path)}")
+                        app.logger.debug(f"\tJellyfinID = {track.jellyfin_id}")
                         if track.filesystem_path and os.path.exists(track.filesystem_path):
-                            app.logger.debug(f"Track {track.name} is already downloaded at {track.filesystem_path}.")
+                            app.logger.info(f"Track {track.name} is already downloaded at {track.filesystem_path}.")
                             available_tracks += 1
                             track.downloaded = True
                             db.session.commit()
                         #If not found in filesystem, but a jellyfin_id is set, query the jellyfin server for the track and populate the filesystem_path from the response with the path
                         elif track.jellyfin_id:
                             jellyfin_track = jellyfin.get_item(jellyfin_admin_token, track.jellyfin_id)
+                            app.logger.debug(f"\tJellyfin Path: {jellyfin_track['Path']}")
+                            app.logger.debug(f"\tJellyfin Path exists: {os.path.exists(jellyfin_track['Path'])}")
                             if jellyfin_track and os.path.exists(jellyfin_track['Path']):
-                                app.logger.debug(f"Track {track.name} found in Jellyfin at {jellyfin_track['Path']}.")
+                                app.logger.info(f"Track {track.name} found in Jellyfin at {jellyfin_track['Path']}.")
                                 track.filesystem_path = jellyfin_track['Path']
                                 track.downloaded = True
                                 db.session.commit()
@@ -94,7 +95,7 @@ def update_all_playlists_track_status(self):
app.logger.info("All playlists' track statuses updated.")
return {'status': 'All playlists updated', 'total': total_playlists, 'processed': processed_playlists}
finally:
release_lock(lock_key)
task_manager.release_lock(lock_key)
else:
app.logger.info("Skipping task. Another instance is already running.")
return {'status': 'Task skipped, another instance is running'}
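
Every task in this file repeats the same guard: acquire the Redis lock, do the work inside try, release the lock in finally, and return a "Task skipped" result when another instance already holds it. The commit keeps that pattern inline; purely as an illustration, the same flow could be factored into a context manager like the hypothetical redis_task_lock below (not part of this change):

from contextlib import contextmanager

@contextmanager
def redis_task_lock(lock_key, expiration=600):
    # Mirrors the acquire/try/finally/release pattern used by the tasks above.
    acquired = task_manager.acquire_lock(lock_key, expiration=expiration)
    try:
        yield acquired
    finally:
        if acquired:
            task_manager.release_lock(lock_key)

# Usage sketch inside a task body:
#   with redis_task_lock("update_all_playlists_track_status_lock") as acquired:
#       if not acquired:
#           return {'status': 'Task skipped, another instance is running'}
#       ...  # do the work
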
@@ -104,7 +105,7 @@ def update_all_playlists_track_status(self):
 def download_missing_tracks(self):
     lock_key = "download_missing_tracks_lock"
-    if acquire_lock(lock_key, expiration=1800):
+    if task_manager.acquire_lock(lock_key, expiration=1800):
         try:
             app.logger.info("Starting track download job...")
@@ -243,7 +244,7 @@ def download_missing_tracks(self):
                 'failed': failed_downloads
             }
         finally:
-            release_lock(lock_key)
+            task_manager.release_lock(lock_key)
             if app.config['REFRESH_LIBRARIES_AFTER_DOWNLOAD_TASK']:
                 libraries = jellyfin.get_libraries(jellyfin_admin_token)
                 for lib in libraries:
@@ -257,7 +258,7 @@ def download_missing_tracks(self):
 def check_for_playlist_updates(self):
     lock_key = "check_for_playlist_updates_lock"
-    if acquire_lock(lock_key, expiration=600):
+    if task_manager.acquire_lock(lock_key, expiration=600):
         try:
             app.logger.info('Starting playlist update check...')
             with app.app_context():
@@ -355,7 +356,7 @@ def check_for_playlist_updates(self):
                 return {'status': 'Playlist update check completed', 'total': total_playlists, 'processed': processed_playlists}
         finally:
-            release_lock(lock_key)
+            task_manager.release_lock(lock_key)
     else:
         app.logger.info("Skipping task. Another instance is already running.")
         return {'status': 'Task skipped, another instance is running'}
@@ -363,15 +364,15 @@ def check_for_playlist_updates(self):
 @celery.task(bind=True)
 def update_jellyfin_id_for_downloaded_tracks(self):
     lock_key = "update_jellyfin_id_for_downloaded_tracks_lock"
-    full_update_key = 'full_update_jellyfin_ids'
-    if acquire_lock(lock_key, expiration=600): # Lock for 10 minutes
+    full_update_key = 'full_update_jellyfin_ids_lock'
+    if task_manager.acquire_lock(lock_key, expiration=600): # Lock for 10 minutes
         try:
             app.logger.info("Starting Jellyfin ID update for tracks...")
             with app.app_context():
                 downloaded_tracks = Track.query.filter_by(downloaded=True, jellyfin_id=None).all()
-                if acquire_lock(full_update_key, expiration=60*60*24):
+                if task_manager.acquire_lock(full_update_key, expiration=60*60*24):
                     app.logger.info(f"performing full update on jellyfin track ids. (Update tracks and playlists if better quality will be found)")
                     downloaded_tracks = Track.query.all()
                 else:
@@ -415,7 +416,7 @@ def update_jellyfin_id_for_downloaded_tracks(self):
                 return {'status': 'All tracks updated', 'total': total_tracks, 'processed': processed_tracks}
         finally:
-            release_lock(lock_key)
+            task_manager.release_lock(lock_key)
     else:
         app.logger.info("Skipping task. Another instance is already running.")
         return {'status': 'Task skipped, another instance is running'}
@@ -424,7 +425,7 @@ def update_jellyfin_id_for_downloaded_tracks(self):
 def request_lidarr(self):
     lock_key = "request_lidarr_lock"
-    if acquire_lock(lock_key, expiration=600):
+    if task_manager.acquire_lock(lock_key, expiration=600):
         with app.app_context():
             if app.config['LIDARR_API_KEY'] and app.config['LIDARR_URL']:
                 from app import lidarr_client
@@ -497,11 +498,11 @@ def request_lidarr(self):
                     app.logger.info(f'Requests sent to Lidarr. Total items: {total_items}')
                     return {'status': 'Request sent to Lidarr'}
                 finally:
-                    release_lock(lock_key)
+                    task_manager.release_lock(lock_key)
             else:
                 app.logger.info('Lidarr API key or URL not set. Skipping request.')
-                release_lock(lock_key)
+                task_manager.release_lock(lock_key)
     else:
@@ -594,3 +595,48 @@ def compute_quality_score(result, use_ffprobe=False) -> float:
app.logger.warning(f"No valid file path for track {result.get('Name')} - Skipping ffprobe analysis.")
return score
class TaskManager:
def __init__(self):
self.tasks = {
'update_all_playlists_track_status': None,
'download_missing_tracks': None,
'check_for_playlist_updates': None,
'update_jellyfin_id_for_downloaded_tracks': None
}
if app.config['LIDARR_API_KEY']:
self.tasks['request_lidarr'] = None
def start_task(self, task_name, *args, **kwargs):
if task_name not in self.tasks:
raise ValueError(f"Task {task_name} is not defined.")
task = globals()[task_name].delay(*args, **kwargs)
self.tasks[task_name] = task.id
return task.id
def get_task_status(self, task_name):
if task_name not in self.tasks:
raise ValueError(f"Task {task_name} is not defined.")
task_id = self.tasks[task_name]
if not task_id:
return {'state': 'NOT STARTED', 'info': {}, 'lock_status': False}
result = AsyncResult(task_id)
lock_status = True if self.get_lock(f"{task_name}_lock") else False
return {'state': result.state, 'info': result.info if result.info else {}, 'lock_status': lock_status}
def acquire_lock(self, lock_name, expiration=60):
return redis_client.set(lock_name, "locked", ex=expiration, nx=True)
def release_lock(self, lock_name):
redis_client.delete(lock_name)
def get_lock(self, lock_name):
return redis_client.get(lock_name)
def prepare_logger(self):
FORMAT = "[%(asctime)s][%(filename)18s:%(lineno)4s - %(funcName)20s() ] %(message)s"
logging.basicConfig(format=FORMAT)
task_manager = TaskManager()
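
The module-level task_manager instance is the intended entry point for callers: start_task looks the task up by name and dispatches it with .delay(), while get_task_status combines Celery's AsyncResult state with the Redis lock status. A minimal usage sketch from a Flask view follows; the routes themselves are hypothetical and not part of this commit, only task_manager, start_task, get_task_status, and the task names come from it:

from flask import jsonify

@app.route('/tasks/<task_name>/start', methods=['POST'])
def start_background_task(task_name):
    try:
        task_id = task_manager.start_task(task_name)  # dispatches <task_name>.delay()
    except ValueError:
        return jsonify({'error': f'Unknown task: {task_name}'}), 404
    return jsonify({'task_id': task_id}), 202

@app.route('/tasks/<task_name>/status', methods=['GET'])
def background_task_status(task_name):
    try:
        # Reports Celery state plus whether the task's Redis lock is currently held.
        return jsonify(task_manager.get_task_status(task_name))
    except ValueError:
        return jsonify({'error': f'Unknown task: {task_name}'}), 404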