Files
SpotiFLAC/getTracks.py
T
afkarxyz 1e7a48d263 v2.6
2025-05-06 10:38:01 +07:00

318 lines
13 KiB
Python

import requests
import time
import os
import asyncio
import re
import base64
class TrackDownloader:
def __init__(self, use_fallback=False, timeout=30):
self.client = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.progress_callback = None
self.use_fallback = use_fallback
self.timeout = timeout
self.base_domain = "lucida.su" if use_fallback else "lucida.to"
def set_progress_callback(self, callback):
self.progress_callback = callback
def generate_filename(self, track_id, service):
return f"{track_id}_{service}.flac"
async def get_track_info(self, track_id, service="amazon", use_fallback=None):
if use_fallback is None:
use_fallback = self.use_fallback
domain_type = "su" if use_fallback else "to"
spotify_url = f"https://open.spotify.com/track/{track_id}"
result = self.convert_spotify_link(spotify_url, service, domain_type)
if "error" in result:
raise Exception(f"Failed to get track info: {result['error']}")
result["track_id"] = track_id
return result
def convert_spotify_link(self, spotify_url, target_service="amazon", domain_type="to"):
track_id_match = re.search(r'track/([a-zA-Z0-9]+)', spotify_url)
if not track_id_match:
return {"error": "Invalid Spotify URL"}
domain = "lucida.to" if domain_type == "to" else "lucida.su"
base_url = f"https://{domain}"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "id-ID,id;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": domain,
"Pragma": "no-cache",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
try:
headers["Referer"] = f"{base_url}/?url={spotify_url}&country=auto"
request_params = {
"url": spotify_url,
"country": "auto",
"to": target_service
}
session = requests.Session()
session.verify = True
response = session.get(
base_url,
params=request_params,
headers=headers,
timeout=self.timeout
)
html_content = response.text
token_match = re.search(r'token:"([^"]+)"', html_content)
token_expiry_match = re.search(r'tokenExpiry:(\d+)', html_content)
token = token_match.group(1) if token_match else None
token_expiry = int(token_expiry_match.group(1)) if token_expiry_match else None
url = None
url_patterns = [
r'"url":"([^"]+)"',
r'href="(https?://[^"]*' + re.escape(target_service) + r'[^"]*track[^"]*)"',
]
for pattern in url_patterns:
url_match = re.search(pattern, html_content)
if url_match:
url = url_match.group(1).replace('\\/', '/')
break
if not url:
redirect_patterns = [
r'url=([^&"]+)',
r'href="([^"]+)"',
r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]',
]
for pattern in redirect_patterns:
matches = re.finditer(pattern, html_content)
for match in matches:
potential_url = match.group(1)
if potential_url.startswith('http') and target_service.lower() in potential_url.lower():
url = potential_url.replace('\\/', '/')
break
if not url:
service_urls = re.finditer(r'(https?://[^"\s]+' + re.escape(target_service) + r'[^"\s]+)', html_content)
for match in service_urls:
url = match.group(1).replace('\\/', '/')
break
result = {
"service": target_service,
"url": url,
"token": {
"primary": None,
"expiry": None
}
}
if token:
try:
decoded_once = base64.b64decode(token).decode('latin1')
decoded_token = base64.b64decode(decoded_once).decode('latin1')
result["token"]["primary"] = decoded_token
except Exception:
result["token"]["primary"] = token
result["token"]["expiry"] = token_expiry
return result
except Exception as error:
return {"error": str(error)}
def download(self, metadata, output_dir, is_paused_callback=None, is_stopped_callback=None):
track_url = metadata['url']
primary_token = metadata['token']['primary']
expiry = metadata['token']['expiry']
track_id = metadata['track_id']
service = metadata['service']
print(f"Starting download for: {track_url}")
if is_stopped_callback and is_stopped_callback():
raise Exception("Download stopped by user")
initial_request = {
"account": {"id": "auto", "type": "country"},
"compat": "false",
"downscale": "original",
"handoff": True,
"metadata": True,
"private": True,
"token": {
"expiry": expiry,
"primary": primary_token
},
"upload": {"enabled": False, "service": "pixeldrain"},
"url": track_url
}
response = self.client.post(f"https://{self.base_domain}/api/load?url=/api/fetch/stream/v2",
json=initial_request,
headers=self.headers)
csrf_token = response.cookies.get('csrf_token')
if csrf_token:
self.headers['X-CSRF-Token'] = csrf_token
initial_response = response.json()
if not initial_response.get("success", False):
raise Exception(f"Initial request failed: {initial_response.get('error', 'Unknown error')}")
handoff = initial_response["handoff"]
server = initial_response["server"]
file_name = self.generate_filename(track_id, service)
completion_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}"
print("Waiting for track processing to complete")
while True:
if is_stopped_callback and is_stopped_callback():
raise Exception("Download stopped by user")
while is_paused_callback and is_paused_callback():
time.sleep(0.1)
if is_stopped_callback and is_stopped_callback():
raise Exception("Download stopped by user")
completion_response = self.client.get(completion_url, headers=self.headers).json()
status = completion_response["status"]
if status == "completed":
print("Processing completed: 100%")
break
elif status == "error":
raise Exception(f"API request failed: {completion_response.get('message', 'Unknown error')}")
else:
progress = completion_response.get("progress", {})
if progress:
current = progress.get("current", 0)
total = progress.get("total", 100)
percent = int((current / total) * 100) if total > 0 else 0
action = progress.get("action", "Processing")
print(f"Progress: {percent}% - {action} ({current}/{total})")
if action.lower() == "metadata":
if self.progress_callback:
self.progress_callback(0, 0)
else:
print(f"Status: {status} - Waiting for progress information...")
if status.lower() == "metadata":
if self.progress_callback:
self.progress_callback(0, 0)
time.sleep(1)
download_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}/download"
print(f"Starting download of: {file_name}")
response = self.client.get(download_url, stream=True, headers=self.headers)
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
file_path = os.path.join(output_dir, file_name)
try:
with open(file_path, 'wb') as file:
start_time = time.time()
last_update_time = start_time
for chunk in response.iter_content(chunk_size=8192):
if is_stopped_callback and is_stopped_callback():
file.close()
if os.path.exists(file_path):
os.remove(file_path)
raise Exception("Download stopped by user")
while is_paused_callback and is_paused_callback():
time.sleep(0.1)
if is_stopped_callback and is_stopped_callback():
file.close()
if os.path.exists(file_path):
os.remove(file_path)
raise Exception("Download stopped by user")
if chunk:
file.write(chunk)
downloaded_size += len(chunk)
current_time = time.time()
if current_time - last_update_time >= 1:
if total_size > 0:
progress_percent = (downloaded_size / total_size) * 100
elapsed_time = current_time - start_time
speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0
print(f"Download progress: {progress_percent:.2f}% ({downloaded_size}/{total_size}) - {speed:.2f} MB/s")
else:
print(f"Downloaded {downloaded_size / (1024 * 1024):.2f} MB")
last_update_time = current_time
if self.progress_callback:
self.progress_callback(downloaded_size, total_size)
if downloaded_size == 0:
raise Exception("No data received from server")
print(f"Download completed: {file_path}")
return file_path
except Exception as e:
if os.path.exists(file_path) and os.path.getsize(file_path) == 0:
try:
os.remove(file_path)
except:
pass
raise e
async def main():
use_fallback = False
downloader = TrackDownloader(use_fallback)
output_dir = "."
track_id = "2plbrEY59IikOBgBGLjaoe"
service = "tidal"
def progress_update(current, total):
if total > 0:
percent = (current / total) * 100
print(f"\rDownload progress: {percent:.2f}% ({current}/{total})", end="")
downloader.set_progress_callback(progress_update)
try:
print(f"Getting track info for ID: {track_id} from {service}")
metadata = await downloader.get_track_info(track_id, service)
print(f"Track info received, starting download process")
downloaded_file = downloader.download(metadata, output_dir)
print(f"\nFile downloaded successfully: {downloaded_file}")
except Exception as e:
print(f"An error occurred: {str(e)}")
if __name__ == "__main__":
asyncio.run(main())