diff --git a/Archived/LucidaDownloader.py b/Archived/LucidaDownloader.py new file mode 100644 index 0000000..6c8e402 --- /dev/null +++ b/Archived/LucidaDownloader.py @@ -0,0 +1,117 @@ +import requests +from tqdm import tqdm +import time +import os +import asyncio +from GetMetadata import main as get_metadata + +class TrackDownloader: + def __init__(self): + self.client = requests.Session() + self.headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + + async def get_track_info(self): + metadata = await get_metadata() + return metadata + + def sanitize_filename(self, filename): + invalid_chars = '<>:"/\\|?*' + for char in invalid_chars: + filename = filename.replace(char, '') + + filename = ' '.join(filename.split()) + filename = filename.replace(' ,', ',') + filename = filename.replace(',', ', ').replace(' ', ' ') + + return filename.strip() + + def download(self, metadata, output_dir): + track_url = metadata['url'] + primary_token = metadata['token'] + expiry = metadata['expiry'] + + print(f"Starting download for: {track_url}") + + initial_request = { + "account": {"id": "auto", "type": "country"}, + "compat": "false", + "downscale": "original", + "handoff": True, + "metadata": True, + "private": True, + "token": { + "expiry": expiry, + "primary": primary_token + }, + "upload": {"enabled": False, "service": "pixeldrain"}, + "url": track_url + } + + response = self.client.post("https://lucida.to/api/load?url=/api/fetch/stream/v2", + json=initial_request, + headers=self.headers) + + csrf_token = response.cookies.get('csrf_token') + if csrf_token: + self.headers['X-CSRF-Token'] = csrf_token + + initial_response = response.json() + + if not initial_response.get("success", False): + raise Exception(f"Initial request failed: {initial_response.get('error', 'Unknown error')}") + + handoff = initial_response["handoff"] + server = initial_response["server"] + + file_name = f"{metadata['title']} - {metadata['artists']}.flac" + file_name = self.sanitize_filename(file_name) + + completion_url = f"https://{server}.lucida.to/api/fetch/request/{handoff}" + + print("Waiting for track processing to complete") + while True: + completion_response = self.client.get(completion_url, headers=self.headers).json() + if completion_response["status"] == "completed": + break + elif completion_response["status"] == "error": + raise Exception(f"API request failed: {completion_response.get('message', 'Unknown error')}") + time.sleep(1) + + download_url = f"https://{server}.lucida.to/api/fetch/request/{handoff}/download" + print(f"Starting download of: {file_name}") + + response = self.client.get(download_url, stream=True, headers=self.headers) + total_size = int(response.headers.get('content-length', 0)) + + file_path = os.path.join(output_dir, file_name) + + with open(file_path, 'wb') as file, tqdm( + desc=file_name, + total=total_size, + unit='iB', + unit_scale=True, + unit_divisor=1024, + ) as progress_bar: + for data in response.iter_content(chunk_size=1024): + size = file.write(data) + progress_bar.update(size) + + print(f"Download completed: {file_path}") + return file_path + +async def main(): + downloader = TrackDownloader() + output_dir = "." + + try: + metadata = await downloader.get_track_info() + + downloaded_file = downloader.download(metadata, output_dir) + print(f"File downloaded successfully: {downloaded_file}") + except Exception as e: + print(f"An error occurred: {str(e)}") + +if __name__ == "__main__": + asyncio.run(main())