Files
SpotiFLAC/getTracks.py
T
afkarxyz de798e4807 v3.4
2025-06-23 11:52:19 +07:00

1076 lines
46 KiB
Python

import requests
import time
import os
import asyncio
import re
import base64
import json
import tempfile
import httpx
import aiofiles
from datetime import datetime
from mutagen.flac import FLAC, Picture
from mutagen.id3 import PictureType
class ProgressCallback:
def __call__(self, current, total):
if total > 0:
percent = (current / total) * 100
print(f"\r{percent:.2f}% ({current}/{total})", end="")
else:
print(f"\r{current / (1024 * 1024):.2f} MB", end="")
class LucidaDownloader:
def __init__(self, timeout=30):
self.client = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.progress_callback = ProgressCallback()
self.timeout = timeout
self.base_domain = "lucida.to"
def set_progress_callback(self, callback):
self.progress_callback = callback
def generate_filename(self, track_id, service):
return f"{track_id}_{service}.flac"
async def get_track_info(self, track_id, service="tidal"):
if service not in ["tidal", "amazon", "deezer"]:
raise ValueError("Service must be one of 'tidal', 'amazon', or 'deezer'")
spotify_url = f"https://open.spotify.com/track/{track_id}"
result = self._convert_spotify_link(spotify_url, service)
if "error" in result:
raise Exception(f"Error: {result['error']}")
result["track_id"] = track_id
return result
def _convert_spotify_link(self, spotify_url, target_service="tidal"):
track_id_match = re.search(r'track/([a-zA-Z0-9]+)', spotify_url)
if not track_id_match:
return {"error": "Invalid Spotify URL"}
base_url = f"https://{self.base_domain}"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "id-ID,id;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": self.base_domain,
"Pragma": "no-cache",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
try:
headers["Referer"] = f"{base_url}/?url={spotify_url}&country=auto"
request_params = {
"url": spotify_url,
"country": "auto",
"to": target_service
}
session = requests.Session()
session.verify = True
response = session.get(
base_url,
params=request_params,
headers=headers,
timeout=self.timeout
)
html_content = response.text
token_match = re.search(r'token:"([^"]+)"', html_content)
token_expiry_match = re.search(r'tokenExpiry:(\d+)', html_content)
token = token_match.group(1) if token_match else None
token_expiry = int(token_expiry_match.group(1)) if token_expiry_match else None
url = None
url_patterns = [
r'"url":"([^"]+)"',
r'href="(https?://[^"]*' + re.escape(target_service) + r'[^"]*track[^"]*)"',
]
for pattern in url_patterns:
url_match = re.search(pattern, html_content)
if url_match:
url = url_match.group(1).replace('\\/', '/')
break
if not url:
redirect_patterns = [
r'url=([^&"]+)',
r'href="([^"]+)"',
r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]',
]
for pattern in redirect_patterns:
matches = re.finditer(pattern, html_content)
for match in matches:
potential_url = match.group(1)
if potential_url.startswith('http') and target_service.lower() in potential_url.lower():
url = potential_url.replace('\\/', '/')
break
if not url:
service_urls = re.finditer(r'(https?://[^"\s]+' + re.escape(target_service) + r'[^"\s]+)', html_content)
for match in service_urls:
url = match.group(1).replace('\\/', '/')
break
result = {
"service": target_service,
"url": url,
"token": {
"primary": None,
"expiry": None
}
}
if token:
try:
decoded_once = base64.b64decode(token).decode('latin1')
decoded_token = base64.b64decode(decoded_once).decode('latin1')
result["token"]["primary"] = decoded_token
except Exception:
result["token"]["primary"] = token
result["token"]["expiry"] = token_expiry
return result
except Exception as error:
return {"error": str(error)}
def download(self, metadata, output_dir=".", is_paused_callback=None, is_stopped_callback=None):
track_url = metadata['url']
primary_token = metadata['token']['primary']
expiry = metadata['token']['expiry']
track_id = metadata['track_id']
service = metadata['service']
print(f"Starting download: track ID {track_id}")
if is_stopped_callback and is_stopped_callback():
raise Exception("Download stopped")
file_name = self.generate_filename(track_id, service)
file_path = os.path.join(output_dir, file_name)
if os.path.exists(file_path):
file_size = os.path.getsize(file_path)
if file_size > 0:
print(f"File already exists: {file_path} ({file_size / (1024 * 1024):.2f} MB)")
return file_path
initial_request = {
"account": {"id": "auto", "type": "country"},
"compat": "false",
"downscale": "original",
"handoff": True,
"metadata": True,
"private": True,
"token": {
"expiry": expiry,
"primary": primary_token
},
"upload": {"enabled": False, "service": "pixeldrain"},
"url": track_url
}
response = self.client.post(f"https://{self.base_domain}/api/load?url=/api/fetch/stream/v2",
json=initial_request,
headers=self.headers)
csrf_token = response.cookies.get('csrf_token')
if csrf_token:
self.headers['X-CSRF-Token'] = csrf_token
initial_response = response.json()
if not initial_response.get("success", False):
raise Exception(f"Request failed: {initial_response.get('error', 'Unknown error')}")
handoff = initial_response["handoff"]
server = initial_response["server"]
file_name = self.generate_filename(track_id, service)
completion_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}"
print("Waiting for processing...")
while True:
if is_stopped_callback and is_stopped_callback():
raise Exception("Download stopped")
while is_paused_callback and is_paused_callback():
time.sleep(0.1)
if is_stopped_callback and is_stopped_callback():
raise Exception("Download stopped")
completion_response = self.client.get(completion_url, headers=self.headers).json()
status = completion_response["status"]
if status == "completed":
print("Processing: 100%")
break
elif status == "error":
raise Exception(f"API error: {completion_response.get('message', 'Unknown error')}")
else:
progress = completion_response.get("progress", {})
if progress:
current = progress.get("current", 0)
total = progress.get("total", 100)
percent = int((current / total) * 100) if total > 0 else 0
action = progress.get("action", "Processing")
print(f"{percent}% - {action}")
if action.lower() == "metadata":
if self.progress_callback:
self.progress_callback(0, 0)
else:
print(f"Status: {status}")
if status.lower() == "metadata":
if self.progress_callback:
self.progress_callback(0, 0)
time.sleep(1)
download_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}/download"
print(f"Downloading file...")
response = self.client.get(download_url, stream=True, headers=self.headers)
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
file_path = os.path.join(output_dir, file_name)
try:
with open(file_path, 'wb') as file:
start_time = time.time()
last_update_time = start_time
for chunk in response.iter_content(chunk_size=8192):
if is_stopped_callback and is_stopped_callback():
file.close()
if os.path.exists(file_path):
os.remove(file_path)
raise Exception("Download stopped")
while is_paused_callback and is_paused_callback():
time.sleep(0.1)
if is_stopped_callback and is_stopped_callback():
file.close()
if os.path.exists(file_path):
os.remove(file_path)
raise Exception("Download stopped")
if chunk:
file.write(chunk)
downloaded_size += len(chunk)
current_time = time.time()
if current_time - last_update_time >= 1:
if total_size > 0:
progress_percent = (downloaded_size / total_size) * 100
elapsed_time = current_time - start_time
speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0
print(f"{progress_percent:.2f}% - {speed:.2f} MB/s")
else:
print(f"{downloaded_size / (1024 * 1024):.2f} MB")
last_update_time = current_time
if self.progress_callback:
self.progress_callback(downloaded_size, total_size)
if downloaded_size == 0:
raise Exception("No data received")
print(f"Complete. File saved: {file_path}")
return file_path
except Exception as e:
if os.path.exists(file_path) and os.path.getsize(file_path) == 0:
try:
os.remove(file_path)
except:
pass
raise e
class SquidWTFDownloader:
def __init__(self, region="us", timeout=30):
if region not in ["eu", "us"]:
raise ValueError("Region must be either 'us' or 'eu'")
self.region = region
self.timeout = timeout
self.session = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.base_api_url = f"https://{region}.qobuz.squid.wtf/api"
self.download_chunk_size = 256 * 1024
self.progress_callback = ProgressCallback()
def set_progress_callback(self, callback):
self.progress_callback = callback
def sanitize_filename(self, filename):
if not filename:
return "Unknown Track"
sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename))
return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track"
def get_track_info(self, isrc):
print(f"Fetching: {isrc}")
search_url = f"{self.base_api_url}/get-music"
params = {'q': isrc, 'offset': 0, 'limit': 10}
try:
response = self.session.get(search_url, params=params, timeout=self.timeout)
response.raise_for_status()
data = response.json()
selected_track = None
if data and data.get("success"):
items = data.get("data", {}).get("tracks", {}).get("items", [])
priority = {24: 1, 16: 2}
for track in items:
track_isrc = track.get("isrc", "").upper()
search_isrc = isrc.upper()
if track_isrc == search_isrc:
current_prio = priority.get(track.get("maximum_bit_depth"), 3)
if selected_track is None or current_prio < priority.get(selected_track.get("maximum_bit_depth"), 3):
selected_track = track
if current_prio == 1:
break
if not selected_track:
raise Exception(f"Track not found: {isrc}")
title = selected_track.get('title', 'Unknown')
bit_depth = selected_track.get('maximum_bit_depth', 'Unknown')
print(f"Found: {title} ({bit_depth}b)")
return selected_track
except requests.exceptions.RequestException as e:
raise Exception(f"Request error: {e}")
except Exception as e:
raise Exception(f"Error: {e}")
def get_download_url(self, track_id):
print("Fetching URL...")
download_api_url = f"{self.base_api_url}/download-music"
params = {'track_id': track_id, 'quality': 27}
try:
response = self.session.get(download_api_url, params=params, timeout=self.timeout)
response.raise_for_status()
data = response.json()
if data and data.get("success") and data.get("data", {}).get("url"):
download_url = data["data"]["url"]
print("URL found")
return download_url
else:
error_msg = data.get('error', {}).get('message', 'Unknown API error')
raise Exception(f"API error: {error_msg}")
except requests.exceptions.RequestException as e:
raise Exception(f"Request error: {e}")
except Exception as e:
raise Exception(f"Error: {e}")
def download(self, isrc, output_dir=".", is_paused_callback=None, is_stopped_callback=None):
if output_dir != ".":
try:
os.makedirs(output_dir, exist_ok=True)
except OSError as e:
raise Exception(f"Directory error: {e}")
track_info = self.get_track_info(isrc)
track_id = track_info.get("id")
if not track_id:
raise Exception("No track ID found")
artist_name = self.sanitize_filename(track_info.get('performer', {}).get('name'))
track_title = self.sanitize_filename(track_info.get('title'))
output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac")
if os.path.exists(output_filename):
file_size = os.path.getsize(output_filename)
if file_size > 0:
print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)")
return output_filename
download_url = self.get_download_url(track_id)
temp_filename = output_filename + ".part"
print(f"Downloading...")
try:
with self.session.get(download_url, stream=True, timeout=900) as response, \
open(temp_filename, 'wb') as f:
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
start_time = time.time()
last_update_time = start_time
for chunk in response.iter_content(chunk_size=self.download_chunk_size):
if is_stopped_callback and is_stopped_callback():
f.close()
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception("Download stopped")
while is_paused_callback and is_paused_callback():
time.sleep(0.1)
if is_stopped_callback and is_stopped_callback():
f.close()
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception("Download stopped")
f.write(chunk)
downloaded_size += len(chunk)
current_time = time.time()
if current_time - last_update_time >= 1:
if total_size > 0:
progress_percent = (downloaded_size / total_size) * 100
elapsed_time = current_time - start_time
speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0
print(f"{progress_percent:.2f}% - {speed:.2f} MB/s")
else:
print(f"{downloaded_size / (1024 * 1024):.2f} MB")
last_update_time = current_time
if self.progress_callback:
self.progress_callback(downloaded_size, total_size)
os.rename(temp_filename, output_filename)
print("Download complete")
except requests.exceptions.RequestException as e:
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception(f"Download failed: {e}")
except Exception as e:
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception(f"File error: {e}")
print("Adding metadata...")
try:
self._embed_metadata(output_filename, track_info)
print("Metadata saved")
except Exception as e:
print(f"Tagging failed: {e}")
print(f"Done")
return output_filename
def _embed_metadata(self, filename, track_info):
try:
audio = FLAC(filename)
audio.delete()
audio.clear_pictures()
album_info = track_info.get('album', {})
artist = track_info.get('performer', {}).get('name')
if track_info.get('title'):
audio['TITLE'] = track_info['title']
if artist:
audio['ARTIST'] = artist
if album_info.get('title'):
audio['ALBUM'] = album_info['title']
if album_info.get('artist', {}).get('name', artist):
audio['ALBUMARTIST'] = album_info.get('artist', {}).get('name', artist)
if track_info.get('track_number'):
audio['TRACKNUMBER'] = str(track_info['track_number'])
if track_info.get('release_date_original'):
audio['DATE'] = track_info['release_date_original']
try:
audio['YEAR'] = str(datetime.strptime(track_info['release_date_original'], '%Y-%m-%d').year)
except ValueError:
pass
if album_info.get('genre', {}).get('name'):
audio['GENRE'] = album_info['genre']['name']
if track_info.get('copyright'):
audio['COPYRIGHT'] = track_info['copyright']
if track_info.get('isrc'):
audio['ISRC'] = track_info['isrc']
if album_info.get('label', {}).get('name'):
audio['ORGANIZATION'] = album_info['label']['name']
img_info = album_info.get('image', {})
cover_url = img_info.get('large') or img_info.get('small') or img_info.get('thumbnail')
if cover_url:
try:
img_response = self.session.get(cover_url, timeout=30)
img_response.raise_for_status()
mime_type = img_response.headers.get('Content-Type', 'image/jpeg').lower()
if mime_type in ['image/jpeg', 'image/png']:
picture = Picture()
picture.data = img_response.content
picture.type = PictureType.COVER_FRONT
picture.mime = mime_type
audio.add_picture(picture)
print("Cover added")
except Exception as e:
print(f"Cover error: {str(e)}")
audio.save()
except Exception as e:
raise Exception(f"Metadata error: {e}")
class TidalDownloader:
def __init__(self, client_id="zU4XHVVkc2tDPo4t", client_secret="VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4=", timeout=30):
self.client_id = client_id
self.client_secret = client_secret
self.timeout = timeout
self.progress_callback = ProgressCallback()
self.temp_dir = tempfile.gettempdir()
self.token_path = os.path.join(self.temp_dir, "tidal_token.json")
self.access_token = None
if os.path.exists(self.token_path):
try:
with open(self.token_path, "r") as tok:
token = json.loads(tok.read())
self.access_token = token.get("access_token")
except:
pass
def set_progress_callback(self, callback):
self.progress_callback = callback
async def get_access_token(self):
refresh_url = "https://auth.tidal.com/v1/oauth2/token"
payload = {
"client_id": self.client_id,
"grant_type": "client_credentials",
}
async with httpx.AsyncClient(http2=True) as client:
try:
response = await client.post(
url=refresh_url,
data=payload,
auth=(self.client_id, self.client_secret),
)
if response.status_code == 200:
token_data = response.json()
new_token = token_data.get("access_token")
try:
with open(self.token_path, "w") as f:
json.dump({
"access_token": new_token
}, f)
except:
pass
self.access_token = new_token
return new_token
return None
except:
return None
async def search_tracks(self, query):
try:
tidal_token = self.access_token or await self.get_access_token()
if not tidal_token:
return {"error": "Failed to get access token"}
search_url = f"https://api.tidal.com/v1/search/tracks?query={query}&limit=25&offset=0&countryCode=US"
header = {"authorization": f"Bearer {tidal_token}"}
async with httpx.AsyncClient(http2=True) as client:
search_data = await client.get(url=search_url, headers=header)
response_data = search_data.json()
filtered_items = [{
"id": item.get("id"),
"title": item.get("title"),
"url": item.get("url"),
"isrc": item.get("isrc"),
"audioQuality": item.get("audioQuality"),
"mediaMetadata": item.get("mediaMetadata"),
"album": item.get("album", {}),
"artists": item.get("artists", []),
"artist": item.get("artist", {}),
"trackNumber": item.get("trackNumber"),
"volumeNumber": item.get("volumeNumber"),
"duration": item.get("duration"),
"copyright": item.get("copyright"),
"explicit": item.get("explicit")
} for item in response_data.get("items", [])]
return {
"limit": response_data.get("limit"),
"offset": response_data.get("offset"),
"totalNumberOfItems": response_data.get("totalNumberOfItems"),
"items": filtered_items
}
except Exception as e:
return {"error": f"Error: {str(e)}"}
async def filter_by_isrc(self, query, isrc=None):
try:
result = await self.search_tracks(query)
if "error" in result:
return result
if isrc:
isrc_items = [item for item in result["items"] if item.get("isrc", "").upper() == isrc.upper()]
if len(isrc_items) > 1:
hires_items = []
for item in isrc_items:
media_metadata = item.get("mediaMetadata", {})
tags = media_metadata.get("tags", []) if media_metadata else []
if "HIRES_LOSSLESS" in tags:
hires_items.append(item)
if hires_items:
result["items"] = hires_items
else:
result["items"] = isrc_items
else:
result["items"] = isrc_items
result["totalNumberOfItems"] = len(result["items"])
return result
except Exception as e:
return {"error": f"Error: {str(e)}"}
async def get_track_download_info(self, track_id, quality="LOSSLESS"):
try:
download_api_url = f"https://hifi.401658.xyz/track/?id={track_id}&quality={quality}"
async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client:
response = await client.get(download_api_url)
if response.status_code == 200:
data = response.json()
for item in data:
if "OriginalTrackUrl" in item:
return {
"success": True,
"download_url": item["OriginalTrackUrl"],
"track_info": data[0] if data else {}
}
return {"success": False, "error": "OriginalTrackUrl not found in response"}
else:
return {"success": False, "error": f"API returned status code: {response.status_code}"}
except Exception as e:
return {"success": False, "error": f"Error getting download info: {str(e)}"}
async def download_album_art(self, album_id, size="1280x1280"):
try:
art_url = f"https://resources.tidal.com/images/{album_id.replace('-', '/')}/{size}.jpg"
async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client:
response = await client.get(art_url)
if response.status_code == 200:
return response.content
else:
print(f"Failed to download album art: HTTP {response.status_code}")
return None
except Exception as e:
print(f"Error downloading album art: {str(e)}")
return None
async def embed_metadata(self, filepath, track_info, search_info=None):
try:
audio = FLAC(filepath)
audio.clear()
if track_info.get("title"):
audio["TITLE"] = track_info["title"]
artists_list = []
if search_info and search_info.get("artists"):
for artist in search_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif search_info and search_info.get("artist") and search_info["artist"].get("name"):
artists_list.append(search_info["artist"]["name"])
elif track_info.get("artists"):
for artist in track_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif track_info.get("artist") and track_info["artist"].get("name"):
artists_list.append(track_info["artist"]["name"])
if artists_list:
audio["ARTIST"] = artists_list[0]
if len(artists_list) > 1:
audio["ALBUMARTIST"] = "; ".join(artists_list)
else:
audio["ALBUMARTIST"] = artists_list[0]
album_info = search_info.get("album", {}) if search_info else track_info.get("album", {})
if album_info.get("title"):
audio["ALBUM"] = album_info["title"]
if search_info and search_info.get("trackNumber"):
audio["TRACKNUMBER"] = str(search_info["trackNumber"])
elif track_info.get("trackNumber"):
audio["TRACKNUMBER"] = str(track_info["trackNumber"])
if search_info and search_info.get("volumeNumber"):
audio["DISCNUMBER"] = str(search_info["volumeNumber"])
elif track_info.get("volumeNumber"):
audio["DISCNUMBER"] = str(track_info["volumeNumber"])
isrc = search_info.get("isrc") if search_info else track_info.get("isrc")
if isrc:
audio["ISRC"] = isrc
copyright_info = search_info.get("copyright") if search_info else track_info.get("copyright")
if copyright_info:
audio["COPYRIGHT"] = copyright_info
release_date = None
if search_info and search_info.get("streamStartDate"):
release_date = search_info["streamStartDate"]
elif track_info.get("streamStartDate"):
release_date = track_info["streamStartDate"]
if release_date:
if "T" in release_date:
date_part = release_date.split("T")[0]
audio["DATE"] = date_part
else:
audio["DATE"] = release_date
if track_info.get("genre"):
audio["GENRE"] = track_info["genre"]
if album_info.get("cover"):
album_art = await self.download_album_art(album_info["cover"])
if album_art:
picture = Picture()
picture.data = album_art
picture.type = PictureType.COVER_FRONT
picture.mime = "image/jpeg"
picture.desc = "Cover"
audio.add_picture(picture)
print("Album art embedded successfully")
audio.save()
print(f"Metadata embedded successfully for: {track_info.get('title', 'Unknown')}")
return True
except Exception as e:
print(f"Error embedding metadata: {str(e)}")
return False
async def download_file(self, url, filename, max_retries=3, is_paused_callback=None, is_stopped_callback=None):
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(http2=True, timeout=60.0) as client:
async with client.stream('GET', url) as response:
if response.status_code == 200:
total_size_in_bytes = int(response.headers.get('content-length', 0))
bytes_downloaded = 0
async with aiofiles.open(filename, 'wb') as f:
async for chunk in response.aiter_bytes(chunk_size=8192):
if is_stopped_callback and is_stopped_callback():
print("\\nDownload stopped.")
if os.path.exists(filename):
try:
os.remove(filename)
except OSError as e:
print(f"Error removing partial file: {e}")
return {"success": False, "error": "Download stopped by user"}
while is_paused_callback and is_paused_callback():
print("\\nDownload paused. Waiting...")
await asyncio.sleep(1)
await f.write(chunk)
bytes_downloaded += len(chunk)
if total_size_in_bytes > 0:
if self.progress_callback:
self.progress_callback(bytes_downloaded, total_size_in_bytes)
if total_size_in_bytes > 0 and bytes_downloaded == total_size_in_bytes:
print()
print(f"Successfully downloaded: {filename} ({bytes_downloaded} bytes)")
return {"success": True, "size": bytes_downloaded}
else:
print(f"\\nFailed to download {filename}. HTTP Status: {response.status_code}")
if os.path.exists(filename):
try:
os.remove(filename)
except OSError as e:
print(f"Error removing partial file after server error: {e}")
return {"success": False, "error": f"HTTP {response.status_code}"}
except Exception as e:
print()
if os.path.exists(filename):
try:
os.remove(filename)
except OSError as ose:
print(f"Error removing partial file after exception: {ose}")
if attempt < max_retries - 1:
print(f"Download attempt {attempt + 1} failed, retrying...")
await asyncio.sleep(2)
else:
return {"success": False, "error": f"Download failed after {max_retries} attempts: {str(e)}"}
async def download_track(self, track_ids, search_results, output_dir=".", quality="LOSSLESS", embed_meta=True, is_paused_callback=None, is_stopped_callback=None):
if not isinstance(track_ids, list):
track_ids = [track_ids]
if output_dir != ".":
os.makedirs(output_dir, exist_ok=True)
search_map = {}
if search_results and search_results.get("items"):
for item in search_results["items"]:
search_map[item["id"]] = item
all_skipped = True
skipped_files = []
for i, track_id in enumerate(track_ids):
download_info = await self.get_track_download_info(track_id, quality)
if not download_info["success"]:
print(f"Failed to get download info for track {track_id}: {download_info['error']}")
continue
download_url = download_info["download_url"]
track_info = download_info["track_info"]
search_info = search_map.get(track_id)
title = track_info.get("title", f"track_{track_id}")
artists_list = []
if search_info and search_info.get("artists"):
for artist in search_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif search_info and search_info.get("artist") and search_info["artist"].get("name"):
artists_list.append(search_info["artist"]["name"])
elif track_info.get("artists"):
for artist in track_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif track_info.get("artist") and track_info["artist"].get("name"):
artists_list.append(track_info["artist"]["name"])
artist_names = ", ".join(artists_list) if artists_list else ""
safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_', '.')).rstrip()
safe_artists = "".join(c for c in artist_names if c.isalnum() or c in (' ', '-', '_', ',', '.')).rstrip()
if safe_artists:
filename = f"{safe_title} - {safe_artists}.flac"
else:
filename = f"{safe_title}.flac"
filepath = os.path.join(output_dir, filename)
if os.path.exists(filepath):
print(f"File {filename} already exists. Skipping download.")
skipped_files.append(filename)
if len(track_ids) == 1:
return {
"success": True,
"status": "skipped_exists",
"track_id": track_id,
"filename": filename,
"filepath": filepath,
"message": f"File {filename} already exists."
}
continue
all_skipped = False
print(f"Downloading: {filename}")
download_result = await self.download_file(download_url, filepath, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback)
if download_result["success"]:
print(f"Successfully downloaded track {track_id}")
if embed_meta:
print("Embedding metadata...")
await self.embed_metadata(filepath, track_info, search_info)
return {
"success": True,
"track_id": track_id,
"filename": filename,
"filepath": filepath,
"size": download_result["size"],
"track_info": track_info,
"metadata_embedded": embed_meta
}
else:
print(f"Failed to download track {track_id}: {download_result['error']}")
if os.path.exists(filepath):
try:
os.remove(filepath)
except:
pass
if download_result.get("error") == "Download stopped by user":
return {"success": False, "error": "Download stopped by user", "track_id": track_id}
if all_skipped and skipped_files:
return {
"success": True,
"status": "all_skipped",
"message": f"All files already exist: {', '.join(skipped_files)}"
}
return {"success": False, "error": "All track IDs failed to download or were stopped"}
async def search_and_download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None):
print(f"Searching for: {query}")
if isrc:
print(f"ISRC: {isrc}")
search_result = await self.filter_by_isrc(query, isrc)
if "error" in search_result:
print(f"Search error: {search_result['error']}")
return {"success": False, "error": search_result['error']}
raw_result = None
if isrc:
raw_result = await self.search_tracks(query)
if not search_result["items"]:
if isrc and raw_result and raw_result.get("items"):
print("No tracks found with ISRC filter, falling back to unfiltered search")
search_result = raw_result
else:
print("No tracks found")
return {"success": False, "error": "No tracks found"}
track_ids = [item["id"] for item in search_result["items"]]
print(f"Found {len(track_ids)} track(s): {track_ids}")
download_result = await self.download_track(track_ids, search_result, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback)
return download_result
async def download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None):
result = await self.search_and_download(query, isrc, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback)
if result["success"]:
if result.get("status") == "all_skipped":
print(f"Skipped: {result['message']}")
if "filepath" in result:
return result["filepath"]
return output_dir
elif result.get("status") == "skipped_exists":
print(f"Skipped: {result['message']}")
return result["filepath"]
else:
print("Download completed!")
return result["filepath"]
else:
print(f"Download failed: {result['error']}")
if result.get("error") == "Download stopped by user":
raise Exception("Download stopped by user")
raise Exception(result["error"])
def print_progress(current, total):
if total > 0:
percent = (current / total) * 100
print(f"\rProgress: {percent:.2f}% ({current}/{total})", end="")
else:
print(f"\rDownloaded: {current / (1024 * 1024):.2f} MB", end="")
async def main():
print("=== LucidaDownloader ===")
lucida = LucidaDownloader()
track_id = "2plbrEY59IikOBgBGLjaoe"
service = "tidal"
output_dir = "."
try:
print(f"Getting track: {track_id} from {service}")
metadata = await lucida.get_track_info(track_id, service)
print("Starting download")
downloaded_file = lucida.download(metadata, output_dir)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
print("\n\n=== SquidWTFDownloader ===")
squid = SquidWTFDownloader(region="us")
isrc = "USAT22409172"
output_dir = "."
try:
downloaded_file = squid.download(isrc, output_dir)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
print("\n\n=== TidalDownloader ===")
tidal = TidalDownloader()
query = "APT."
isrc = "USAT22409172"
output_dir = "."
try:
downloaded_file = await tidal.download(query, isrc, output_dir, quality="LOSSLESS", embed_metadata=True)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
try:
import sys
if sys.platform == "win32":
import os
os.system("chcp 65001 > nul")
try:
sys.stdout.reconfigure(encoding='utf-8')
except:
pass
except:
pass
asyncio.run(main())