From 4bc164cc56476fec85897e1aa21195e27eaa62f4 Mon Sep 17 00:00:00 2001 From: afkarxyz Date: Fri, 23 May 2025 16:43:45 +0700 Subject: [PATCH] v2.8 --- SpotiFLAC.py | 168 +++++++++++++----- getTracks.py | 481 ++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 601 insertions(+), 48 deletions(-) diff --git a/SpotiFLAC.py b/SpotiFLAC.py index 6161cf6..3f19e15 100644 --- a/SpotiFLAC.py +++ b/SpotiFLAC.py @@ -10,14 +10,14 @@ from PyQt6.QtWidgets import ( QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit, QLabel, QFileDialog, QListWidget, QTextEdit, QTabWidget, QButtonGroup, QRadioButton, QAbstractItemView, QSpacerItem, QSizePolicy, QProgressBar, QCheckBox, QDialog, - QDialogButtonBox, QComboBox, QStyledItemDelegate, QStyle + QDialogButtonBox, QComboBox, QStyledItemDelegate ) from PyQt6.QtCore import Qt, QThread, pyqtSignal, QUrl, QTimer, QTime, QSettings, QSize -from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush, QPalette +from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException -from getTracks import LucidaDownloader, SquidWTFDownloader +from getTracks import LucidaDownloader, SquidWTFDownloader, TidalDownloader @dataclass class Track: @@ -86,6 +86,8 @@ class DownloadWorker(QThread): try: if self.service == "qobuz": downloader = SquidWTFDownloader(self.qobuz_region, self.timeout) + elif self.service == "tidal_api": + downloader = TidalDownloader(timeout=self.timeout) else: downloader = LucidaDownloader(self.use_fallback, self.timeout) @@ -141,7 +143,7 @@ class DownloadWorker(QThread): self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0) self.failed_tracks.append((track.title, track.artists, "No ISRC available")) continue - + self.progress.emit(f"Getting track from Qobuz with ISRC: {track.isrc}", 0) is_paused_callback = lambda: self.is_paused @@ -153,13 +155,57 @@ class DownloadWorker(QThread): is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback ) - else: + elif self.service == "tidal_api": + if not track.isrc: + self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0) + self.failed_tracks.append((track.title, track.artists, "No ISRC available")) + continue + + self.progress.emit(f"Searching and downloading from Tidal (API) for ISRC: {track.isrc} - {track.title} - {track.artists}", 0) + + import asyncio + + is_paused_callback = lambda: self.is_paused + is_stopped_callback = lambda: self.is_stopped + + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + download_result_details = loop.run_until_complete(downloader.download( + query=f"{track.title} {track.artists}", + isrc=track.isrc, + output_dir=track_outpath, + quality="LOSSLESS", + embed_metadata=True, + is_paused_callback=is_paused_callback, + is_stopped_callback=is_stopped_callback + )) + + if isinstance(download_result_details, str) and os.path.exists(download_result_details): + downloaded_file = download_result_details + elif isinstance(download_result_details, dict) and download_result_details.get("success") == False and download_result_details.get("error") == "Download stopped by user": + self.progress.emit(f"Download stopped by user for: {track.title}",0) + return + elif isinstance(download_result_details, dict) and download_result_details.get("success") == False: + raise Exception(download_result_details.get("error", "Tidal API download failed")) + elif isinstance(download_result_details, dict) and download_result_details.get("status") == "all_skipped" or download_result_details.get("status") == "skipped_exists": + self.progress.emit(f"File already exists or skipped: {new_filename}",0) + downloaded_file = new_filepath + else: + downloaded_file = None + raise Exception(f"Tidal API download failed or returned unexpected result: {download_result_details}") + + else: track_id = track.id self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0) - import asyncio - metadata = asyncio.run(downloader.get_track_info(track_id, self.service)) - + metadata = downloader.get_track_info(track_id, self.service) self.progress.emit(f"Track info received, starting download process", 0) is_paused_callback = lambda: self.is_paused @@ -172,7 +218,10 @@ class DownloadWorker(QThread): is_stopped_callback=is_stopped_callback ) - if downloaded_file == new_filepath: + if self.is_stopped: + return + + if downloaded_file == new_filepath: self.progress.emit(f"File already exists: {new_filename}", 0) self.progress.emit(f"Skipped: {track.title} - {track.artists}", int((i + 1) / total_tracks * 100)) @@ -255,21 +304,32 @@ class ServiceStatusChecker(QThread): def run(self): try: response = requests.get("https://lucida.to/api/stats", timeout=5) + services_status = {} if response.status_code == 200: data = response.json() - services_status = {} - current_services = data.get('all', {}).get('downloads', {}).get('current', {}).get('services', {}) - services_status['amazon'] = current_services.get('amazon', 0) > 0 services_status['tidal'] = current_services.get('tidal', 0) > 0 services_status['deezer'] = current_services.get('deezer', 0) > 0 - - self.status_updated.emit(services_status) else: - self.error.emit(f"Server returned status code: {response.status_code}") + self.error.emit(f"Lucida API error: {response.status_code}") + + self.status_updated.emit(services_status) except Exception as e: - self.error.emit(f"Error checking service status: {str(e)}") + self.error.emit(f"Error checking Lucida service status: {str(e)}") + +class TidalStatusChecker(QThread): + status_updated = pyqtSignal(bool) + error = pyqtSignal(str) + + def run(self): + try: + response = requests.get("https://tidal.401658.xyz", timeout=5) + is_online = response.status_code == 200 + self.status_updated.emit(is_online) + except Exception as e: + self.error.emit(f"Error checking Tidal (API) status: {str(e)}") + self.status_updated.emit(False) class QobuzStatusChecker(QThread): status_updated = pyqtSignal(bool) @@ -294,11 +354,6 @@ class StatusIndicatorDelegate(QStyledItemDelegate): super().paint(painter, option, index) - if option.state & QStyle.StateFlag.State_Selected: - text_color = option.palette.color(QPalette.ColorGroup.Active, QPalette.ColorRole.HighlightedText) - else: - text_color = option.palette.color(QPalette.ColorGroup.Active, QPalette.ColorRole.Text) - indicator_color = Qt.GlobalColor.green if is_online else Qt.GlobalColor.red circle_size = 6 @@ -323,12 +378,21 @@ class ServiceComboBox(QComboBox): self.status_checker = ServiceStatusChecker() self.status_checker.status_updated.connect(self.update_service_status) - self.status_checker.error.connect(lambda e: print(f"Status check error: {e}")) + self.status_checker.error.connect(lambda e: print(f"General status check error: {e}")) self.status_checker.start() self.status_timer = QTimer(self) self.status_timer.timeout.connect(self.refresh_status) - self.status_timer.start(5000) + self.status_timer.start(5000) + + self.tidal_api_status_checker = TidalStatusChecker() + self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status) + self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}")) + self.tidal_api_status_checker.start() + + self.tidal_api_status_timer = QTimer(self) + self.tidal_api_status_timer.timeout.connect(self.refresh_tidal_api_status) + self.tidal_api_status_timer.start(6000) def setup_items(self): current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -337,7 +401,8 @@ class ServiceComboBox(QComboBox): {'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False}, {'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False}, {'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False}, - {'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False} + {'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False}, + {'id': 'tidal_api', 'name': 'Tidal (API)', 'icon': 'tidal.png', 'online': False} ] for service in self.services: @@ -358,12 +423,12 @@ class ServiceComboBox(QComboBox): pixmap.save(path) def update_service_status(self, status_dict): - self.services_status = status_dict + self.services_status.update(status_dict) for i in range(self.count()): service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1) - if service_id in self.services_status: + if service_id in self.services_status: service_data = self.itemData(i, Qt.ItemDataRole.UserRole) if isinstance(service_data, dict): service_data['online'] = self.services_status[service_id] @@ -374,8 +439,25 @@ class ServiceComboBox(QComboBox): def refresh_status(self): self.status_checker = ServiceStatusChecker() self.status_checker.status_updated.connect(self.update_service_status) - self.status_checker.error.connect(lambda e: print(f"Status check error: {e}")) + self.status_checker.error.connect(lambda e: print(f"General status check error: {e}")) self.status_checker.start() + + def update_tidal_api_service_status(self, is_online): + for i in range(self.count()): + service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1) + if service_id == 'tidal_api': + service_data = self.itemData(i, Qt.ItemDataRole.UserRole) + if isinstance(service_data, dict): + service_data['online'] = is_online + self.setItemData(i, service_data, Qt.ItemDataRole.UserRole) + break + self.update() + + def refresh_tidal_api_status(self): + self.tidal_api_status_checker = TidalStatusChecker() + self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status) + self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}")) + self.tidal_api_status_checker.start() def currentData(self, role=Qt.ItemDataRole.UserRole + 1): return super().currentData(role) @@ -468,7 +550,7 @@ class QobuzRegionComboBox(QComboBox): class SpotiFLACGUI(QWidget): def __init__(self): super().__init__() - self.current_version = "2.7" + self.current_version = "2.8" self.tracks = [] self.reset_state() @@ -913,7 +995,7 @@ class SpotiFLACGUI(QWidget): spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed) about_layout.addItem(spacer) - footer_label = QLabel("v2.7 | May 2025") + footer_label = QLabel("v2.8 | May 2025") footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;") about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter) @@ -932,33 +1014,35 @@ class SpotiFLACGUI(QWidget): timeout_label = widget break + region_label = None + for widget in self.qobuz_region_dropdown.parentWidget().children(): + if isinstance(widget, QLabel) and widget.text() == "Region:": + region_label = widget + break + if service == "qobuz": self.fallback_checkbox.hide() self.timeout_input.hide() if timeout_label: timeout_label.hide() - region_label = None - for widget in self.qobuz_region_dropdown.parentWidget().children(): - if isinstance(widget, QLabel) and widget.text() == "Region:": - region_label = widget - break - if region_label: region_label.show() self.qobuz_region_dropdown.show() - else: + elif service == "tidal_api": + self.fallback_checkbox.hide() + self.timeout_input.hide() + if timeout_label: + timeout_label.hide() + if region_label: + region_label.hide() + self.qobuz_region_dropdown.hide() + else: self.fallback_checkbox.show() self.timeout_input.show() if timeout_label: timeout_label.show() - region_label = None - for widget in self.qobuz_region_dropdown.parentWidget().children(): - if isinstance(widget, QLabel) and widget.text() == "Region:": - region_label = widget - break - if region_label: region_label.hide() self.qobuz_region_dropdown.hide() diff --git a/getTracks.py b/getTracks.py index 6e1d5ef..3ec2c21 100644 --- a/getTracks.py +++ b/getTracks.py @@ -4,6 +4,10 @@ import os import asyncio import re import base64 +import json +import tempfile +import httpx +import aiofiles from datetime import datetime from mutagen.flac import FLAC, Picture from mutagen.id3 import PictureType @@ -537,28 +541,480 @@ class SquidWTFDownloader: except Exception as e: raise Exception(f"Metadata error: {e}") +class TidalDownloader: + def __init__(self, client_id="zU4XHVVkc2tDPo4t", client_secret="VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4=", timeout=30): + self.client_id = client_id + self.client_secret = client_secret + self.timeout = timeout + self.progress_callback = ProgressCallback() + + self.temp_dir = tempfile.gettempdir() + self.token_path = os.path.join(self.temp_dir, "tidal_token.json") + self.access_token = None + + if os.path.exists(self.token_path): + try: + with open(self.token_path, "r") as tok: + token = json.loads(tok.read()) + self.access_token = token.get("access_token") + except: + pass + + def set_progress_callback(self, callback): + self.progress_callback = callback + + async def get_access_token(self): + refresh_url = "https://auth.tidal.com/v1/oauth2/token" + + payload = { + "client_id": self.client_id, + "grant_type": "client_credentials", + } + + async with httpx.AsyncClient(http2=True) as client: + try: + response = await client.post( + url=refresh_url, + data=payload, + auth=(self.client_id, self.client_secret), + ) + + if response.status_code == 200: + token_data = response.json() + new_token = token_data.get("access_token") + + try: + with open(self.token_path, "w") as f: + json.dump({ + "access_token": new_token + }, f) + except: + pass + + self.access_token = new_token + return new_token + return None + + except: + return None + + async def search_tracks(self, query): + try: + tidal_token = self.access_token or await self.get_access_token() + if not tidal_token: + return {"error": "Failed to get access token"} + + search_url = f"https://api.tidal.com/v1/search/tracks?query={query}&limit=25&offset=0&countryCode=US" + header = {"authorization": f"Bearer {tidal_token}"} + + async with httpx.AsyncClient(http2=True) as client: + search_data = await client.get(url=search_url, headers=header) + response_data = search_data.json() + + filtered_items = [{ + "id": item.get("id"), + "title": item.get("title"), + "url": item.get("url"), + "isrc": item.get("isrc"), + "audioQuality": item.get("audioQuality"), + "mediaMetadata": item.get("mediaMetadata"), + "album": item.get("album", {}), + "artists": item.get("artists", []), + "artist": item.get("artist", {}), + "trackNumber": item.get("trackNumber"), + "volumeNumber": item.get("volumeNumber"), + "duration": item.get("duration"), + "copyright": item.get("copyright"), + "explicit": item.get("explicit") + } for item in response_data.get("items", [])] + + return { + "limit": response_data.get("limit"), + "offset": response_data.get("offset"), + "totalNumberOfItems": response_data.get("totalNumberOfItems"), + "items": filtered_items + } + + except Exception as e: + return {"error": f"Error: {str(e)}"} + + async def filter_by_isrc(self, query, isrc=None): + try: + result = await self.search_tracks(query) + + if "error" in result: + return result + + if isrc: + isrc_items = [item for item in result["items"] if item.get("isrc") == isrc] + + if len(isrc_items) > 1: + hires_items = [] + for item in isrc_items: + media_metadata = item.get("mediaMetadata", {}) + tags = media_metadata.get("tags", []) if media_metadata else [] + if "HIRES_LOSSLESS" in tags: + hires_items.append(item) + + if hires_items: + result["items"] = hires_items + else: + result["items"] = isrc_items + else: + result["items"] = isrc_items + + result["totalNumberOfItems"] = len(result["items"]) + + return result + + except Exception as e: + return {"error": f"Error: {str(e)}"} + + async def get_track_download_info(self, track_id, quality="LOSSLESS"): + try: + download_api_url = f"https://tidal.401658.xyz/track/?id={track_id}&quality={quality}" + + async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client: + response = await client.get(download_api_url) + + if response.status_code == 200: + data = response.json() + + for item in data: + if "OriginalTrackUrl" in item: + return { + "success": True, + "download_url": item["OriginalTrackUrl"], + "track_info": data[0] if data else {} + } + + return {"success": False, "error": "OriginalTrackUrl not found in response"} + else: + return {"success": False, "error": f"API returned status code: {response.status_code}"} + + except Exception as e: + return {"success": False, "error": f"Error getting download info: {str(e)}"} + + async def download_album_art(self, album_id, size="1280x1280"): + try: + art_url = f"https://resources.tidal.com/images/{album_id.replace('-', '/')}/{size}.jpg" + + async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client: + response = await client.get(art_url) + + if response.status_code == 200: + return response.content + else: + print(f"Failed to download album art: HTTP {response.status_code}") + return None + + except Exception as e: + print(f"Error downloading album art: {str(e)}") + return None + + async def embed_metadata(self, filepath, track_info, search_info=None): + try: + audio = FLAC(filepath) + + audio.clear() + + if track_info.get("title"): + audio["TITLE"] = track_info["title"] + + artists_list = [] + if search_info and search_info.get("artists"): + for artist in search_info["artists"]: + if artist.get("name"): + artists_list.append(artist["name"]) + elif search_info and search_info.get("artist") and search_info["artist"].get("name"): + artists_list.append(search_info["artist"]["name"]) + elif track_info.get("artists"): + for artist in track_info["artists"]: + if artist.get("name"): + artists_list.append(artist["name"]) + elif track_info.get("artist") and track_info["artist"].get("name"): + artists_list.append(track_info["artist"]["name"]) + + if artists_list: + audio["ARTIST"] = artists_list[0] + if len(artists_list) > 1: + audio["ALBUMARTIST"] = "; ".join(artists_list) + else: + audio["ALBUMARTIST"] = artists_list[0] + + album_info = search_info.get("album", {}) if search_info else track_info.get("album", {}) + if album_info.get("title"): + audio["ALBUM"] = album_info["title"] + + if search_info and search_info.get("trackNumber"): + audio["TRACKNUMBER"] = str(search_info["trackNumber"]) + elif track_info.get("trackNumber"): + audio["TRACKNUMBER"] = str(track_info["trackNumber"]) + + if search_info and search_info.get("volumeNumber"): + audio["DISCNUMBER"] = str(search_info["volumeNumber"]) + elif track_info.get("volumeNumber"): + audio["DISCNUMBER"] = str(track_info["volumeNumber"]) + + isrc = search_info.get("isrc") if search_info else track_info.get("isrc") + if isrc: + audio["ISRC"] = isrc + + copyright_info = search_info.get("copyright") if search_info else track_info.get("copyright") + if copyright_info: + audio["COPYRIGHT"] = copyright_info + + if album_info.get("releaseDate"): + audio["DATE"] = album_info["releaseDate"][:4] + + if track_info.get("genre"): + audio["GENRE"] = track_info["genre"] + + if album_info.get("cover"): + album_art = await self.download_album_art(album_info["cover"]) + if album_art: + picture = Picture() + picture.data = album_art + picture.type = PictureType.COVER_FRONT + picture.mime = "image/jpeg" + picture.desc = "Cover" + audio.add_picture(picture) + print("Album art embedded successfully") + + audio.save() + print(f"Metadata embedded successfully for: {track_info.get('title', 'Unknown')}") + + return True + + except Exception as e: + print(f"Error embedding metadata: {str(e)}") + return False + + async def download_file(self, url, filename, max_retries=3, is_paused_callback=None, is_stopped_callback=None): + for attempt in range(max_retries): + try: + async with httpx.AsyncClient(http2=True, timeout=60.0) as client: + async with client.stream('GET', url) as response: + if response.status_code == 200: + total_size_in_bytes = int(response.headers.get('content-length', 0)) + bytes_downloaded = 0 + async with aiofiles.open(filename, 'wb') as f: + async for chunk in response.aiter_bytes(chunk_size=8192): + if is_stopped_callback and is_stopped_callback(): + print("\\nDownload stopped.") + if os.path.exists(filename): + try: + os.remove(filename) + except OSError as e: + print(f"Error removing partial file: {e}") + return {"success": False, "error": "Download stopped by user"} + + while is_paused_callback and is_paused_callback(): + print("\\nDownload paused. Waiting...") + await asyncio.sleep(1) + + await f.write(chunk) + bytes_downloaded += len(chunk) + if total_size_in_bytes > 0: + if self.progress_callback: + self.progress_callback(bytes_downloaded, total_size_in_bytes) + if total_size_in_bytes > 0 and bytes_downloaded == total_size_in_bytes: + print() + print(f"Successfully downloaded: {filename} ({bytes_downloaded} bytes)") + return {"success": True, "size": bytes_downloaded} + else: + print(f"\\nFailed to download {filename}. HTTP Status: {response.status_code}") + if os.path.exists(filename): + try: + os.remove(filename) + except OSError as e: + print(f"Error removing partial file after server error: {e}") + return {"success": False, "error": f"HTTP {response.status_code}"} + + except Exception as e: + print() + if os.path.exists(filename): + try: + os.remove(filename) + except OSError as ose: + print(f"Error removing partial file after exception: {ose}") + if attempt < max_retries - 1: + print(f"Download attempt {attempt + 1} failed, retrying...") + await asyncio.sleep(2) + else: + return {"success": False, "error": f"Download failed after {max_retries} attempts: {str(e)}"} + + async def download_track(self, track_ids, search_results, output_dir=".", quality="LOSSLESS", embed_meta=True, is_paused_callback=None, is_stopped_callback=None): + if not isinstance(track_ids, list): + track_ids = [track_ids] + + if output_dir != ".": + os.makedirs(output_dir, exist_ok=True) + + search_map = {} + if search_results and search_results.get("items"): + for item in search_results["items"]: + search_map[item["id"]] = item + + all_skipped = True + skipped_files = [] + + for i, track_id in enumerate(track_ids): + download_info = await self.get_track_download_info(track_id, quality) + + if not download_info["success"]: + print(f"Failed to get download info for track {track_id}: {download_info['error']}") + continue + + download_url = download_info["download_url"] + track_info = download_info["track_info"] + search_info = search_map.get(track_id) + + title = track_info.get("title", f"track_{track_id}") + + artists_list = [] + if search_info and search_info.get("artists"): + for artist in search_info["artists"]: + if artist.get("name"): + artists_list.append(artist["name"]) + elif search_info and search_info.get("artist") and search_info["artist"].get("name"): + artists_list.append(search_info["artist"]["name"]) + elif track_info.get("artists"): + for artist in track_info["artists"]: + if artist.get("name"): + artists_list.append(artist["name"]) + elif track_info.get("artist") and track_info["artist"].get("name"): + artists_list.append(track_info["artist"]["name"]) + + artist_names = ", ".join(artists_list) if artists_list else "" + + safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_', '.')).rstrip() + safe_artists = "".join(c for c in artist_names if c.isalnum() or c in (' ', '-', '_', ',', '.')).rstrip() + + if safe_artists: + filename = f"{safe_title} - {safe_artists}.flac" + else: + filename = f"{safe_title}.flac" + + filepath = os.path.join(output_dir, filename) + + if os.path.exists(filepath): + print(f"File {filename} already exists. Skipping download.") + skipped_files.append(filename) + if len(track_ids) == 1: + return { + "success": True, + "status": "skipped_exists", + "track_id": track_id, + "filename": filename, + "filepath": filepath, + "message": f"File {filename} already exists." + } + continue + + all_skipped = False + print(f"Downloading: {filename}") + + download_result = await self.download_file(download_url, filepath, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback) + + if download_result["success"]: + print(f"Successfully downloaded track {track_id}") + + if embed_meta: + print("Embedding metadata...") + await self.embed_metadata(filepath, track_info, search_info) + + return { + "success": True, + "track_id": track_id, + "filename": filename, + "filepath": filepath, + "size": download_result["size"], + "track_info": track_info, + "metadata_embedded": embed_meta + } + else: + print(f"Failed to download track {track_id}: {download_result['error']}") + if os.path.exists(filepath): + try: + os.remove(filepath) + except: + pass + if download_result.get("error") == "Download stopped by user": + return {"success": False, "error": "Download stopped by user", "track_id": track_id} + + if all_skipped and skipped_files: + return { + "success": True, + "status": "all_skipped", + "message": f"All files already exist: {', '.join(skipped_files)}" + } + + return {"success": False, "error": "All track IDs failed to download or were stopped"} + + async def search_and_download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None): + print(f"Searching for: {query}") + if isrc: + print(f"ISRC: {isrc}") + + search_result = await self.filter_by_isrc(query, isrc) + + if "error" in search_result: + print(f"Search error: {search_result['error']}") + return {"success": False, "error": search_result['error']} + + if not search_result["items"]: + print("No tracks found") + return {"success": False, "error": "No tracks found"} + + track_ids = [item["id"] for item in search_result["items"]] + print(f"Found {len(track_ids)} track(s): {track_ids}") + + download_result = await self.download_track(track_ids, search_result, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback) + + return download_result + + async def download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None): + result = await self.search_and_download(query, isrc, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback) + + if result["success"]: + if result.get("status") == "all_skipped": + print(f"Skipped: {result['message']}") + if "filepath" in result: + return result["filepath"] + return output_dir + elif result.get("status") == "skipped_exists": + print(f"Skipped: {result['message']}") + return result["filepath"] + else: + print("Download completed!") + return result["filepath"] + else: + print(f"Download failed: {result['error']}") + if result.get("error") == "Download stopped by user": + raise Exception("Download stopped by user") + raise Exception(result["error"]) + async def main(): print("=== LucidaDownloader ===") lucida = LucidaDownloader(domain="to") - track_id = "2plbrEY59IikOBgBGLjaoe" service = "tidal" output_dir = "." - + try: print(f"Getting track: {track_id} from {service}") metadata = await lucida.get_track_info(track_id, service) print("Starting download") - - downloaded_file = lucida.download(metadata, output_dir) + downloaded_file = await lucida.download(metadata, output_dir) print(f"Success: File saved as {downloaded_file}") except Exception as e: print(f"Error: {str(e)}") - + print("\n\n=== SquidWTFDownloader ===") squid = SquidWTFDownloader(region="us") - isrc = "TCAIT2495017" + isrc = "USUM72409273" output_dir = "." try: @@ -566,6 +1022,19 @@ async def main(): print(f"Success: File saved as {downloaded_file}") except Exception as e: print(f"Error: {str(e)}") + + print("\n\n=== TidalDownloader ===") + tidal = TidalDownloader() + + query = "APT." + isrc = "USAT22409172" + output_dir = "." + + try: + downloaded_file = await tidal.download(query, isrc, output_dir, quality="LOSSLESS", embed_metadata=True) + print(f"Success: File saved as {downloaded_file}") + except Exception as e: + print(f"Error: {str(e)}") if __name__ == "__main__": try: