Create LucidaDownloader.py
This commit is contained in:
@@ -0,0 +1,117 @@
|
||||
import requests
|
||||
from tqdm import tqdm
|
||||
import time
|
||||
import os
|
||||
import asyncio
|
||||
from GetMetadata import main as get_metadata
|
||||
|
||||
class TrackDownloader:
|
||||
def __init__(self):
|
||||
self.client = requests.Session()
|
||||
self.headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
|
||||
}
|
||||
|
||||
async def get_track_info(self):
|
||||
metadata = await get_metadata()
|
||||
return metadata
|
||||
|
||||
def sanitize_filename(self, filename):
|
||||
invalid_chars = '<>:"/\\|?*'
|
||||
for char in invalid_chars:
|
||||
filename = filename.replace(char, '')
|
||||
|
||||
filename = ' '.join(filename.split())
|
||||
filename = filename.replace(' ,', ',')
|
||||
filename = filename.replace(',', ', ').replace(' ', ' ')
|
||||
|
||||
return filename.strip()
|
||||
|
||||
def download(self, metadata, output_dir):
|
||||
track_url = metadata['url']
|
||||
primary_token = metadata['token']
|
||||
expiry = metadata['expiry']
|
||||
|
||||
print(f"Starting download for: {track_url}")
|
||||
|
||||
initial_request = {
|
||||
"account": {"id": "auto", "type": "country"},
|
||||
"compat": "false",
|
||||
"downscale": "original",
|
||||
"handoff": True,
|
||||
"metadata": True,
|
||||
"private": True,
|
||||
"token": {
|
||||
"expiry": expiry,
|
||||
"primary": primary_token
|
||||
},
|
||||
"upload": {"enabled": False, "service": "pixeldrain"},
|
||||
"url": track_url
|
||||
}
|
||||
|
||||
response = self.client.post("https://lucida.to/api/load?url=/api/fetch/stream/v2",
|
||||
json=initial_request,
|
||||
headers=self.headers)
|
||||
|
||||
csrf_token = response.cookies.get('csrf_token')
|
||||
if csrf_token:
|
||||
self.headers['X-CSRF-Token'] = csrf_token
|
||||
|
||||
initial_response = response.json()
|
||||
|
||||
if not initial_response.get("success", False):
|
||||
raise Exception(f"Initial request failed: {initial_response.get('error', 'Unknown error')}")
|
||||
|
||||
handoff = initial_response["handoff"]
|
||||
server = initial_response["server"]
|
||||
|
||||
file_name = f"{metadata['title']} - {metadata['artists']}.flac"
|
||||
file_name = self.sanitize_filename(file_name)
|
||||
|
||||
completion_url = f"https://{server}.lucida.to/api/fetch/request/{handoff}"
|
||||
|
||||
print("Waiting for track processing to complete")
|
||||
while True:
|
||||
completion_response = self.client.get(completion_url, headers=self.headers).json()
|
||||
if completion_response["status"] == "completed":
|
||||
break
|
||||
elif completion_response["status"] == "error":
|
||||
raise Exception(f"API request failed: {completion_response.get('message', 'Unknown error')}")
|
||||
time.sleep(1)
|
||||
|
||||
download_url = f"https://{server}.lucida.to/api/fetch/request/{handoff}/download"
|
||||
print(f"Starting download of: {file_name}")
|
||||
|
||||
response = self.client.get(download_url, stream=True, headers=self.headers)
|
||||
total_size = int(response.headers.get('content-length', 0))
|
||||
|
||||
file_path = os.path.join(output_dir, file_name)
|
||||
|
||||
with open(file_path, 'wb') as file, tqdm(
|
||||
desc=file_name,
|
||||
total=total_size,
|
||||
unit='iB',
|
||||
unit_scale=True,
|
||||
unit_divisor=1024,
|
||||
) as progress_bar:
|
||||
for data in response.iter_content(chunk_size=1024):
|
||||
size = file.write(data)
|
||||
progress_bar.update(size)
|
||||
|
||||
print(f"Download completed: {file_path}")
|
||||
return file_path
|
||||
|
||||
async def main():
|
||||
downloader = TrackDownloader()
|
||||
output_dir = "."
|
||||
|
||||
try:
|
||||
metadata = await downloader.get_track_info()
|
||||
|
||||
downloaded_file = downloader.download(metadata, output_dir)
|
||||
print(f"File downloaded successfully: {downloaded_file}")
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {str(e)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user