diff --git a/Archived/getMetadata.py b/Archived/getMetadata.py deleted file mode 100644 index 11e8b8c..0000000 --- a/Archived/getMetadata.py +++ /dev/null @@ -1,99 +0,0 @@ -import asyncio -import zendriver as zd - -async def get_metadata(page, headless=True): - max_attempts = 40 - attempts = 0 - - await asyncio.sleep(2) - - await page.evaluate(""" - window.downloadInfo = null; - const originalFetch = window.fetch; - window.fetch = async function(...args) { - const [url, config] = args; - if (url.includes('/api/load?url=%2Fapi%2Ffetch%2Fstream%2Fv2')) { - const payload = JSON.parse(config.body); - const title = document.querySelector('h1.svelte-6pt9ji').textContent.trim(); - const artists = Array.from(document.querySelectorAll('h2.svelte-6pt9ji a.normal')) - .map(a => a.textContent.trim()) - .join(', '); - const cover = document.querySelector('.svelte-6pt9ji .meta.svelte-6pt9ji a').href; - - window.downloadInfo = { - url: payload.url, - cover: cover, - title: title, - artists: artists, - token: payload.token.primary, - expiry: payload.token.expiry - }; - } - return originalFetch.apply(this, args); - }; - """) - - await page.evaluate(""" - function waitForElement(selector) { - return new Promise(resolve => { - if (document.querySelector(selector)) { - return resolve(document.querySelector(selector)); - } - - const observer = new MutationObserver(mutations => { - if (document.querySelector(selector)) { - observer.disconnect(); - resolve(document.querySelector(selector)); - } - }); - - observer.observe(document.documentElement, { - childList: true, - subtree: true - }); - }); - } - - (async () => { - if (!window.location.hostname.includes('lucida.')) return; - - await Promise.race([ - waitForElement('.d1-track button'), - waitForElement('button[class*="download-button"]') - ]); - - const clickDownloadButton = () => { - const button = document.querySelector('.d1-track button') || - document.querySelector('button[class*="download-button"]'); - if (button) button.click(); - }; - - clickDownloadButton(); - })(); - """) - - while attempts < max_attempts: - download_info = await page.evaluate("window.downloadInfo") - if download_info: - return download_info - - await asyncio.sleep(0.5) - attempts += 1 - - raise TimeoutError("Timeout") - -async def main(headless=True): - browser = await zd.start(headless=headless) - try: - track_id = "2plbrEY59IikOBgBGLjaoe" - url = f"https://lucida.to/?url=https%3A%2F%2Fopen.spotify.com%2Ftrack%2F{track_id}&country=auto&to=tidal" - - page = await browser.get(url) - download_info = await get_metadata(page) - print(download_info) - return download_info - finally: - await browser.stop() - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/SpotiFLAC.py b/SpotiFLAC.py index 0e36ae3..eefdc92 100644 --- a/SpotiFLAC.py +++ b/SpotiFLAC.py @@ -17,7 +17,8 @@ from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException -from getTracks import LucidaDownloader, SquidWTFDownloader, TidalDownloader +from qobuzDL import QobuzDownloader +from tidalDL import TidalDownloader @dataclass class Track: @@ -53,10 +54,9 @@ class MetadataFetchWorker(QThread): class DownloadWorker(QThread): finished = pyqtSignal(bool, str, list) progress = pyqtSignal(str, int) - - def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False, + def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False, album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True, - use_album_subfolders=False, service="amazon", timeout=30, qobuz_region="us"): + use_album_subfolders=False, service="tidal", qobuz_region="us"): super().__init__() self.tracks = tracks self.outpath = outpath @@ -68,7 +68,6 @@ class DownloadWorker(QThread): self.use_track_numbers = use_track_numbers self.use_album_subfolders = use_album_subfolders self.service = service - self.timeout = timeout self.qobuz_region = qobuz_region self.is_paused = False self.is_stopped = False @@ -84,11 +83,11 @@ class DownloadWorker(QThread): def run(self): try: if self.service == "qobuz": - downloader = SquidWTFDownloader(self.qobuz_region, self.timeout) - elif self.service == "tidal_api": - downloader = TidalDownloader(timeout=self.timeout) + downloader = QobuzDownloader(self.qobuz_region) + elif self.service == "tidal": + downloader = TidalDownloader() else: - downloader = LucidaDownloader(timeout=self.timeout) + downloader = TidalDownloader() def progress_update(current, total): if total > 0: @@ -154,16 +153,15 @@ class DownloadWorker(QThread): is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback ) - elif self.service == "tidal_api": + elif self.service == "tidal": if not track.isrc: self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0) self.failed_tracks.append((track.title, track.artists, "No ISRC available")) continue - self.progress.emit(f"Searching and downloading from Tidal (API) for ISRC: {track.isrc} - {track.title} - {track.artists}", 0) + self.progress.emit(f"Searching and downloading from Tidal for ISRC: {track.isrc} - {track.title} - {track.artists}", 0) import asyncio - is_paused_callback = lambda: self.is_paused is_stopped_callback = lambda: self.is_stopped @@ -181,7 +179,6 @@ class DownloadWorker(QThread): isrc=track.isrc, output_dir=track_outpath, quality="LOSSLESS", - embed_metadata=True, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback )) @@ -192,13 +189,13 @@ class DownloadWorker(QThread): self.progress.emit(f"Download stopped by user for: {track.title}",0) return elif isinstance(download_result_details, dict) and download_result_details.get("success") == False: - raise Exception(download_result_details.get("error", "Tidal API download failed")) + raise Exception(download_result_details.get("error", "Tidal download failed")) elif isinstance(download_result_details, dict) and (download_result_details.get("status") == "all_skipped" or download_result_details.get("status") == "skipped_exists"): self.progress.emit(f"File already exists or skipped: {new_filename}",0) downloaded_file = new_filepath else: downloaded_file = None - raise Exception(f"Tidal API download failed or returned unexpected result: {download_result_details}") + raise Exception(f"Tidal download failed or returned unexpected result: {download_result_details}") else: track_id = track.id self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0) @@ -306,26 +303,7 @@ class UpdateDialog(QDialog): self.update_button.clicked.connect(self.accept) self.cancel_button.clicked.connect(self.reject) -class ServiceStatusChecker(QThread): - status_updated = pyqtSignal(dict) - error = pyqtSignal(str) - - def run(self): - try: - response = requests.get("https://lucida.to/api/stats", timeout=5) - services_status = {} - if response.status_code == 200: - data = response.json() - current_services = data.get('downloads', {}).get('current', {}).get('services', {}) - services_status['amazon'] = current_services.get('amazon', 0) > 0 - services_status['tidal'] = current_services.get('tidal', 0) > 0 - services_status['deezer'] = current_services.get('deezer', 0) > 0 - else: - self.error.emit(f"Lucida API error: {response.status_code}") - - self.status_updated.emit(services_status) - except Exception as e: - self.error.emit(f"Error checking Lucida service status: {str(e)}") + class TidalStatusChecker(QThread): status_updated = pyqtSignal(bool) @@ -382,36 +360,23 @@ class ServiceComboBox(QComboBox): self.services_status = {} self.setItemDelegate(StatusIndicatorDelegate()) - self.setup_items() - self.status_checker = ServiceStatusChecker() - self.status_checker.status_updated.connect(self.update_service_status) - self.status_checker.error.connect(lambda e: print(f"General status check error: {e}")) - self.status_checker.start() - - self.status_timer = QTimer(self) - self.status_timer.timeout.connect(self.refresh_status) - self.status_timer.start(5000) + self.tidal_status_checker = TidalStatusChecker() + self.tidal_status_checker.status_updated.connect(self.update_tidal_service_status) + self.tidal_status_checker.error.connect(lambda e: print(f"Tidal status check error: {e}")) + self.tidal_status_checker.start() - self.tidal_api_status_checker = TidalStatusChecker() - self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status) - self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}")) - self.tidal_api_status_checker.start() - - self.tidal_api_status_timer = QTimer(self) - self.tidal_api_status_timer.timeout.connect(self.refresh_tidal_api_status) - self.tidal_api_status_timer.start(6000) + self.tidal_status_timer = QTimer(self) + self.tidal_status_timer.timeout.connect(self.refresh_tidal_status) + self.tidal_status_timer.start(6000) def setup_items(self): current_dir = os.path.dirname(os.path.abspath(__file__)) self.services = [ - {'id': 'tidal', 'name': 'Tidal (Lucida)', 'icon': 'tidal.png', 'online': False}, - {'id': 'amazon', 'name': 'Amazon (Lucida)', 'icon': 'amazon.png', 'online': False}, - {'id': 'deezer', 'name': 'Deezer (Lucida)', 'icon': 'deezer.png', 'online': False}, - {'id': 'qobuz', 'name': 'Qobuz (SquidWTF)', 'icon': 'qobuz.png', 'online': False}, - {'id': 'tidal_api', 'name': 'Tidal (API)', 'icon': 'tidal.png', 'online': False} + {'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False}, + {'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False} ] for service in self.services: @@ -425,48 +390,27 @@ class ServiceComboBox(QComboBox): item_index = self.count() - 1 self.setItemData(item_index, service['id'], Qt.ItemDataRole.UserRole + 1) self.setItemData(item_index, service, Qt.ItemDataRole.UserRole) - def create_placeholder_icon(self, path): pixmap = QPixmap(16, 16) pixmap.fill(Qt.GlobalColor.transparent) pixmap.save(path) - - def update_service_status(self, status_dict): - self.services_status.update(status_dict) - - for i in range(self.count()): - service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1) - - if service_id in self.services_status: - service_data = self.itemData(i, Qt.ItemDataRole.UserRole) - if isinstance(service_data, dict): - service_data['online'] = self.services_status[service_id] - self.setItemData(i, service_data, Qt.ItemDataRole.UserRole) - - self.update() - - def refresh_status(self): - self.status_checker = ServiceStatusChecker() - self.status_checker.status_updated.connect(self.update_service_status) - self.status_checker.error.connect(lambda e: print(f"General status check error: {e}")) - self.status_checker.start() - def update_tidal_api_service_status(self, is_online): + def update_tidal_service_status(self, is_online): for i in range(self.count()): service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1) - if service_id == 'tidal_api': + if service_id == 'tidal': service_data = self.itemData(i, Qt.ItemDataRole.UserRole) if isinstance(service_data, dict): service_data['online'] = is_online self.setItemData(i, service_data, Qt.ItemDataRole.UserRole) break self.update() - - def refresh_tidal_api_status(self): - self.tidal_api_status_checker = TidalStatusChecker() - self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status) - self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}")) - self.tidal_api_status_checker.start() + + def refresh_tidal_status(self): + self.tidal_status_checker = TidalStatusChecker() + self.tidal_status_checker.status_updated.connect(self.update_tidal_service_status) + self.tidal_status_checker.error.connect(lambda e: print(f"Tidal status check error: {e}")) + self.tidal_status_checker.start() def currentData(self, role=Qt.ItemDataRole.UserRole + 1): return super().currentData(role) @@ -559,7 +503,7 @@ class QobuzRegionComboBox(QComboBox): class SpotiFLACGUI(QWidget): def __init__(self): super().__init__() - self.current_version = "3.4" + self.current_version = "3.5" self.tracks = [] self.reset_state() @@ -570,9 +514,8 @@ class SpotiFLACGUI(QWidget): self.filename_format = self.settings.value('filename_format', 'title_artist') self.use_track_numbers = self.settings.value('use_track_numbers', False, type=bool) self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool) - self.service = self.settings.value('service', 'amazon') + self.service = self.settings.value('service', 'tidal') self.qobuz_region = self.settings.value('qobuz_region', 'us') - self.timeout_value = self.settings.value('timeout_value', 30, type=int) self.check_for_updates = self.settings.value('check_for_updates', True, type=bool) self.elapsed_time = QTime(0, 0, 0) @@ -902,20 +845,11 @@ class SpotiFLACGUI(QWidget): self.service_dropdown = ServiceComboBox() self.service_dropdown.currentIndexChanged.connect(self.on_service_changed) - service_fallback_layout.addWidget(service_label) service_fallback_layout.addWidget(self.service_dropdown) service_fallback_layout.addSpacing(10) - timeout_label = QLabel('Timeout:') - self.timeout_input = QLineEdit() - self.timeout_input.setText(str(self.timeout_value)) - self.timeout_input.setFixedWidth(35) - self.timeout_input.textChanged.connect(self.save_timeout_setting) - service_fallback_layout.addWidget(timeout_label) - service_fallback_layout.addWidget(self.timeout_input) - region_label = QLabel('Region:') self.qobuz_region_dropdown = QobuzRegionComboBox() self.qobuz_region_dropdown.currentIndexChanged.connect(self.save_qobuz_region_setting) @@ -932,7 +866,6 @@ class SpotiFLACGUI(QWidget): settings_layout.addStretch() settings_tab.setLayout(settings_layout) self.tab_widget.addTab(settings_tab, "Settings") - for i in range(self.service_dropdown.count()): if self.service_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == self.service: self.service_dropdown.setCurrentIndex(i) @@ -943,6 +876,8 @@ class SpotiFLACGUI(QWidget): self.qobuz_region_dropdown.setCurrentIndex(i) break + self.update_service_ui() + self.qobuz_region_dropdown.status_updated.connect( lambda region_id, is_online: self.service_dropdown.update_qobuz_status(region_id, is_online) ) @@ -955,8 +890,7 @@ class SpotiFLACGUI(QWidget): sections = [ ("Check for Updates", "https://github.com/afkarxyz/SpotiFLAC/releases"), - ("Report an Issue", "https://github.com/afkarxyz/SpotiFLAC/issues"), - ("Lucida Status", "https://status.lucida.to") + ("Report an Issue", "https://github.com/afkarxyz/SpotiFLAC/issues") ] for title, url in sections: @@ -988,7 +922,7 @@ class SpotiFLACGUI(QWidget): } """) button.setCursor(Qt.CursorShape.PointingHandCursor) - button.clicked.connect(lambda _, url=url: QDesktopServices.openUrl(QUrl(url))) + button.clicked.connect(lambda _, url=url: QDesktopServices.openUrl(QUrl(url if url.startswith(('http://', 'https://')) else f'https://{url}'))) section_layout.addWidget(button, alignment=Qt.AlignmentFlag.AlignCenter) about_layout.addWidget(section_widget) @@ -997,25 +931,18 @@ class SpotiFLACGUI(QWidget): spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed) about_layout.addItem(spacer) - footer_label = QLabel("v3.4 | June 2025") + footer_label = QLabel("v3.5 | July 2025") footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;") about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter) about_tab.setLayout(about_layout) - self.tab_widget.addTab(about_tab, "About") - + self.tab_widget.addTab(about_tab, "About") def on_service_changed(self, index): service = self.service_dropdown.currentData() self.service = service self.settings.setValue('service', service) self.settings.sync() - timeout_label = None - for widget in self.timeout_input.parentWidget().children(): - if isinstance(widget, QLabel) and widget.text() == "Timeout:": - timeout_label = widget - break - region_label = None for widget in self.qobuz_region_dropdown.parentWidget().children(): if isinstance(widget, QLabel) and widget.text() == "Region:": @@ -1023,30 +950,32 @@ class SpotiFLACGUI(QWidget): break if service == "qobuz": - self.timeout_input.hide() - if timeout_label: - timeout_label.hide() - if region_label: region_label.show() self.qobuz_region_dropdown.show() - elif service == "tidal_api": - self.timeout_input.hide() - if timeout_label: - timeout_label.hide() + else: if region_label: region_label.hide() self.qobuz_region_dropdown.hide() - else: - self.timeout_input.show() - if timeout_label: - timeout_label.show() - + self.log_output.append(f"Service changed to: {self.service_dropdown.currentText()}") + + def update_service_ui(self): + service = self.service + + region_label = None + for widget in self.qobuz_region_dropdown.parentWidget().children(): + if isinstance(widget, QLabel) and widget.text() == "Region:": + region_label = widget + break + + if service == "qobuz": + if region_label: + region_label.show() + self.qobuz_region_dropdown.show() + else: if region_label: region_label.hide() self.qobuz_region_dropdown.hide() - - self.log_output.append(f"Service changed to: {self.service_dropdown.currentText()}") def save_url(self): self.settings.setValue('spotify_url', self.spotify_url.text().strip()) @@ -1061,27 +990,11 @@ class SpotiFLACGUI(QWidget): self.use_track_numbers = self.track_number_checkbox.isChecked() self.settings.setValue('use_track_numbers', self.use_track_numbers) self.settings.sync() - def save_album_subfolder_setting(self): self.use_album_subfolders = self.album_subfolder_checkbox.isChecked() self.settings.setValue('use_album_subfolders', self.use_album_subfolders) self.settings.sync() - def save_timeout_setting(self): - try: - timeout = int(self.timeout_input.text()) - if timeout > 0: - self.timeout_value = timeout - self.settings.setValue('timeout_value', self.timeout_value) - self.settings.sync() - self.log_output.append(f"Timeout setting saved: {self.timeout_value} seconds") - else: - self.timeout_input.setText(str(self.timeout_value)) - self.log_output.append("Timeout must be a positive number") - except ValueError: - self.timeout_input.setText(str(self.timeout_value)) - self.log_output.append("Timeout must be a valid number") - def save_qobuz_region_setting(self): region = self.qobuz_region_dropdown.currentData() self.qobuz_region = region @@ -1382,7 +1295,6 @@ class SpotiFLACGUI(QWidget): self.use_track_numbers, self.use_album_subfolders, service, - self.timeout_value, qobuz_region ) self.worker.finished.connect(self.on_download_finished) diff --git a/getMetadata.py b/getMetadata.py index 004320b..4f346b7 100644 --- a/getMetadata.py +++ b/getMetadata.py @@ -2,35 +2,43 @@ from time import sleep from urllib.parse import urlparse, parse_qs import requests import json -import hmac import time -import hashlib -from typing import Tuple, Callable, Dict, Any, List +import pyotp +import base64 +from random import randrange +from typing import Dict, Any, List, Tuple -_TOTP_SECRET = bytearray([53,53,48,55,49,52,53,56,53,51,52,56,55,52,57,57,53,57,50,50,52,56,54,51,48,51,50,57,51,52,55]) +# https://github.com/visagenull/Spotify-Free +def get_random_user_agent(): + return f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_{randrange(11, 15)}_{randrange(4, 9)}) AppleWebKit/{randrange(530, 537)}.{randrange(30, 37)} (KHTML, like Gecko) Chrome/{randrange(80, 105)}.0.{randrange(3000, 4500)}.{randrange(60, 125)} Safari/{randrange(530, 537)}.{randrange(30, 36)}" -def generate_totp( - secret: bytes = _TOTP_SECRET, - algorithm: Callable[[], object] = hashlib.sha1, - digits: int = 6, - counter_factory: Callable[[], int] = lambda: int(time.time()) // 30, -) -> Tuple[str, int]: - counter = counter_factory() - hmac_result = hmac.new( - secret, counter.to_bytes(8, byteorder="big"), algorithm - ).digest() +def generate_totp(): + secret_cipher = [37, 84, 32, 76, 87, 90, 87, 47, 13, 75, 48, 54, 44, 28, 19, 21, 22] + processed = [byte ^ ((i % 33) + 9) for i, byte in enumerate(secret_cipher)] + processed_str = "".join(map(str, processed)) + utf8_bytes = processed_str.encode('utf-8') + hex_str = utf8_bytes.hex() + secret_bytes = bytes.fromhex(hex_str) + b32_secret = base64.b32encode(secret_bytes).decode('utf-8') + totp = pyotp.TOTP(b32_secret) - offset = hmac_result[-1] & 15 - truncated_value = ( - (hmac_result[offset] & 127) << 24 - | (hmac_result[offset + 1] & 255) << 16 - | (hmac_result[offset + 2] & 255) << 8 - | (hmac_result[offset + 3] & 255) - ) - return ( - str(truncated_value % (10**digits)).zfill(digits), - counter * 30_000, - ) + headers = { + "Host": "open.spotify.com", + "User-Agent": get_random_user_agent(), + "Accept": "*/*", + } + + try: + resp = requests.get("https://open.spotify.com/api/server-time", headers=headers, timeout=10) + if resp.status_code != 200: + raise Exception(f"Failed to get server time. Status code: {resp.status_code}") + data = resp.json() + server_time = data.get("serverTime") + if server_time is None: + raise Exception("Failed to fetch server time from Spotify") + return totp, server_time + except Exception as e: + raise Exception(f"Error getting server time: {str(e)}") token_url = 'https://open.spotify.com/api/token' playlist_base_url = 'https://api.spotify.com/v1/playlists/{}' @@ -102,14 +110,20 @@ def get_json_from_api(api_url, access_token): def get_access_token(): try: - totp, timestamp = generate_totp() + totp, server_time = generate_totp() + otp_code = totp.at(int(server_time)) + timestamp_ms = int(time.time() * 1000) params = { - "reason": "init", - "productType": "web-player", - "totp": totp, - "totpVer": 5, - "ts": timestamp, + 'reason': 'init', + 'productType': 'web-player', + 'totp': otp_code, + 'totpServerTime': server_time, + 'totpVer': '8', + 'sTime': server_time, + 'cTime': timestamp_ms, + 'buildVer': 'web-player_2025-07-02_1720000000000_12345678', + 'buildDate': '2025-07-02' } req = requests.get(token_url, headers=headers, params=params, timeout=10) diff --git a/getTracks.py b/getTracks.py deleted file mode 100644 index 6ec7842..0000000 --- a/getTracks.py +++ /dev/null @@ -1,1076 +0,0 @@ -import requests -import time -import os -import asyncio -import re -import base64 -import json -import tempfile -import httpx -import aiofiles -from datetime import datetime -from mutagen.flac import FLAC, Picture -from mutagen.id3 import PictureType - -class ProgressCallback: - def __call__(self, current, total): - if total > 0: - percent = (current / total) * 100 - print(f"\r{percent:.2f}% ({current}/{total})", end="") - else: - print(f"\r{current / (1024 * 1024):.2f} MB", end="") - -class LucidaDownloader: - def __init__(self, timeout=30): - self.client = requests.Session() - self.headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' - } - self.progress_callback = ProgressCallback() - self.timeout = timeout - - self.base_domain = "lucida.to" - - def set_progress_callback(self, callback): - self.progress_callback = callback - - def generate_filename(self, track_id, service): - return f"{track_id}_{service}.flac" - - async def get_track_info(self, track_id, service="tidal"): - if service not in ["tidal", "amazon", "deezer"]: - raise ValueError("Service must be one of 'tidal', 'amazon', or 'deezer'") - - spotify_url = f"https://open.spotify.com/track/{track_id}" - - result = self._convert_spotify_link(spotify_url, service) - - if "error" in result: - raise Exception(f"Error: {result['error']}") - - result["track_id"] = track_id - - return result - - def _convert_spotify_link(self, spotify_url, target_service="tidal"): - track_id_match = re.search(r'track/([a-zA-Z0-9]+)', spotify_url) - if not track_id_match: - return {"error": "Invalid Spotify URL"} - - base_url = f"https://{self.base_domain}" - - headers = { - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", - "Accept-Language": "id-ID,id;q=0.9", - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "Host": self.base_domain, - "Pragma": "no-cache", - "Upgrade-Insecure-Requests": "1", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" - } - - try: - headers["Referer"] = f"{base_url}/?url={spotify_url}&country=auto" - - request_params = { - "url": spotify_url, - "country": "auto", - "to": target_service - } - - session = requests.Session() - session.verify = True - - response = session.get( - base_url, - params=request_params, - headers=headers, - timeout=self.timeout - ) - - html_content = response.text - - token_match = re.search(r'token:"([^"]+)"', html_content) - token_expiry_match = re.search(r'tokenExpiry:(\d+)', html_content) - - token = token_match.group(1) if token_match else None - token_expiry = int(token_expiry_match.group(1)) if token_expiry_match else None - - url = None - url_patterns = [ - r'"url":"([^"]+)"', - r'href="(https?://[^"]*' + re.escape(target_service) + r'[^"]*track[^"]*)"', - ] - - for pattern in url_patterns: - url_match = re.search(pattern, html_content) - if url_match: - url = url_match.group(1).replace('\\/', '/') - break - - if not url: - redirect_patterns = [ - r'url=([^&"]+)', - r'href="([^"]+)"', - r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]', - ] - - for pattern in redirect_patterns: - matches = re.finditer(pattern, html_content) - for match in matches: - potential_url = match.group(1) - if potential_url.startswith('http') and target_service.lower() in potential_url.lower(): - url = potential_url.replace('\\/', '/') - break - - if not url: - service_urls = re.finditer(r'(https?://[^"\s]+' + re.escape(target_service) + r'[^"\s]+)', html_content) - for match in service_urls: - url = match.group(1).replace('\\/', '/') - break - - result = { - "service": target_service, - "url": url, - "token": { - "primary": None, - "expiry": None - } - } - - if token: - try: - decoded_once = base64.b64decode(token).decode('latin1') - decoded_token = base64.b64decode(decoded_once).decode('latin1') - result["token"]["primary"] = decoded_token - except Exception: - result["token"]["primary"] = token - - result["token"]["expiry"] = token_expiry - - return result - - except Exception as error: - return {"error": str(error)} - - def download(self, metadata, output_dir=".", is_paused_callback=None, is_stopped_callback=None): - track_url = metadata['url'] - primary_token = metadata['token']['primary'] - expiry = metadata['token']['expiry'] - track_id = metadata['track_id'] - service = metadata['service'] - - print(f"Starting download: track ID {track_id}") - - if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped") - - file_name = self.generate_filename(track_id, service) - file_path = os.path.join(output_dir, file_name) - - if os.path.exists(file_path): - file_size = os.path.getsize(file_path) - if file_size > 0: - print(f"File already exists: {file_path} ({file_size / (1024 * 1024):.2f} MB)") - return file_path - - initial_request = { - "account": {"id": "auto", "type": "country"}, - "compat": "false", - "downscale": "original", - "handoff": True, - "metadata": True, - "private": True, - "token": { - "expiry": expiry, - "primary": primary_token - }, - "upload": {"enabled": False, "service": "pixeldrain"}, - "url": track_url - } - - response = self.client.post(f"https://{self.base_domain}/api/load?url=/api/fetch/stream/v2", - json=initial_request, - headers=self.headers) - - csrf_token = response.cookies.get('csrf_token') - if csrf_token: - self.headers['X-CSRF-Token'] = csrf_token - - initial_response = response.json() - - if not initial_response.get("success", False): - raise Exception(f"Request failed: {initial_response.get('error', 'Unknown error')}") - - handoff = initial_response["handoff"] - server = initial_response["server"] - - file_name = self.generate_filename(track_id, service) - - completion_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}" - - print("Waiting for processing...") - while True: - if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped") - - while is_paused_callback and is_paused_callback(): - time.sleep(0.1) - if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped") - - completion_response = self.client.get(completion_url, headers=self.headers).json() - - status = completion_response["status"] - if status == "completed": - print("Processing: 100%") - break - elif status == "error": - raise Exception(f"API error: {completion_response.get('message', 'Unknown error')}") - else: - progress = completion_response.get("progress", {}) - if progress: - current = progress.get("current", 0) - total = progress.get("total", 100) - percent = int((current / total) * 100) if total > 0 else 0 - action = progress.get("action", "Processing") - print(f"{percent}% - {action}") - - if action.lower() == "metadata": - if self.progress_callback: - self.progress_callback(0, 0) - else: - print(f"Status: {status}") - if status.lower() == "metadata": - if self.progress_callback: - self.progress_callback(0, 0) - - time.sleep(1) - - download_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}/download" - print(f"Downloading file...") - - response = self.client.get(download_url, stream=True, headers=self.headers) - total_size = int(response.headers.get('content-length', 0)) - downloaded_size = 0 - - file_path = os.path.join(output_dir, file_name) - - try: - with open(file_path, 'wb') as file: - start_time = time.time() - last_update_time = start_time - - for chunk in response.iter_content(chunk_size=8192): - if is_stopped_callback and is_stopped_callback(): - file.close() - if os.path.exists(file_path): - os.remove(file_path) - raise Exception("Download stopped") - - while is_paused_callback and is_paused_callback(): - time.sleep(0.1) - if is_stopped_callback and is_stopped_callback(): - file.close() - if os.path.exists(file_path): - os.remove(file_path) - raise Exception("Download stopped") - - if chunk: - file.write(chunk) - downloaded_size += len(chunk) - - current_time = time.time() - if current_time - last_update_time >= 1: - if total_size > 0: - progress_percent = (downloaded_size / total_size) * 100 - elapsed_time = current_time - start_time - speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0 - print(f"{progress_percent:.2f}% - {speed:.2f} MB/s") - else: - print(f"{downloaded_size / (1024 * 1024):.2f} MB") - - last_update_time = current_time - - if self.progress_callback: - self.progress_callback(downloaded_size, total_size) - - if downloaded_size == 0: - raise Exception("No data received") - - print(f"Complete. File saved: {file_path}") - return file_path - - except Exception as e: - if os.path.exists(file_path) and os.path.getsize(file_path) == 0: - try: - os.remove(file_path) - except: - pass - raise e - -class SquidWTFDownloader: - def __init__(self, region="us", timeout=30): - if region not in ["eu", "us"]: - raise ValueError("Region must be either 'us' or 'eu'") - - self.region = region - self.timeout = timeout - self.session = requests.Session() - self.headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' - } - self.base_api_url = f"https://{region}.qobuz.squid.wtf/api" - self.download_chunk_size = 256 * 1024 - self.progress_callback = ProgressCallback() - - def set_progress_callback(self, callback): - self.progress_callback = callback - - def sanitize_filename(self, filename): - if not filename: - return "Unknown Track" - sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename)) - return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track" - - def get_track_info(self, isrc): - print(f"Fetching: {isrc}") - search_url = f"{self.base_api_url}/get-music" - params = {'q': isrc, 'offset': 0, 'limit': 10} - - try: - response = self.session.get(search_url, params=params, timeout=self.timeout) - response.raise_for_status() - data = response.json() - - selected_track = None - if data and data.get("success"): - items = data.get("data", {}).get("tracks", {}).get("items", []) - priority = {24: 1, 16: 2} - for track in items: - track_isrc = track.get("isrc", "").upper() - search_isrc = isrc.upper() - if track_isrc == search_isrc: - current_prio = priority.get(track.get("maximum_bit_depth"), 3) - if selected_track is None or current_prio < priority.get(selected_track.get("maximum_bit_depth"), 3): - selected_track = track - if current_prio == 1: - break - - if not selected_track: - raise Exception(f"Track not found: {isrc}") - - title = selected_track.get('title', 'Unknown') - bit_depth = selected_track.get('maximum_bit_depth', 'Unknown') - print(f"Found: {title} ({bit_depth}b)") - return selected_track - - except requests.exceptions.RequestException as e: - raise Exception(f"Request error: {e}") - except Exception as e: - raise Exception(f"Error: {e}") - - def get_download_url(self, track_id): - print("Fetching URL...") - download_api_url = f"{self.base_api_url}/download-music" - params = {'track_id': track_id, 'quality': 27} - - try: - response = self.session.get(download_api_url, params=params, timeout=self.timeout) - response.raise_for_status() - data = response.json() - - if data and data.get("success") and data.get("data", {}).get("url"): - download_url = data["data"]["url"] - print("URL found") - return download_url - else: - error_msg = data.get('error', {}).get('message', 'Unknown API error') - raise Exception(f"API error: {error_msg}") - - except requests.exceptions.RequestException as e: - raise Exception(f"Request error: {e}") - except Exception as e: - raise Exception(f"Error: {e}") - - def download(self, isrc, output_dir=".", is_paused_callback=None, is_stopped_callback=None): - if output_dir != ".": - try: - os.makedirs(output_dir, exist_ok=True) - except OSError as e: - raise Exception(f"Directory error: {e}") - - track_info = self.get_track_info(isrc) - track_id = track_info.get("id") - - if not track_id: - raise Exception("No track ID found") - - artist_name = self.sanitize_filename(track_info.get('performer', {}).get('name')) - track_title = self.sanitize_filename(track_info.get('title')) - output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac") - - if os.path.exists(output_filename): - file_size = os.path.getsize(output_filename) - if file_size > 0: - print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)") - return output_filename - - download_url = self.get_download_url(track_id) - temp_filename = output_filename + ".part" - - print(f"Downloading...") - try: - with self.session.get(download_url, stream=True, timeout=900) as response, \ - open(temp_filename, 'wb') as f: - response.raise_for_status() - total_size = int(response.headers.get('content-length', 0)) - downloaded_size = 0 - start_time = time.time() - last_update_time = start_time - - for chunk in response.iter_content(chunk_size=self.download_chunk_size): - if is_stopped_callback and is_stopped_callback(): - f.close() - if os.path.exists(temp_filename): - os.remove(temp_filename) - raise Exception("Download stopped") - - while is_paused_callback and is_paused_callback(): - time.sleep(0.1) - if is_stopped_callback and is_stopped_callback(): - f.close() - if os.path.exists(temp_filename): - os.remove(temp_filename) - raise Exception("Download stopped") - f.write(chunk) - downloaded_size += len(chunk) - - current_time = time.time() - if current_time - last_update_time >= 1: - if total_size > 0: - progress_percent = (downloaded_size / total_size) * 100 - elapsed_time = current_time - start_time - speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0 - print(f"{progress_percent:.2f}% - {speed:.2f} MB/s") - else: - print(f"{downloaded_size / (1024 * 1024):.2f} MB") - - last_update_time = current_time - - if self.progress_callback: - self.progress_callback(downloaded_size, total_size) - - os.rename(temp_filename, output_filename) - print("Download complete") - - except requests.exceptions.RequestException as e: - if os.path.exists(temp_filename): - os.remove(temp_filename) - raise Exception(f"Download failed: {e}") - except Exception as e: - if os.path.exists(temp_filename): - os.remove(temp_filename) - raise Exception(f"File error: {e}") - - print("Adding metadata...") - try: - self._embed_metadata(output_filename, track_info) - print("Metadata saved") - except Exception as e: - print(f"Tagging failed: {e}") - - print(f"Done") - return output_filename - - def _embed_metadata(self, filename, track_info): - try: - audio = FLAC(filename) - audio.delete() - audio.clear_pictures() - - album_info = track_info.get('album', {}) - artist = track_info.get('performer', {}).get('name') - - if track_info.get('title'): - audio['TITLE'] = track_info['title'] - if artist: - audio['ARTIST'] = artist - if album_info.get('title'): - audio['ALBUM'] = album_info['title'] - if album_info.get('artist', {}).get('name', artist): - audio['ALBUMARTIST'] = album_info.get('artist', {}).get('name', artist) - if track_info.get('track_number'): - audio['TRACKNUMBER'] = str(track_info['track_number']) - if track_info.get('release_date_original'): - audio['DATE'] = track_info['release_date_original'] - try: - audio['YEAR'] = str(datetime.strptime(track_info['release_date_original'], '%Y-%m-%d').year) - except ValueError: - pass - if album_info.get('genre', {}).get('name'): - audio['GENRE'] = album_info['genre']['name'] - if track_info.get('copyright'): - audio['COPYRIGHT'] = track_info['copyright'] - if track_info.get('isrc'): - audio['ISRC'] = track_info['isrc'] - if album_info.get('label', {}).get('name'): - audio['ORGANIZATION'] = album_info['label']['name'] - - img_info = album_info.get('image', {}) - cover_url = img_info.get('large') or img_info.get('small') or img_info.get('thumbnail') - if cover_url: - try: - img_response = self.session.get(cover_url, timeout=30) - img_response.raise_for_status() - mime_type = img_response.headers.get('Content-Type', 'image/jpeg').lower() - if mime_type in ['image/jpeg', 'image/png']: - picture = Picture() - picture.data = img_response.content - picture.type = PictureType.COVER_FRONT - picture.mime = mime_type - audio.add_picture(picture) - print("Cover added") - except Exception as e: - print(f"Cover error: {str(e)}") - - audio.save() - - except Exception as e: - raise Exception(f"Metadata error: {e}") - -class TidalDownloader: - def __init__(self, client_id="zU4XHVVkc2tDPo4t", client_secret="VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4=", timeout=30): - self.client_id = client_id - self.client_secret = client_secret - self.timeout = timeout - self.progress_callback = ProgressCallback() - - self.temp_dir = tempfile.gettempdir() - self.token_path = os.path.join(self.temp_dir, "tidal_token.json") - self.access_token = None - - if os.path.exists(self.token_path): - try: - with open(self.token_path, "r") as tok: - token = json.loads(tok.read()) - self.access_token = token.get("access_token") - except: - pass - - def set_progress_callback(self, callback): - self.progress_callback = callback - - async def get_access_token(self): - refresh_url = "https://auth.tidal.com/v1/oauth2/token" - - payload = { - "client_id": self.client_id, - "grant_type": "client_credentials", - } - - async with httpx.AsyncClient(http2=True) as client: - try: - response = await client.post( - url=refresh_url, - data=payload, - auth=(self.client_id, self.client_secret), - ) - - if response.status_code == 200: - token_data = response.json() - new_token = token_data.get("access_token") - - try: - with open(self.token_path, "w") as f: - json.dump({ - "access_token": new_token - }, f) - except: - pass - - self.access_token = new_token - return new_token - return None - - except: - return None - - async def search_tracks(self, query): - try: - tidal_token = self.access_token or await self.get_access_token() - if not tidal_token: - return {"error": "Failed to get access token"} - - search_url = f"https://api.tidal.com/v1/search/tracks?query={query}&limit=25&offset=0&countryCode=US" - header = {"authorization": f"Bearer {tidal_token}"} - - async with httpx.AsyncClient(http2=True) as client: - search_data = await client.get(url=search_url, headers=header) - response_data = search_data.json() - - filtered_items = [{ - "id": item.get("id"), - "title": item.get("title"), - "url": item.get("url"), - "isrc": item.get("isrc"), - "audioQuality": item.get("audioQuality"), - "mediaMetadata": item.get("mediaMetadata"), - "album": item.get("album", {}), - "artists": item.get("artists", []), - "artist": item.get("artist", {}), - "trackNumber": item.get("trackNumber"), - "volumeNumber": item.get("volumeNumber"), - "duration": item.get("duration"), - "copyright": item.get("copyright"), - "explicit": item.get("explicit") - } for item in response_data.get("items", [])] - - return { - "limit": response_data.get("limit"), - "offset": response_data.get("offset"), - "totalNumberOfItems": response_data.get("totalNumberOfItems"), - "items": filtered_items - } - - except Exception as e: - return {"error": f"Error: {str(e)}"} - - async def filter_by_isrc(self, query, isrc=None): - try: - result = await self.search_tracks(query) - - if "error" in result: - return result - - if isrc: - isrc_items = [item for item in result["items"] if item.get("isrc", "").upper() == isrc.upper()] - - if len(isrc_items) > 1: - hires_items = [] - for item in isrc_items: - media_metadata = item.get("mediaMetadata", {}) - tags = media_metadata.get("tags", []) if media_metadata else [] - if "HIRES_LOSSLESS" in tags: - hires_items.append(item) - - if hires_items: - result["items"] = hires_items - else: - result["items"] = isrc_items - else: - result["items"] = isrc_items - - result["totalNumberOfItems"] = len(result["items"]) - - return result - - except Exception as e: - return {"error": f"Error: {str(e)}"} - - async def get_track_download_info(self, track_id, quality="LOSSLESS"): - try: - download_api_url = f"https://hifi.401658.xyz/track/?id={track_id}&quality={quality}" - - async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client: - response = await client.get(download_api_url) - - if response.status_code == 200: - data = response.json() - - for item in data: - if "OriginalTrackUrl" in item: - return { - "success": True, - "download_url": item["OriginalTrackUrl"], - "track_info": data[0] if data else {} - } - - return {"success": False, "error": "OriginalTrackUrl not found in response"} - else: - return {"success": False, "error": f"API returned status code: {response.status_code}"} - - except Exception as e: - return {"success": False, "error": f"Error getting download info: {str(e)}"} - - async def download_album_art(self, album_id, size="1280x1280"): - try: - art_url = f"https://resources.tidal.com/images/{album_id.replace('-', '/')}/{size}.jpg" - - async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client: - response = await client.get(art_url) - - if response.status_code == 200: - return response.content - else: - print(f"Failed to download album art: HTTP {response.status_code}") - return None - - except Exception as e: - print(f"Error downloading album art: {str(e)}") - return None - - async def embed_metadata(self, filepath, track_info, search_info=None): - try: - audio = FLAC(filepath) - - audio.clear() - - if track_info.get("title"): - audio["TITLE"] = track_info["title"] - - artists_list = [] - if search_info and search_info.get("artists"): - for artist in search_info["artists"]: - if artist.get("name"): - artists_list.append(artist["name"]) - elif search_info and search_info.get("artist") and search_info["artist"].get("name"): - artists_list.append(search_info["artist"]["name"]) - elif track_info.get("artists"): - for artist in track_info["artists"]: - if artist.get("name"): - artists_list.append(artist["name"]) - elif track_info.get("artist") and track_info["artist"].get("name"): - artists_list.append(track_info["artist"]["name"]) - - if artists_list: - audio["ARTIST"] = artists_list[0] - if len(artists_list) > 1: - audio["ALBUMARTIST"] = "; ".join(artists_list) - else: - audio["ALBUMARTIST"] = artists_list[0] - - album_info = search_info.get("album", {}) if search_info else track_info.get("album", {}) - if album_info.get("title"): - audio["ALBUM"] = album_info["title"] - - if search_info and search_info.get("trackNumber"): - audio["TRACKNUMBER"] = str(search_info["trackNumber"]) - elif track_info.get("trackNumber"): - audio["TRACKNUMBER"] = str(track_info["trackNumber"]) - - if search_info and search_info.get("volumeNumber"): - audio["DISCNUMBER"] = str(search_info["volumeNumber"]) - elif track_info.get("volumeNumber"): - audio["DISCNUMBER"] = str(track_info["volumeNumber"]) - - isrc = search_info.get("isrc") if search_info else track_info.get("isrc") - if isrc: - audio["ISRC"] = isrc - - copyright_info = search_info.get("copyright") if search_info else track_info.get("copyright") - if copyright_info: - audio["COPYRIGHT"] = copyright_info - - release_date = None - if search_info and search_info.get("streamStartDate"): - release_date = search_info["streamStartDate"] - elif track_info.get("streamStartDate"): - release_date = track_info["streamStartDate"] - - if release_date: - if "T" in release_date: - date_part = release_date.split("T")[0] - audio["DATE"] = date_part - else: - audio["DATE"] = release_date - - if track_info.get("genre"): - audio["GENRE"] = track_info["genre"] - - if album_info.get("cover"): - album_art = await self.download_album_art(album_info["cover"]) - if album_art: - picture = Picture() - picture.data = album_art - picture.type = PictureType.COVER_FRONT - picture.mime = "image/jpeg" - picture.desc = "Cover" - audio.add_picture(picture) - print("Album art embedded successfully") - - audio.save() - print(f"Metadata embedded successfully for: {track_info.get('title', 'Unknown')}") - - return True - - except Exception as e: - print(f"Error embedding metadata: {str(e)}") - return False - - async def download_file(self, url, filename, max_retries=3, is_paused_callback=None, is_stopped_callback=None): - for attempt in range(max_retries): - try: - async with httpx.AsyncClient(http2=True, timeout=60.0) as client: - async with client.stream('GET', url) as response: - if response.status_code == 200: - total_size_in_bytes = int(response.headers.get('content-length', 0)) - bytes_downloaded = 0 - async with aiofiles.open(filename, 'wb') as f: - async for chunk in response.aiter_bytes(chunk_size=8192): - if is_stopped_callback and is_stopped_callback(): - print("\\nDownload stopped.") - if os.path.exists(filename): - try: - os.remove(filename) - except OSError as e: - print(f"Error removing partial file: {e}") - return {"success": False, "error": "Download stopped by user"} - - while is_paused_callback and is_paused_callback(): - print("\\nDownload paused. Waiting...") - await asyncio.sleep(1) - - await f.write(chunk) - bytes_downloaded += len(chunk) - if total_size_in_bytes > 0: - if self.progress_callback: - self.progress_callback(bytes_downloaded, total_size_in_bytes) - if total_size_in_bytes > 0 and bytes_downloaded == total_size_in_bytes: - print() - print(f"Successfully downloaded: {filename} ({bytes_downloaded} bytes)") - return {"success": True, "size": bytes_downloaded} - else: - print(f"\\nFailed to download {filename}. HTTP Status: {response.status_code}") - if os.path.exists(filename): - try: - os.remove(filename) - except OSError as e: - print(f"Error removing partial file after server error: {e}") - return {"success": False, "error": f"HTTP {response.status_code}"} - - except Exception as e: - print() - if os.path.exists(filename): - try: - os.remove(filename) - except OSError as ose: - print(f"Error removing partial file after exception: {ose}") - if attempt < max_retries - 1: - print(f"Download attempt {attempt + 1} failed, retrying...") - await asyncio.sleep(2) - else: - return {"success": False, "error": f"Download failed after {max_retries} attempts: {str(e)}"} - - async def download_track(self, track_ids, search_results, output_dir=".", quality="LOSSLESS", embed_meta=True, is_paused_callback=None, is_stopped_callback=None): - if not isinstance(track_ids, list): - track_ids = [track_ids] - - if output_dir != ".": - os.makedirs(output_dir, exist_ok=True) - - search_map = {} - if search_results and search_results.get("items"): - for item in search_results["items"]: - search_map[item["id"]] = item - - all_skipped = True - skipped_files = [] - - for i, track_id in enumerate(track_ids): - download_info = await self.get_track_download_info(track_id, quality) - - if not download_info["success"]: - print(f"Failed to get download info for track {track_id}: {download_info['error']}") - continue - - download_url = download_info["download_url"] - track_info = download_info["track_info"] - search_info = search_map.get(track_id) - - title = track_info.get("title", f"track_{track_id}") - - artists_list = [] - if search_info and search_info.get("artists"): - for artist in search_info["artists"]: - if artist.get("name"): - artists_list.append(artist["name"]) - elif search_info and search_info.get("artist") and search_info["artist"].get("name"): - artists_list.append(search_info["artist"]["name"]) - elif track_info.get("artists"): - for artist in track_info["artists"]: - if artist.get("name"): - artists_list.append(artist["name"]) - elif track_info.get("artist") and track_info["artist"].get("name"): - artists_list.append(track_info["artist"]["name"]) - - artist_names = ", ".join(artists_list) if artists_list else "" - - safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_', '.')).rstrip() - safe_artists = "".join(c for c in artist_names if c.isalnum() or c in (' ', '-', '_', ',', '.')).rstrip() - - if safe_artists: - filename = f"{safe_title} - {safe_artists}.flac" - else: - filename = f"{safe_title}.flac" - - filepath = os.path.join(output_dir, filename) - - if os.path.exists(filepath): - print(f"File {filename} already exists. Skipping download.") - skipped_files.append(filename) - if len(track_ids) == 1: - return { - "success": True, - "status": "skipped_exists", - "track_id": track_id, - "filename": filename, - "filepath": filepath, - "message": f"File {filename} already exists." - } - continue - - all_skipped = False - print(f"Downloading: {filename}") - - download_result = await self.download_file(download_url, filepath, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback) - - if download_result["success"]: - print(f"Successfully downloaded track {track_id}") - - if embed_meta: - print("Embedding metadata...") - await self.embed_metadata(filepath, track_info, search_info) - - return { - "success": True, - "track_id": track_id, - "filename": filename, - "filepath": filepath, - "size": download_result["size"], - "track_info": track_info, - "metadata_embedded": embed_meta - } - else: - print(f"Failed to download track {track_id}: {download_result['error']}") - if os.path.exists(filepath): - try: - os.remove(filepath) - except: - pass - if download_result.get("error") == "Download stopped by user": - return {"success": False, "error": "Download stopped by user", "track_id": track_id} - - if all_skipped and skipped_files: - return { - "success": True, - "status": "all_skipped", - "message": f"All files already exist: {', '.join(skipped_files)}" - } - - return {"success": False, "error": "All track IDs failed to download or were stopped"} - - async def search_and_download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None): - print(f"Searching for: {query}") - if isrc: - print(f"ISRC: {isrc}") - - search_result = await self.filter_by_isrc(query, isrc) - - if "error" in search_result: - print(f"Search error: {search_result['error']}") - return {"success": False, "error": search_result['error']} - - raw_result = None - if isrc: - raw_result = await self.search_tracks(query) - - if not search_result["items"]: - if isrc and raw_result and raw_result.get("items"): - print("No tracks found with ISRC filter, falling back to unfiltered search") - search_result = raw_result - else: - print("No tracks found") - return {"success": False, "error": "No tracks found"} - - track_ids = [item["id"] for item in search_result["items"]] - print(f"Found {len(track_ids)} track(s): {track_ids}") - - download_result = await self.download_track(track_ids, search_result, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback) - - return download_result - - async def download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None): - result = await self.search_and_download(query, isrc, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback) - - if result["success"]: - if result.get("status") == "all_skipped": - print(f"Skipped: {result['message']}") - if "filepath" in result: - return result["filepath"] - return output_dir - elif result.get("status") == "skipped_exists": - print(f"Skipped: {result['message']}") - return result["filepath"] - else: - print("Download completed!") - return result["filepath"] - else: - print(f"Download failed: {result['error']}") - if result.get("error") == "Download stopped by user": - raise Exception("Download stopped by user") - raise Exception(result["error"]) - -def print_progress(current, total): - if total > 0: - percent = (current / total) * 100 - print(f"\rProgress: {percent:.2f}% ({current}/{total})", end="") - else: - print(f"\rDownloaded: {current / (1024 * 1024):.2f} MB", end="") - -async def main(): - print("=== LucidaDownloader ===") - lucida = LucidaDownloader() - track_id = "2plbrEY59IikOBgBGLjaoe" - service = "tidal" - output_dir = "." - - try: - print(f"Getting track: {track_id} from {service}") - metadata = await lucida.get_track_info(track_id, service) - print("Starting download") - downloaded_file = lucida.download(metadata, output_dir) - print(f"Success: File saved as {downloaded_file}") - except Exception as e: - print(f"Error: {str(e)}") - - print("\n\n=== SquidWTFDownloader ===") - squid = SquidWTFDownloader(region="us") - - isrc = "USAT22409172" - output_dir = "." - - try: - downloaded_file = squid.download(isrc, output_dir) - print(f"Success: File saved as {downloaded_file}") - except Exception as e: - print(f"Error: {str(e)}") - - print("\n\n=== TidalDownloader ===") - tidal = TidalDownloader() - - query = "APT." - isrc = "USAT22409172" - output_dir = "." - - try: - downloaded_file = await tidal.download(query, isrc, output_dir, quality="LOSSLESS", embed_metadata=True) - print(f"Success: File saved as {downloaded_file}") - except Exception as e: - print(f"Error: {str(e)}") - -if __name__ == "__main__": - try: - import sys - if sys.platform == "win32": - import os - os.system("chcp 65001 > nul") - try: - sys.stdout.reconfigure(encoding='utf-8') - except: - pass - except: - pass - - asyncio.run(main()) \ No newline at end of file diff --git a/qobuzDL.py b/qobuzDL.py new file mode 100644 index 0000000..b32bd89 --- /dev/null +++ b/qobuzDL.py @@ -0,0 +1,271 @@ +import requests +import time +import os +import re +from datetime import datetime +from mutagen.flac import FLAC, Picture +from mutagen.id3 import PictureType + +class ProgressCallback: + def __call__(self, current, total): + if total > 0: + percent = (current / total) * 100 + print(f"\r{percent:.2f}% ({current}/{total})", end="") + else: + print(f"\r{current / (1024 * 1024):.2f} MB", end="") + +class QobuzDownloader: + def __init__(self, region="us", timeout=30): + if region not in ["eu", "us"]: + raise ValueError("Region must be either 'us' or 'eu'") + + self.region = region + self.timeout = timeout + self.session = requests.Session() + self.headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + self.base_api_url = f"https://{region}.qobuz.squid.wtf/api" + self.download_chunk_size = 256 * 1024 + self.progress_callback = ProgressCallback() + + def set_progress_callback(self, callback): + self.progress_callback = callback + + def sanitize_filename(self, filename): + if not filename: + return "Unknown Track" + sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename)) + return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track" + + def get_track_info(self, isrc): + print(f"Fetching: {isrc}") + search_url = f"{self.base_api_url}/get-music" + params = {'q': isrc, 'offset': 0, 'limit': 10} + + try: + response = self.session.get(search_url, params=params, timeout=self.timeout) + response.raise_for_status() + data = response.json() + + selected_track = None + if data and data.get("success"): + items = data.get("data", {}).get("tracks", {}).get("items", []) + priority = {24: 1, 16: 2} + for track in items: + if track.get("isrc") == isrc: + current_prio = priority.get(track.get("maximum_bit_depth"), 3) + if selected_track is None or current_prio < priority.get(selected_track.get("maximum_bit_depth"), 3): + selected_track = track + if current_prio == 1: + break + + if not selected_track: + raise Exception(f"Track not found: {isrc}") + + title = selected_track.get('title', 'Unknown') + bit_depth = selected_track.get('maximum_bit_depth', 'Unknown') + print(f"Found: {title} ({bit_depth}b)") + return selected_track + + except requests.exceptions.RequestException as e: + raise Exception(f"Request error: {e}") + except Exception as e: + raise Exception(f"Error: {e}") + + def get_download_url(self, track_id): + print("Fetching URL...") + download_api_url = f"{self.base_api_url}/download-music" + params = {'track_id': track_id, 'quality': 27} + + try: + response = self.session.get(download_api_url, params=params, timeout=self.timeout) + response.raise_for_status() + data = response.json() + + if data and data.get("success") and data.get("data", {}).get("url"): + download_url = data["data"]["url"] + print("URL found") + return download_url + else: + error_msg = data.get('error', {}).get('message', 'Unknown API error') + raise Exception(f"API error: {error_msg}") + + except requests.exceptions.RequestException as e: + raise Exception(f"Request error: {e}") + except Exception as e: + raise Exception(f"Error: {e}") + + def download(self, isrc, output_dir=".", is_paused_callback=None, is_stopped_callback=None): + if output_dir != ".": + try: + os.makedirs(output_dir, exist_ok=True) + except OSError as e: + raise Exception(f"Directory error: {e}") + + track_info = self.get_track_info(isrc) + track_id = track_info.get("id") + + if not track_id: + raise Exception("No track ID found") + + artist_name = self.sanitize_filename(track_info.get('performer', {}).get('name')) + track_title = self.sanitize_filename(track_info.get('title')) + output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac") + + if os.path.exists(output_filename): + file_size = os.path.getsize(output_filename) + if file_size > 0: + print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)") + return output_filename + + download_url = self.get_download_url(track_id) + temp_filename = output_filename + ".part" + + print(f"Downloading...") + try: + with self.session.get(download_url, stream=True, timeout=900) as response, \ + open(temp_filename, 'wb') as f: + response.raise_for_status() + total_size = int(response.headers.get('content-length', 0)) + downloaded_size = 0 + start_time = time.time() + last_update_time = start_time + + for chunk in response.iter_content(chunk_size=self.download_chunk_size): + if is_stopped_callback and is_stopped_callback(): + f.close() + if os.path.exists(temp_filename): + os.remove(temp_filename) + raise Exception("Download stopped") + + while is_paused_callback and is_paused_callback(): + time.sleep(0.1) + if is_stopped_callback and is_stopped_callback(): + f.close() + if os.path.exists(temp_filename): + os.remove(temp_filename) + raise Exception("Download stopped") + f.write(chunk) + downloaded_size += len(chunk) + + current_time = time.time() + if current_time - last_update_time >= 1: + if total_size > 0: + progress_percent = (downloaded_size / total_size) * 100 + elapsed_time = current_time - start_time + speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0 + print(f"{progress_percent:.2f}% - {speed:.2f} MB/s") + else: + print(f"{downloaded_size / (1024 * 1024):.2f} MB") + + last_update_time = current_time + + if self.progress_callback: + self.progress_callback(downloaded_size, total_size) + + os.rename(temp_filename, output_filename) + print("Download complete") + + except requests.exceptions.RequestException as e: + if os.path.exists(temp_filename): + os.remove(temp_filename) + raise Exception(f"Download failed: {e}") + except Exception as e: + if os.path.exists(temp_filename): + os.remove(temp_filename) + raise Exception(f"File error: {e}") + + print("Adding metadata...") + try: + self._embed_metadata(output_filename, track_info) + print("Metadata saved") + except Exception as e: + print(f"Tagging failed: {e}") + + print(f"Done") + return output_filename + + def _embed_metadata(self, filename, track_info): + try: + audio = FLAC(filename) + audio.delete() + audio.clear_pictures() + + album_info = track_info.get('album', {}) + artist = track_info.get('performer', {}).get('name') + + if track_info.get('title'): + audio['TITLE'] = track_info['title'] + if artist: + audio['ARTIST'] = artist + if album_info.get('title'): + audio['ALBUM'] = album_info['title'] + if album_info.get('artist', {}).get('name', artist): + audio['ALBUMARTIST'] = album_info.get('artist', {}).get('name', artist) + if track_info.get('track_number'): + audio['TRACKNUMBER'] = str(track_info['track_number']) + if track_info.get('release_date_original'): + audio['DATE'] = track_info['release_date_original'] + try: + audio['YEAR'] = str(datetime.strptime(track_info['release_date_original'], '%Y-%m-%d').year) + except ValueError: + pass + if album_info.get('genre', {}).get('name'): + audio['GENRE'] = album_info['genre']['name'] + if track_info.get('copyright'): + audio['COPYRIGHT'] = track_info['copyright'] + if track_info.get('isrc'): + audio['ISRC'] = track_info['isrc'] + if album_info.get('label', {}).get('name'): + audio['ORGANIZATION'] = album_info['label']['name'] + + img_info = album_info.get('image', {}) + cover_url = img_info.get('large') or img_info.get('small') or img_info.get('thumbnail') + if cover_url: + try: + img_response = self.session.get(cover_url, timeout=30) + img_response.raise_for_status() + mime_type = img_response.headers.get('Content-Type', 'image/jpeg').lower() + if mime_type in ['image/jpeg', 'image/png']: + picture = Picture() + picture.data = img_response.content + picture.type = PictureType.COVER_FRONT + picture.mime = mime_type + audio.add_picture(picture) + print("Cover added") + except Exception as e: + print(f"Cover error: {str(e)}") + + audio.save() + + except Exception as e: + raise Exception(f"Metadata error: {e}") + +def main(): + print("=== QobuzDL - Qobuz Downloader ===") + downloader = QobuzDownloader(region="us") + + isrc = "USAT22409172" + output_dir = "." + + try: + downloaded_file = downloader.download(isrc, output_dir) + print(f"Success: File saved as {downloaded_file}") + except Exception as e: + print(f"Error: {str(e)}") + +if __name__ == "__main__": + try: + import sys + if sys.platform == "win32": + import os + os.system("chcp 65001 > nul") + try: + sys.stdout.reconfigure(encoding='utf-8') + except: + pass + except: + pass + + main() \ No newline at end of file diff --git a/tidalDL.py b/tidalDL.py new file mode 100644 index 0000000..1e606d8 --- /dev/null +++ b/tidalDL.py @@ -0,0 +1,454 @@ +import asyncio +import json +import os +import re +import tempfile +import time +import httpx +from mutagen.flac import FLAC, Picture +from mutagen.id3 import PictureType + +class ProgressCallback: + def __call__(self, current, total): + if total > 0: + percent = (current / total) * 100 + print(f"\r{percent:.2f}% ({current}/{total})", end="") + else: + print(f"\r{current / (1024 * 1024):.2f} MB", end="") + +class TidalDownloader: + def __init__(self, timeout=30, max_retries=3): + self.timeout = timeout + self.max_retries = max_retries + self.download_chunk_size = 256 * 1024 + self.progress_callback = ProgressCallback() + self.client_id = "zU4XHVVkc2tDPo4t" + self.client_secret = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4=" + self.temp_dir = tempfile.gettempdir() + self.token_path = os.path.join(self.temp_dir, "tidal_token.json") + self.access_token = None + self._load_token() + + def set_progress_callback(self, callback): + self.progress_callback = callback + + def _load_token(self): + if os.path.exists(self.token_path): + try: + with open(self.token_path, "r") as tok: + token = json.loads(tok.read()) + self.access_token = token.get("access_token") + except: + pass + + def sanitize_filename(self, filename): + if not filename: + return "Unknown Track" + sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename)) + return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track" + + async def get_access_token(self): + if self.access_token: + return self.access_token + + refresh_url = "https://auth.tidal.com/v1/oauth2/token" + + payload = { + "client_id": self.client_id, + "grant_type": "client_credentials", + } + + async with httpx.AsyncClient(http2=True) as client: + try: + response = await client.post( + url=refresh_url, + data=payload, + auth=(self.client_id, self.client_secret), + ) + + if response.status_code == 200: + token_data = response.json() + new_token = token_data.get("access_token") + + try: + with open(self.token_path, "w") as f: + json.dump({ + "access_token": new_token + }, f) + except: + pass + + self.access_token = new_token + return new_token + else: + return None + + except: + return None + + async def search_tracks(self, query): + try: + tidal_token = await self.get_access_token() + if not tidal_token: + raise Exception("Failed to get access token") + + search_url = f"https://api.tidal.com/v1/search/tracks?query={query}&limit=25&offset=0&countryCode=US" + header = {"authorization": f"Bearer {tidal_token}"} + + async with httpx.AsyncClient(http2=True) as client: + search_data = await client.get(url=search_url, headers=header) + response_data = search_data.json() + + filtered_items = [{ + "id": item.get("id"), + "title": item.get("title"), + "url": item.get("url"), + "isrc": item.get("isrc"), + "audioQuality": item.get("audioQuality"), + "mediaMetadata": item.get("mediaMetadata"), + "album": item.get("album", {}), + "artists": item.get("artists", []), + "artist": item.get("artist", {}), + "trackNumber": item.get("trackNumber"), + "volumeNumber": item.get("volumeNumber"), + "duration": item.get("duration"), + "copyright": item.get("copyright"), + "explicit": item.get("explicit") + } for item in response_data.get("items", [])] + + return { + "limit": response_data.get("limit"), + "offset": response_data.get("offset"), + "totalNumberOfItems": response_data.get("totalNumberOfItems"), + "items": filtered_items + } + + except Exception as e: + raise Exception(f"Search error: {str(e)}") + + async def get_track_info(self, query, isrc=None): + print(f"Fetching: {query}" + (f" (ISRC: {isrc})" if isrc else "")) + + try: + result = await self.search_tracks(query) + + if not result or not result.get("items"): + raise Exception(f"No tracks found for query: {query}") + + selected_track = None + if isrc: + isrc_items = [item for item in result["items"] if item.get("isrc") == isrc] + + if len(isrc_items) > 1: + hires_items = [] + for item in isrc_items: + media_metadata = item.get("mediaMetadata", {}) + tags = media_metadata.get("tags", []) if media_metadata else [] + if "HIRES_LOSSLESS" in tags: + hires_items.append(item) + + if hires_items: + selected_track = hires_items[0] + else: + selected_track = isrc_items[0] + elif len(isrc_items) == 1: + selected_track = isrc_items[0] + else: + selected_track = result["items"][0] + else: + selected_track = result["items"][0] + + if not selected_track: + raise Exception(f"Track not found: {query}" + (f" (ISRC: {isrc})" if isrc else "")) + + title = selected_track.get('title', 'Unknown') + quality = selected_track.get('audioQuality', 'Unknown') + print(f"Found: {title} ({quality})") + return selected_track + + except Exception as e: + raise Exception(f"Error getting track info: {str(e)}") + + async def get_download_url(self, track_id, quality="LOSSLESS"): + print("Fetching URL...") + download_api_url = f"https://hifi.401658.xyz/track/?id={track_id}&quality={quality}" + + async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client: + try: + response = await client.get(download_api_url) + + if response.status_code == 200: + data = response.json() + + for item in data: + if "OriginalTrackUrl" in item: + print("URL found") + return { + "download_url": item["OriginalTrackUrl"], + "track_info": data[0] if data else {} + } + + raise Exception("Download URL not found in response") + else: + raise Exception(f"API returned status code: {response.status_code}") + + except Exception as e: + raise Exception(f"Error getting download URL: {str(e)}") + + async def download_album_art(self, album_id, size="1280x1280"): + try: + art_url = f"https://resources.tidal.com/images/{album_id.replace('-', '/')}/{size}.jpg" + + async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client: + response = await client.get(art_url) + + if response.status_code == 200: + return response.content + else: + print(f"Failed to download album art: HTTP {response.status_code}") + return None + + except Exception as e: + print(f"Error downloading album art: {str(e)}") + return None + + async def download_file(self, url, filepath, is_paused_callback=None, is_stopped_callback=None): + temp_filepath = filepath + ".part" + retry_count = 0 + + while retry_count <= self.max_retries: + try: + async with httpx.AsyncClient(http2=True, timeout=60.0) as client: + async with client.stream('GET', url) as response: + if response.status_code != 200: + raise Exception(f"HTTP {response.status_code}") + + total_size = int(response.headers.get('content-length', 0)) + downloaded_size = 0 + start_time = time.time() + last_update_time = start_time + + with open(temp_filepath, 'wb') as f: + async for chunk in response.aiter_bytes(chunk_size=self.download_chunk_size): + if is_stopped_callback and is_stopped_callback(): + f.close() + if os.path.exists(temp_filepath): + os.remove(temp_filepath) + raise Exception("Download stopped") + + while is_paused_callback and is_paused_callback(): + await asyncio.sleep(0.1) + if is_stopped_callback and is_stopped_callback(): + f.close() + if os.path.exists(temp_filepath): + os.remove(temp_filepath) + raise Exception("Download stopped") + + f.write(chunk) + downloaded_size += len(chunk) + + current_time = time.time() + if current_time - last_update_time >= 1: + if total_size > 0: + progress_percent = (downloaded_size / total_size) * 100 + elapsed_time = current_time - start_time + speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0 + print(f"{progress_percent:.2f}% - {speed:.2f} MB/s") + else: + print(f"{downloaded_size / (1024 * 1024):.2f} MB") + + last_update_time = current_time + + if self.progress_callback: + self.progress_callback(downloaded_size, total_size) + + os.rename(temp_filepath, filepath) + print("Download complete") + return {"success": True, "size": downloaded_size} + + except Exception as e: + retry_count += 1 + if retry_count > self.max_retries: + if os.path.exists(temp_filepath): + try: + os.remove(temp_filepath) + except: + pass + raise Exception(f"Download error after {self.max_retries} retries: {str(e)}") + + print(f"Download error (attempt {retry_count}/{self.max_retries}): {str(e)}") + print(f"Retrying in {retry_count * 2} seconds...") + await asyncio.sleep(retry_count * 2) + + async def embed_metadata(self, filepath, track_info, search_info=None): + try: + print("Embedding metadata...") + audio = FLAC(filepath) + audio.clear() + audio.clear_pictures() + + if track_info.get("title"): + audio["TITLE"] = track_info["title"] + + artists_list = [] + if search_info and search_info.get("artists"): + for artist in search_info["artists"]: + if artist.get("name"): + artists_list.append(artist["name"]) + elif search_info and search_info.get("artist") and search_info["artist"].get("name"): + artists_list.append(search_info["artist"]["name"]) + elif track_info.get("artists"): + for artist in track_info["artists"]: + if artist.get("name"): + artists_list.append(artist["name"]) + elif track_info.get("artist") and track_info["artist"].get("name"): + artists_list.append(track_info["artist"]["name"]) + + if artists_list: + audio["ARTIST"] = artists_list[0] + if len(artists_list) > 1: + audio["ALBUMARTIST"] = "; ".join(artists_list) + else: + audio["ALBUMARTIST"] = artists_list[0] + + album_info = search_info.get("album", {}) if search_info else track_info.get("album", {}) + if album_info.get("title"): + audio["ALBUM"] = album_info["title"] + + if search_info and search_info.get("trackNumber"): + audio["TRACKNUMBER"] = str(search_info["trackNumber"]) + elif track_info.get("trackNumber"): + audio["TRACKNUMBER"] = str(track_info["trackNumber"]) + + if search_info and search_info.get("volumeNumber"): + audio["DISCNUMBER"] = str(search_info["volumeNumber"]) + elif track_info.get("volumeNumber"): + audio["DISCNUMBER"] = str(track_info["volumeNumber"]) + + duration = search_info.get("duration") if search_info else track_info.get("duration") + if duration: + audio["LENGTH"] = str(duration) + + isrc = search_info.get("isrc") if search_info else track_info.get("isrc") + if isrc: + audio["ISRC"] = isrc + + copyright_info = search_info.get("copyright") if search_info else track_info.get("copyright") + if copyright_info: + audio["COPYRIGHT"] = copyright_info + + if album_info.get("releaseDate"): + audio["DATE"] = album_info["releaseDate"][:4] + try: + audio["YEAR"] = album_info["releaseDate"][:4] + except: + pass + + if track_info.get("genre"): + audio["GENRE"] = track_info["genre"] + + if track_info.get("audioQuality"): + audio["COMMENT"] = f"Tidal {track_info['audioQuality']}" + + if album_info.get("cover"): + album_art = await self.download_album_art(album_info["cover"]) + if album_art: + picture = Picture() + picture.data = album_art + picture.type = PictureType.COVER_FRONT + picture.mime = "image/jpeg" + picture.desc = "Cover" + audio.add_picture(picture) + print("Album art embedded") + + audio.save() + print(f"Metadata embedded successfully for: {track_info.get('title', 'Unknown')}") + return True + + except Exception as e: + print(f"Error embedding metadata: {str(e)}") + return False + + async def download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", is_paused_callback=None, is_stopped_callback=None): + if output_dir != ".": + try: + os.makedirs(output_dir, exist_ok=True) + except OSError as e: + raise Exception(f"Directory error: {e}") + + track_info = await self.get_track_info(query, isrc) + track_id = track_info.get("id") + + if not track_id: + raise Exception("No track ID found") + + artists_list = [] + if track_info.get("artists"): + for artist in track_info["artists"]: + if artist.get("name"): + artists_list.append(artist["name"]) + elif track_info.get("artist") and track_info["artist"].get("name"): + artists_list.append(track_info["artist"]["name"]) + + artist_name = ", ".join(artists_list) if artists_list else "Unknown Artist" + artist_name = self.sanitize_filename(artist_name) + track_title = self.sanitize_filename(track_info.get("title", f"track_{track_id}")) + + output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac") + + if os.path.exists(output_filename): + file_size = os.path.getsize(output_filename) + if file_size > 0: + print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)") + return output_filename + + download_info = await self.get_download_url(track_id, quality) + download_url = download_info["download_url"] + download_track_info = download_info["track_info"] + + print(f"Downloading to: {output_filename}") + await self.download_file( + download_url, + output_filename, + is_paused_callback=is_paused_callback, + is_stopped_callback=is_stopped_callback + ) + + print("Adding metadata...") + try: + await self.embed_metadata(output_filename, download_track_info, track_info) + print("Metadata saved") + except Exception as e: + print(f"Tagging failed: {e}") + + print("Done") + return output_filename + +async def main(): + print("=== TidalDL - Tidal Downloader ===") + downloader = TidalDownloader(timeout=30, max_retries=3) + + query = "APT." + isrc = "USAT22409172" + output_dir = "." + + try: + downloaded_file = await downloader.download(query, isrc, output_dir) + print(f"Success: File saved as {downloaded_file}") + except Exception as e: + print(f"Error: {str(e)}") + +if __name__ == "__main__": + try: + import sys + if sys.platform == "win32": + import os + os.system("chcp 65001 > nul") + try: + sys.stdout.reconfigure(encoding='utf-8') + except: + pass + except: + pass + + asyncio.run(main()) \ No newline at end of file