This commit is contained in:
afkarxyz
2025-05-23 16:43:45 +07:00
parent 46cb65665e
commit 4bc164cc56
2 changed files with 601 additions and 48 deletions
+475 -6
View File
@@ -4,6 +4,10 @@ import os
import asyncio
import re
import base64
import json
import tempfile
import httpx
import aiofiles
from datetime import datetime
from mutagen.flac import FLAC, Picture
from mutagen.id3 import PictureType
@@ -537,28 +541,480 @@ class SquidWTFDownloader:
except Exception as e:
raise Exception(f"Metadata error: {e}")
class TidalDownloader:
def __init__(self, client_id="zU4XHVVkc2tDPo4t", client_secret="VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4=", timeout=30):
self.client_id = client_id
self.client_secret = client_secret
self.timeout = timeout
self.progress_callback = ProgressCallback()
self.temp_dir = tempfile.gettempdir()
self.token_path = os.path.join(self.temp_dir, "tidal_token.json")
self.access_token = None
if os.path.exists(self.token_path):
try:
with open(self.token_path, "r") as tok:
token = json.loads(tok.read())
self.access_token = token.get("access_token")
except:
pass
def set_progress_callback(self, callback):
self.progress_callback = callback
async def get_access_token(self):
refresh_url = "https://auth.tidal.com/v1/oauth2/token"
payload = {
"client_id": self.client_id,
"grant_type": "client_credentials",
}
async with httpx.AsyncClient(http2=True) as client:
try:
response = await client.post(
url=refresh_url,
data=payload,
auth=(self.client_id, self.client_secret),
)
if response.status_code == 200:
token_data = response.json()
new_token = token_data.get("access_token")
try:
with open(self.token_path, "w") as f:
json.dump({
"access_token": new_token
}, f)
except:
pass
self.access_token = new_token
return new_token
return None
except:
return None
async def search_tracks(self, query):
try:
tidal_token = self.access_token or await self.get_access_token()
if not tidal_token:
return {"error": "Failed to get access token"}
search_url = f"https://api.tidal.com/v1/search/tracks?query={query}&limit=25&offset=0&countryCode=US"
header = {"authorization": f"Bearer {tidal_token}"}
async with httpx.AsyncClient(http2=True) as client:
search_data = await client.get(url=search_url, headers=header)
response_data = search_data.json()
filtered_items = [{
"id": item.get("id"),
"title": item.get("title"),
"url": item.get("url"),
"isrc": item.get("isrc"),
"audioQuality": item.get("audioQuality"),
"mediaMetadata": item.get("mediaMetadata"),
"album": item.get("album", {}),
"artists": item.get("artists", []),
"artist": item.get("artist", {}),
"trackNumber": item.get("trackNumber"),
"volumeNumber": item.get("volumeNumber"),
"duration": item.get("duration"),
"copyright": item.get("copyright"),
"explicit": item.get("explicit")
} for item in response_data.get("items", [])]
return {
"limit": response_data.get("limit"),
"offset": response_data.get("offset"),
"totalNumberOfItems": response_data.get("totalNumberOfItems"),
"items": filtered_items
}
except Exception as e:
return {"error": f"Error: {str(e)}"}
async def filter_by_isrc(self, query, isrc=None):
try:
result = await self.search_tracks(query)
if "error" in result:
return result
if isrc:
isrc_items = [item for item in result["items"] if item.get("isrc") == isrc]
if len(isrc_items) > 1:
hires_items = []
for item in isrc_items:
media_metadata = item.get("mediaMetadata", {})
tags = media_metadata.get("tags", []) if media_metadata else []
if "HIRES_LOSSLESS" in tags:
hires_items.append(item)
if hires_items:
result["items"] = hires_items
else:
result["items"] = isrc_items
else:
result["items"] = isrc_items
result["totalNumberOfItems"] = len(result["items"])
return result
except Exception as e:
return {"error": f"Error: {str(e)}"}
async def get_track_download_info(self, track_id, quality="LOSSLESS"):
try:
download_api_url = f"https://tidal.401658.xyz/track/?id={track_id}&quality={quality}"
async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client:
response = await client.get(download_api_url)
if response.status_code == 200:
data = response.json()
for item in data:
if "OriginalTrackUrl" in item:
return {
"success": True,
"download_url": item["OriginalTrackUrl"],
"track_info": data[0] if data else {}
}
return {"success": False, "error": "OriginalTrackUrl not found in response"}
else:
return {"success": False, "error": f"API returned status code: {response.status_code}"}
except Exception as e:
return {"success": False, "error": f"Error getting download info: {str(e)}"}
async def download_album_art(self, album_id, size="1280x1280"):
try:
art_url = f"https://resources.tidal.com/images/{album_id.replace('-', '/')}/{size}.jpg"
async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client:
response = await client.get(art_url)
if response.status_code == 200:
return response.content
else:
print(f"Failed to download album art: HTTP {response.status_code}")
return None
except Exception as e:
print(f"Error downloading album art: {str(e)}")
return None
async def embed_metadata(self, filepath, track_info, search_info=None):
try:
audio = FLAC(filepath)
audio.clear()
if track_info.get("title"):
audio["TITLE"] = track_info["title"]
artists_list = []
if search_info and search_info.get("artists"):
for artist in search_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif search_info and search_info.get("artist") and search_info["artist"].get("name"):
artists_list.append(search_info["artist"]["name"])
elif track_info.get("artists"):
for artist in track_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif track_info.get("artist") and track_info["artist"].get("name"):
artists_list.append(track_info["artist"]["name"])
if artists_list:
audio["ARTIST"] = artists_list[0]
if len(artists_list) > 1:
audio["ALBUMARTIST"] = "; ".join(artists_list)
else:
audio["ALBUMARTIST"] = artists_list[0]
album_info = search_info.get("album", {}) if search_info else track_info.get("album", {})
if album_info.get("title"):
audio["ALBUM"] = album_info["title"]
if search_info and search_info.get("trackNumber"):
audio["TRACKNUMBER"] = str(search_info["trackNumber"])
elif track_info.get("trackNumber"):
audio["TRACKNUMBER"] = str(track_info["trackNumber"])
if search_info and search_info.get("volumeNumber"):
audio["DISCNUMBER"] = str(search_info["volumeNumber"])
elif track_info.get("volumeNumber"):
audio["DISCNUMBER"] = str(track_info["volumeNumber"])
isrc = search_info.get("isrc") if search_info else track_info.get("isrc")
if isrc:
audio["ISRC"] = isrc
copyright_info = search_info.get("copyright") if search_info else track_info.get("copyright")
if copyright_info:
audio["COPYRIGHT"] = copyright_info
if album_info.get("releaseDate"):
audio["DATE"] = album_info["releaseDate"][:4]
if track_info.get("genre"):
audio["GENRE"] = track_info["genre"]
if album_info.get("cover"):
album_art = await self.download_album_art(album_info["cover"])
if album_art:
picture = Picture()
picture.data = album_art
picture.type = PictureType.COVER_FRONT
picture.mime = "image/jpeg"
picture.desc = "Cover"
audio.add_picture(picture)
print("Album art embedded successfully")
audio.save()
print(f"Metadata embedded successfully for: {track_info.get('title', 'Unknown')}")
return True
except Exception as e:
print(f"Error embedding metadata: {str(e)}")
return False
async def download_file(self, url, filename, max_retries=3, is_paused_callback=None, is_stopped_callback=None):
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(http2=True, timeout=60.0) as client:
async with client.stream('GET', url) as response:
if response.status_code == 200:
total_size_in_bytes = int(response.headers.get('content-length', 0))
bytes_downloaded = 0
async with aiofiles.open(filename, 'wb') as f:
async for chunk in response.aiter_bytes(chunk_size=8192):
if is_stopped_callback and is_stopped_callback():
print("\\nDownload stopped.")
if os.path.exists(filename):
try:
os.remove(filename)
except OSError as e:
print(f"Error removing partial file: {e}")
return {"success": False, "error": "Download stopped by user"}
while is_paused_callback and is_paused_callback():
print("\\nDownload paused. Waiting...")
await asyncio.sleep(1)
await f.write(chunk)
bytes_downloaded += len(chunk)
if total_size_in_bytes > 0:
if self.progress_callback:
self.progress_callback(bytes_downloaded, total_size_in_bytes)
if total_size_in_bytes > 0 and bytes_downloaded == total_size_in_bytes:
print()
print(f"Successfully downloaded: {filename} ({bytes_downloaded} bytes)")
return {"success": True, "size": bytes_downloaded}
else:
print(f"\\nFailed to download {filename}. HTTP Status: {response.status_code}")
if os.path.exists(filename):
try:
os.remove(filename)
except OSError as e:
print(f"Error removing partial file after server error: {e}")
return {"success": False, "error": f"HTTP {response.status_code}"}
except Exception as e:
print()
if os.path.exists(filename):
try:
os.remove(filename)
except OSError as ose:
print(f"Error removing partial file after exception: {ose}")
if attempt < max_retries - 1:
print(f"Download attempt {attempt + 1} failed, retrying...")
await asyncio.sleep(2)
else:
return {"success": False, "error": f"Download failed after {max_retries} attempts: {str(e)}"}
async def download_track(self, track_ids, search_results, output_dir=".", quality="LOSSLESS", embed_meta=True, is_paused_callback=None, is_stopped_callback=None):
if not isinstance(track_ids, list):
track_ids = [track_ids]
if output_dir != ".":
os.makedirs(output_dir, exist_ok=True)
search_map = {}
if search_results and search_results.get("items"):
for item in search_results["items"]:
search_map[item["id"]] = item
all_skipped = True
skipped_files = []
for i, track_id in enumerate(track_ids):
download_info = await self.get_track_download_info(track_id, quality)
if not download_info["success"]:
print(f"Failed to get download info for track {track_id}: {download_info['error']}")
continue
download_url = download_info["download_url"]
track_info = download_info["track_info"]
search_info = search_map.get(track_id)
title = track_info.get("title", f"track_{track_id}")
artists_list = []
if search_info and search_info.get("artists"):
for artist in search_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif search_info and search_info.get("artist") and search_info["artist"].get("name"):
artists_list.append(search_info["artist"]["name"])
elif track_info.get("artists"):
for artist in track_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif track_info.get("artist") and track_info["artist"].get("name"):
artists_list.append(track_info["artist"]["name"])
artist_names = ", ".join(artists_list) if artists_list else ""
safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_', '.')).rstrip()
safe_artists = "".join(c for c in artist_names if c.isalnum() or c in (' ', '-', '_', ',', '.')).rstrip()
if safe_artists:
filename = f"{safe_title} - {safe_artists}.flac"
else:
filename = f"{safe_title}.flac"
filepath = os.path.join(output_dir, filename)
if os.path.exists(filepath):
print(f"File {filename} already exists. Skipping download.")
skipped_files.append(filename)
if len(track_ids) == 1:
return {
"success": True,
"status": "skipped_exists",
"track_id": track_id,
"filename": filename,
"filepath": filepath,
"message": f"File {filename} already exists."
}
continue
all_skipped = False
print(f"Downloading: {filename}")
download_result = await self.download_file(download_url, filepath, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback)
if download_result["success"]:
print(f"Successfully downloaded track {track_id}")
if embed_meta:
print("Embedding metadata...")
await self.embed_metadata(filepath, track_info, search_info)
return {
"success": True,
"track_id": track_id,
"filename": filename,
"filepath": filepath,
"size": download_result["size"],
"track_info": track_info,
"metadata_embedded": embed_meta
}
else:
print(f"Failed to download track {track_id}: {download_result['error']}")
if os.path.exists(filepath):
try:
os.remove(filepath)
except:
pass
if download_result.get("error") == "Download stopped by user":
return {"success": False, "error": "Download stopped by user", "track_id": track_id}
if all_skipped and skipped_files:
return {
"success": True,
"status": "all_skipped",
"message": f"All files already exist: {', '.join(skipped_files)}"
}
return {"success": False, "error": "All track IDs failed to download or were stopped"}
async def search_and_download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None):
print(f"Searching for: {query}")
if isrc:
print(f"ISRC: {isrc}")
search_result = await self.filter_by_isrc(query, isrc)
if "error" in search_result:
print(f"Search error: {search_result['error']}")
return {"success": False, "error": search_result['error']}
if not search_result["items"]:
print("No tracks found")
return {"success": False, "error": "No tracks found"}
track_ids = [item["id"] for item in search_result["items"]]
print(f"Found {len(track_ids)} track(s): {track_ids}")
download_result = await self.download_track(track_ids, search_result, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback)
return download_result
async def download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None):
result = await self.search_and_download(query, isrc, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback)
if result["success"]:
if result.get("status") == "all_skipped":
print(f"Skipped: {result['message']}")
if "filepath" in result:
return result["filepath"]
return output_dir
elif result.get("status") == "skipped_exists":
print(f"Skipped: {result['message']}")
return result["filepath"]
else:
print("Download completed!")
return result["filepath"]
else:
print(f"Download failed: {result['error']}")
if result.get("error") == "Download stopped by user":
raise Exception("Download stopped by user")
raise Exception(result["error"])
async def main():
print("=== LucidaDownloader ===")
lucida = LucidaDownloader(domain="to")
track_id = "2plbrEY59IikOBgBGLjaoe"
service = "tidal"
output_dir = "."
try:
print(f"Getting track: {track_id} from {service}")
metadata = await lucida.get_track_info(track_id, service)
print("Starting download")
downloaded_file = lucida.download(metadata, output_dir)
downloaded_file = await lucida.download(metadata, output_dir)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
print("\n\n=== SquidWTFDownloader ===")
squid = SquidWTFDownloader(region="us")
isrc = "TCAIT2495017"
isrc = "USUM72409273"
output_dir = "."
try:
@@ -566,6 +1022,19 @@ async def main():
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
print("\n\n=== TidalDownloader ===")
tidal = TidalDownloader()
query = "APT."
isrc = "USAT22409172"
output_dir = "."
try:
downloaded_file = await tidal.download(query, isrc, output_dir, quality="LOSSLESS", embed_metadata=True)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
try: