diff --git a/SpotiFLAC.py b/SpotiFLAC.py index 0365012..d7e0fe6 100644 --- a/SpotiFLAC.py +++ b/SpotiFLAC.py @@ -23,6 +23,7 @@ from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException from qobuzDL import QobuzDownloader from tidalDL import TidalDownloader from deezerDL import DeezerDownloader +from amazonDL import LucidaDownloader @dataclass class Track: @@ -96,6 +97,8 @@ class DownloadWorker(QThread): downloader = TidalDownloader() elif self.service == "deezer": downloader = DeezerDownloader() + elif self.service == "amazon": + downloader = LucidaDownloader() else: downloader = TidalDownloader() @@ -223,6 +226,21 @@ class DownloadWorker(QThread): raise Exception("Downloaded file not found") else: raise Exception("Deezer download failed") + elif self.service == "amazon": + self.progress.emit(f"Downloading from Amazon Music: {track.title} - {track.artists}", 0) + + is_paused_callback = lambda: self.is_paused + is_stopped_callback = lambda: self.is_stopped + + downloaded_file = downloader.download( + track.id, + track_outpath, + is_paused_callback=is_paused_callback, + is_stopped_callback=is_stopped_callback + ) + + if not downloaded_file or not os.path.exists(downloaded_file): + raise Exception("Amazon Music download failed") else: track_id = track.id self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0) @@ -368,6 +386,19 @@ class DeezerStatusChecker(QThread): self.error.emit(f"Error checking Deezer status: {str(e)}") self.status_updated.emit(False) +class AmazonStatusChecker(QThread): + status_updated = pyqtSignal(bool) + error = pyqtSignal(str) + + def run(self): + try: + response = requests.get("https://lucida.to/api/load?url=%2Fapi%2Fcountries%3Fservice%3Damazon", timeout=5) + is_online = response.status_code == 200 + self.status_updated.emit(is_online) + except Exception as e: + self.error.emit(f"Error checking Amazon Music status: {str(e)}") + self.status_updated.emit(False) + class StatusIndicatorDelegate(QStyledItemDelegate): def paint(self, painter, option, index): item_data = index.data(Qt.ItemDataRole.UserRole) @@ -412,7 +443,16 @@ class ServiceComboBox(QComboBox): self.deezer_status_timer = QTimer(self) self.deezer_status_timer.timeout.connect(self.refresh_deezer_status) - self.deezer_status_timer.start(60000) + self.deezer_status_timer.start(60000) + + self.amazon_status_checker = AmazonStatusChecker() + self.amazon_status_checker.status_updated.connect(self.update_amazon_service_status) + self.amazon_status_checker.error.connect(lambda e: print(f"Amazon Music status check error: {e}")) + self.amazon_status_checker.start() + + self.amazon_status_timer = QTimer(self) + self.amazon_status_timer.timeout.connect(self.refresh_amazon_status) + self.amazon_status_timer.start(60000) def setup_items(self): current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -420,7 +460,8 @@ class ServiceComboBox(QComboBox): self.services = [ {'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False}, {'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False}, - {'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False} + {'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False}, + {'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False} ] for service in self.services: @@ -476,6 +517,19 @@ class ServiceComboBox(QComboBox): self.deezer_status_checker.error.connect(lambda e: print(f"Deezer status check error: {e}")) self.deezer_status_checker.start() + def update_amazon_service_status(self, is_online): + self.update_service_status('amazon', is_online) + + def refresh_amazon_status(self): + if hasattr(self, 'amazon_status_checker') and self.amazon_status_checker.isRunning(): + self.amazon_status_checker.quit() + self.amazon_status_checker.wait() + + self.amazon_status_checker = AmazonStatusChecker() + self.amazon_status_checker.status_updated.connect(self.update_amazon_service_status) + self.amazon_status_checker.error.connect(lambda e: print(f"Amazon Music status check error: {e}")) + self.amazon_status_checker.start() + def currentData(self, role=Qt.ItemDataRole.UserRole + 1): return super().currentData(role) @@ -573,7 +627,7 @@ class QobuzRegionComboBox(QComboBox): class SpotiFLACGUI(QWidget): def __init__(self): super().__init__() - self.current_version = "4.3" + self.current_version = "4.4" self.tracks = [] self.all_tracks = [] self.reset_state() diff --git a/amazon.png b/amazon.png new file mode 100644 index 0000000..c0c593b Binary files /dev/null and b/amazon.png differ diff --git a/amazonDL.py b/amazonDL.py new file mode 100644 index 0000000..c07448b --- /dev/null +++ b/amazonDL.py @@ -0,0 +1,123 @@ +import requests +import time +import os +import re +import base64 +import urllib3 +from urllib.parse import unquote + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +def extract_data(html, patterns): + for pattern in patterns: + if match := re.search(pattern, html): + return match.group(1) + return None + +def download_track(track_id, service="amazon", output_dir="."): + client = requests.Session() + client.verify = False + headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'} + + try: + spotify_url = f"https://open.spotify.com/track/{track_id}" + params = {"url": spotify_url, "country": "auto", "to": service} + + response = client.get("https://lucida.to", params=params, headers=headers, timeout=30) + html = response.text + + token = extract_data(html, [r'token:"([^"]+)"', r'"token"\s*:\s*"([^"]+)"']) + url = extract_data(html, [r'"url":"([^"]+)"', r'url:"([^"]+)"']) + expiry = extract_data(html, [r'tokenExpiry:(\d+)', r'"tokenExpiry"\s*:\s*(\d+)']) + + if not (token and url): + raise Exception("Could not extract required data") + + try: + decoded_token = base64.b64decode(base64.b64decode(token).decode('latin1')).decode('latin1') + except: + decoded_token = token + + clean_url = url.replace('\\/', '/') + print(f"Starting download for: {clean_url}") + + request_data = { + "account": {"id": "auto", "type": "country"}, + "compat": "false", "downscale": "original", "handoff": True, + "metadata": True, "private": True, + "token": {"primary": decoded_token, "expiry": int(expiry) if expiry else None}, + "upload": {"enabled": False, "service": "pixeldrain"}, + "url": clean_url + } + + response = client.post("https://lucida.to/api/load?url=/api/fetch/stream/v2", + json=request_data, headers=headers) + + if csrf_token := response.cookies.get('csrf_token'): + headers['X-CSRF-Token'] = csrf_token + + data = response.json() + if not data.get("success"): + raise Exception(f"Request failed: {data.get('error', 'Unknown error')}") + + completion_url = f"https://{data['server']}.lucida.to/api/fetch/request/{data['handoff']}" + print("Processing track...") + + while True: + resp = client.get(completion_url, headers=headers).json() + if resp["status"] == "completed": + print("Processing completed!") + break + elif resp["status"] == "error": + raise Exception(f"Processing failed: {resp.get('message', 'Unknown error')}") + elif progress := resp.get("progress"): + percent = int((progress.get("current", 0) / progress.get("total", 100)) * 100) + print(f"Progress: {percent}%") + time.sleep(1) + + download_url = f"https://{data['server']}.lucida.to/api/fetch/request/{data['handoff']}/download" + response = client.get(download_url, stream=True, headers=headers) + + file_name = "track.flac" + if content_disp := response.headers.get('content-disposition'): + if match := re.search(r'filename[*]?=([^;]+)', content_disp): + raw_name = match.group(1).strip('"\'') + file_name = unquote(raw_name[7:] if raw_name.startswith("UTF-8''") else raw_name) + for char in '<>:"/\\|?*': + file_name = file_name.replace(char, '') + file_name = file_name.strip() + + file_path = os.path.join(output_dir, file_name) + print(f"Downloading: {file_name}") + + with open(file_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + print(f"Download completed: {file_path}") + return file_path + + except Exception as e: + print(f"Error: {str(e)}") + return None + +class LucidaDownloader: + def __init__(self): + self.progress_callback = None + + def set_progress_callback(self, callback): + self.progress_callback = callback + + def download(self, track_id, output_dir, is_paused_callback=None, is_stopped_callback=None): + """Download track using Lucida service""" + try: + return download_track(track_id, service="amazon", output_dir=output_dir) + except Exception as e: + raise Exception(f"Amazon Music download failed: {str(e)}") + +if __name__ == "__main__": + track_id = "2plbrEY59IikOBgBGLjaoe" + service = "amazon" + + download_track(track_id, service) \ No newline at end of file