123 lines
4.7 KiB
Python
123 lines
4.7 KiB
Python
import requests
|
|
import time
|
|
import os
|
|
import re
|
|
import base64
|
|
import urllib3
|
|
from urllib.parse import unquote
|
|
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
def extract_data(html, patterns):
|
|
for pattern in patterns:
|
|
if match := re.search(pattern, html):
|
|
return match.group(1)
|
|
return None
|
|
|
|
def download_track(track_id, service="amazon", output_dir="."):
|
|
client = requests.Session()
|
|
client.verify = False
|
|
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
|
|
|
|
try:
|
|
spotify_url = f"https://open.spotify.com/track/{track_id}"
|
|
params = {"url": spotify_url, "country": "auto", "to": service}
|
|
|
|
response = client.get("https://lucida.to", params=params, headers=headers, timeout=30)
|
|
html = response.text
|
|
|
|
token = extract_data(html, [r'token:"([^"]+)"', r'"token"\s*:\s*"([^"]+)"'])
|
|
url = extract_data(html, [r'"url":"([^"]+)"', r'url:"([^"]+)"'])
|
|
expiry = extract_data(html, [r'tokenExpiry:(\d+)', r'"tokenExpiry"\s*:\s*(\d+)'])
|
|
|
|
if not (token and url):
|
|
raise Exception("Could not extract required data")
|
|
|
|
try:
|
|
decoded_token = base64.b64decode(base64.b64decode(token).decode('latin1')).decode('latin1')
|
|
except:
|
|
decoded_token = token
|
|
|
|
clean_url = url.replace('\\/', '/')
|
|
print(f"Starting download for: {clean_url}")
|
|
|
|
request_data = {
|
|
"account": {"id": "auto", "type": "country"},
|
|
"compat": "false", "downscale": "original", "handoff": True,
|
|
"metadata": True, "private": True,
|
|
"token": {"primary": decoded_token, "expiry": int(expiry) if expiry else None},
|
|
"upload": {"enabled": False, "service": "pixeldrain"},
|
|
"url": clean_url
|
|
}
|
|
|
|
response = client.post("https://lucida.to/api/load?url=/api/fetch/stream/v2",
|
|
json=request_data, headers=headers)
|
|
|
|
if csrf_token := response.cookies.get('csrf_token'):
|
|
headers['X-CSRF-Token'] = csrf_token
|
|
|
|
data = response.json()
|
|
if not data.get("success"):
|
|
raise Exception(f"Request failed: {data.get('error', 'Unknown error')}")
|
|
|
|
completion_url = f"https://{data['server']}.lucida.to/api/fetch/request/{data['handoff']}"
|
|
print("Processing track...")
|
|
|
|
while True:
|
|
resp = client.get(completion_url, headers=headers).json()
|
|
if resp["status"] == "completed":
|
|
print("Processing completed!")
|
|
break
|
|
elif resp["status"] == "error":
|
|
raise Exception(f"Processing failed: {resp.get('message', 'Unknown error')}")
|
|
elif progress := resp.get("progress"):
|
|
percent = int((progress.get("current", 0) / progress.get("total", 100)) * 100)
|
|
print(f"Progress: {percent}%")
|
|
time.sleep(1)
|
|
|
|
download_url = f"https://{data['server']}.lucida.to/api/fetch/request/{data['handoff']}/download"
|
|
response = client.get(download_url, stream=True, headers=headers)
|
|
|
|
file_name = "track.flac"
|
|
if content_disp := response.headers.get('content-disposition'):
|
|
if match := re.search(r'filename[*]?=([^;]+)', content_disp):
|
|
raw_name = match.group(1).strip('"\'')
|
|
file_name = unquote(raw_name[7:] if raw_name.startswith("UTF-8''") else raw_name)
|
|
for char in '<>:"/\\|?*':
|
|
file_name = file_name.replace(char, '')
|
|
file_name = file_name.strip()
|
|
|
|
file_path = os.path.join(output_dir, file_name)
|
|
print(f"Downloading: {file_name}")
|
|
|
|
with open(file_path, 'wb') as f:
|
|
for chunk in response.iter_content(chunk_size=8192):
|
|
if chunk:
|
|
f.write(chunk)
|
|
|
|
print(f"Download completed: {file_path}")
|
|
return file_path
|
|
|
|
except Exception as e:
|
|
print(f"Error: {str(e)}")
|
|
return None
|
|
|
|
class LucidaDownloader:
|
|
def __init__(self):
|
|
self.progress_callback = None
|
|
|
|
def set_progress_callback(self, callback):
|
|
self.progress_callback = callback
|
|
|
|
def download(self, track_id, output_dir, is_paused_callback=None, is_stopped_callback=None):
|
|
try:
|
|
return download_track(track_id, service="amazon", output_dir=output_dir)
|
|
except Exception as e:
|
|
raise Exception(f"Amazon Music download failed: {str(e)}")
|
|
|
|
if __name__ == "__main__":
|
|
track_id = "2plbrEY59IikOBgBGLjaoe"
|
|
service = "amazon"
|
|
|
|
download_track(track_id, service)
|