Update v1.8

This commit is contained in:
afkarxyz
2025-03-02 06:30:12 +07:00
parent e6e953b2ed
commit 2df77120cf
6 changed files with 91 additions and 292 deletions
+5 -5
View File
@@ -1,7 +1,7 @@
import asyncio import asyncio
import zendriver as zd import zendriver as zd
async def get_metadata(page): async def get_metadata(page, headless=True):
max_attempts = 40 max_attempts = 40
attempts = 0 attempts = 0
@@ -14,9 +14,9 @@ async def get_metadata(page):
const [url, config] = args; const [url, config] = args;
if (url.includes('/api/load?url=%2Fapi%2Ffetch%2Fstream%2Fv2')) { if (url.includes('/api/load?url=%2Fapi%2Ffetch%2Fstream%2Fv2')) {
const payload = JSON.parse(config.body); const payload = JSON.parse(config.body);
const title = document.querySelector('h1.svelte-6pt9ji').textContent; const title = document.querySelector('h1.svelte-6pt9ji').textContent.trim();
const artists = Array.from(document.querySelectorAll('h2.svelte-6pt9ji a.normal')) const artists = Array.from(document.querySelectorAll('h2.svelte-6pt9ji a.normal'))
.map(a => a.textContent) .map(a => a.textContent.trim())
.join(', '); .join(', ');
const cover = document.querySelector('.svelte-6pt9ji .meta.svelte-6pt9ji a').href; const cover = document.querySelector('.svelte-6pt9ji .meta.svelte-6pt9ji a').href;
@@ -82,8 +82,8 @@ async def get_metadata(page):
raise TimeoutError("Timeout") raise TimeoutError("Timeout")
async def main(): async def main(headless=True):
browser = await zd.start(headless=False) browser = await zd.start(headless=headless)
try: try:
track_id = "2plbrEY59IikOBgBGLjaoe" track_id = "2plbrEY59IikOBgBGLjaoe"
url = f"https://lucida.to/?url=https%3A%2F%2Fopen.spotify.com%2Ftrack%2F{track_id}&country=auto&to=tidal" url = f"https://lucida.to/?url=https%3A%2F%2Fopen.spotify.com%2Ftrack%2F{track_id}&country=auto&to=tidal"
-120
View File
@@ -1,120 +0,0 @@
import requests
from tqdm import tqdm
import time
import os
import asyncio
from GetMetadata import main as get_metadata
class TrackDownloader:
def __init__(self):
self.client = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
async def get_track_info(self):
metadata = await get_metadata()
return metadata
def sanitize_filename(self, filename):
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
filename = filename.replace(char, '')
filename = ' '.join(filename.split())
filename = filename.replace(' ,', ',')
filename = filename.replace(',', ', ')
while ' ' in filename:
filename = filename.replace(' ', ' ')
filename = filename.rsplit('.', 1)
filename[0] = filename[0].strip()
return '.'.join(filename)
def download(self, metadata, output_dir):
track_url = metadata['url']
primary_token = metadata['token']
expiry = metadata['expiry']
print(f"Starting download for: {track_url}")
initial_request = {
"account": {"id": "auto", "type": "country"},
"compat": "false",
"downscale": "original",
"handoff": True,
"metadata": True,
"private": True,
"token": {
"expiry": expiry,
"primary": primary_token
},
"upload": {"enabled": False, "service": "pixeldrain"},
"url": track_url
}
response = self.client.post("https://lucida.to/api/load?url=/api/fetch/stream/v2",
json=initial_request,
headers=self.headers)
csrf_token = response.cookies.get('csrf_token')
if csrf_token:
self.headers['X-CSRF-Token'] = csrf_token
initial_response = response.json()
if not initial_response.get("success", False):
raise Exception(f"Initial request failed: {initial_response.get('error', 'Unknown error')}")
handoff = initial_response["handoff"]
server = initial_response["server"]
file_name = f"{metadata['title']} - {metadata['artists']}.flac"
file_name = self.sanitize_filename(file_name)
completion_url = f"https://{server}.lucida.to/api/fetch/request/{handoff}"
print("Waiting for track processing to complete")
while True:
completion_response = self.client.get(completion_url, headers=self.headers).json()
if completion_response["status"] == "completed":
break
elif completion_response["status"] == "error":
raise Exception(f"API request failed: {completion_response.get('message', 'Unknown error')}")
time.sleep(1)
download_url = f"https://{server}.lucida.to/api/fetch/request/{handoff}/download"
print(f"Starting download of: {file_name}")
response = self.client.get(download_url, stream=True, headers=self.headers)
total_size = int(response.headers.get('content-length', 0))
file_path = os.path.join(output_dir, file_name)
with open(file_path, 'wb') as file, tqdm(
desc=file_name,
total=total_size,
unit='iB',
unit_scale=True,
unit_divisor=1024,
) as progress_bar:
for data in response.iter_content(chunk_size=1024):
size = file.write(data)
progress_bar.update(size)
print(f"Download completed: {file_path}")
return file_path
async def main():
downloader = TrackDownloader()
output_dir = "."
try:
metadata = await downloader.get_track_info()
downloaded_file = downloader.download(metadata, output_dir)
print(f"File downloaded successfully: {downloaded_file}")
except Exception as e:
print(f"An error occurred: {str(e)}")
if __name__ == "__main__":
asyncio.run(main())
+64 -58
View File
@@ -1,7 +1,7 @@
import sys import sys
import asyncio
import os import os
import time import time
from datetime import datetime
import requests import requests
from pathlib import Path from pathlib import Path
from packaging import version from packaging import version
@@ -11,7 +11,6 @@ from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QGroupBox, QComboBox, QDialog, QDialogButtonBox) QGroupBox, QComboBox, QDialog, QDialogButtonBox)
from PyQt6.QtCore import QThread, pyqtSignal, Qt, QSettings, QSize, QTimer, QUrl from PyQt6.QtCore import QThread, pyqtSignal, Qt, QSettings, QSize, QTimer, QUrl
from PyQt6.QtGui import QIcon, QPixmap, QCursor,QDesktopServices from PyQt6.QtGui import QIcon, QPixmap, QCursor,QDesktopServices
from getMetadata import get_metadata
from getTracks import TrackDownloader from getTracks import TrackDownloader
class ImageDownloader(QThread): class ImageDownloader(QThread):
@@ -31,40 +30,18 @@ class MetadataFetcher(QThread):
finished = pyqtSignal(dict) finished = pyqtSignal(dict)
error = pyqtSignal(str) error = pyqtSignal(str)
def __init__(self, url, headless=True, service="tidal", use_fallback=False): def __init__(self, url, service="amazon", use_fallback=False):
super().__init__() super().__init__()
self.url = url self.url = url
self.headless_mode = headless
self.service = service self.service = service
self.use_fallback = use_fallback self.use_fallback = use_fallback
self.max_retries = 3 self.max_retries = 3
def extract_track_id(self, url): def extract_track_id(self, url):
if "track/" in url: if "track/" in url:
return url.split("track/")[1].split("?")[0] return url.split("track/")[1].split("?")[0].split("/")[0]
return None return None
async def fetch_metadata(self, track_id):
import zendriver as zd
from asyncio import sleep
domain = "lucida.su" if self.use_fallback else "lucida.to"
for attempt in range(self.max_retries):
try:
lucida_url = f"https://{domain}/?url=https%3A%2F%2Fopen.spotify.com%2Ftrack%2F{track_id}&country=auto&to={self.service}"
browser = await zd.start(headless=self.headless_mode)
try:
page = await browser.get(lucida_url)
return await get_metadata(page)
finally:
await browser.stop()
except Exception as e:
if "refused" in str(e).lower() and attempt < self.max_retries - 1:
await sleep(2 * (attempt + 1))
continue
raise e
def run(self): def run(self):
try: try:
track_id = self.extract_track_id(self.url) track_id = self.extract_track_id(self.url)
@@ -72,11 +49,33 @@ class MetadataFetcher(QThread):
self.error.emit("Invalid Spotify URL") self.error.emit("Invalid Spotify URL")
return return
metadata = asyncio.run(self.fetch_metadata(track_id)) fallback = "su" if self.use_fallback else "to"
if metadata: api_url = f"https://apislucida.vercel.app/{fallback}/{track_id}/{self.service}"
self.finished.emit(metadata)
else: for attempt in range(self.max_retries):
self.error.emit("Failed to fetch track metadata") try:
response = requests.get(api_url)
response.raise_for_status()
metadata = response.json()
formatted_metadata = {
'title': metadata['title'],
'artists': metadata['artists'],
'cover': metadata['coverArtwork'],
'url': metadata['url'],
'token': metadata['token'],
'duration': metadata.get('durationMs', 0),
'release_date': metadata.get('releaseDate', '')
}
self.finished.emit(formatted_metadata)
return
except requests.exceptions.RequestException as e:
if attempt < self.max_retries - 1:
time.sleep(2 * (attempt + 1))
continue
raise e
except Exception as e: except Exception as e:
error_msg = str(e) error_msg = str(e)
@@ -162,9 +161,8 @@ class ServiceComboBox(QComboBox):
os.makedirs(icons_dir) os.makedirs(icons_dir)
services = [ services = [
{'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png'},
{'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png'}, {'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png'},
{'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png'} {'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png'}
] ]
for service in services: for service in services:
@@ -218,7 +216,7 @@ class UpdateDialog(QDialog):
class SpotiFlacGUI(QMainWindow): class SpotiFlacGUI(QMainWindow):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.current_version = "1.7" self.current_version = "1.8"
self.settings = QSettings('SpotiFlac', 'Settings') self.settings = QSettings('SpotiFlac', 'Settings')
self.setWindowTitle("SpotiFLAC") self.setWindowTitle("SpotiFLAC")
self.check_for_updates = self.settings.value('check_for_updates', True, type=bool) self.check_for_updates = self.settings.value('check_for_updates', True, type=bool)
@@ -269,13 +267,11 @@ class SpotiFlacGUI(QMainWindow):
print(f"Error checking for updates: {e}") print(f"Error checking for updates: {e}")
def load_settings(self): def load_settings(self):
headless = self.settings.value('headless', True, type=bool)
fallback = self.settings.value('fallback', False, type=bool) fallback = self.settings.value('fallback', False, type=bool)
service = self.settings.value('service', 'tidal') service = self.settings.value('service', 'amazon')
format_type = self.settings.value('format', 'title_artist') format_type = self.settings.value('format', 'title_artist')
output_dir = self.settings.value('output_dir', self.default_music_dir) output_dir = self.settings.value('output_dir', self.default_music_dir)
self.headless_checkbox.setChecked(headless)
self.fallback_checkbox.setChecked(fallback) self.fallback_checkbox.setChecked(fallback)
for i in range(self.service_combo.count()): for i in range(self.service_combo.count()):
@@ -288,8 +284,6 @@ class SpotiFlacGUI(QMainWindow):
self.dir_input.setText(output_dir) self.dir_input.setText(output_dir)
def setup_settings_persistence(self): def setup_settings_persistence(self):
self.headless_checkbox.stateChanged.connect(
lambda x: self.settings.setValue('headless', bool(x)))
self.fallback_checkbox.stateChanged.connect( self.fallback_checkbox.stateChanged.connect(
lambda x: self.settings.setValue('fallback', bool(x))) lambda x: self.settings.setValue('fallback', bool(x)))
self.service_combo.currentIndexChanged.connect( self.service_combo.currentIndexChanged.connect(
@@ -348,12 +342,7 @@ class SpotiFlacGUI(QMainWindow):
settings_container_layout.setContentsMargins(0, 0, 0, 0) settings_container_layout.setContentsMargins(0, 0, 0, 0)
settings_container_layout.setSpacing(10) settings_container_layout.setSpacing(10)
self.headless_checkbox = QCheckBox("Headless") self.fallback_checkbox = QCheckBox("Fallback Server")
self.headless_checkbox.setCursor(QCursor(Qt.CursorShape.PointingHandCursor))
self.headless_checkbox.setChecked(True)
settings_container_layout.addWidget(self.headless_checkbox)
self.fallback_checkbox = QCheckBox("Fallback")
self.fallback_checkbox.setCursor(QCursor(Qt.CursorShape.PointingHandCursor)) self.fallback_checkbox.setCursor(QCursor(Qt.CursorShape.PointingHandCursor))
self.fallback_checkbox.setChecked(False) self.fallback_checkbox.setChecked(False)
settings_container_layout.addWidget(self.fallback_checkbox) settings_container_layout.addWidget(self.fallback_checkbox)
@@ -377,7 +366,7 @@ class SpotiFlacGUI(QMainWindow):
format_layout.setContentsMargins(0, 0, 0, 0) format_layout.setContentsMargins(0, 0, 0, 0)
format_layout.setSpacing(10) format_layout.setSpacing(10)
format_label = QLabel("Filename:") format_label = QLabel("Filename Format:")
self.format_title_artist = QRadioButton("Title") self.format_title_artist = QRadioButton("Title")
self.format_artist_title = QRadioButton("Artist") self.format_artist_title = QRadioButton("Artist")
self.format_title_artist.setCursor(QCursor(Qt.CursorShape.PointingHandCursor)) self.format_title_artist.setCursor(QCursor(Qt.CursorShape.PointingHandCursor))
@@ -529,10 +518,9 @@ class SpotiFlacGUI(QMainWindow):
return return
self.fetch_button.setEnabled(False) self.fetch_button.setEnabled(False)
self.status_label.setText("Fetching track information...") self.status_label.setText("Fetching track information...")
headless = self.headless_checkbox.isChecked()
fallback = self.fallback_checkbox.isChecked() fallback = self.fallback_checkbox.isChecked()
service = self.service_combo.currentData() service = self.service_combo.currentData()
self.fetcher = MetadataFetcher(url, headless=headless, service=service, use_fallback=fallback) self.fetcher = MetadataFetcher(url, service=service, use_fallback=fallback)
self.fetcher.finished.connect(self.handle_track_info) self.fetcher.finished.connect(self.handle_track_info)
self.fetcher.error.connect(self.handle_fetch_error) self.fetcher.error.connect(self.handle_fetch_error)
self.fetcher.start() self.fetcher.start()
@@ -541,25 +529,43 @@ class SpotiFlacGUI(QMainWindow):
self.metadata = metadata self.metadata = metadata
self.fetch_button.setEnabled(True) self.fetch_button.setEnabled(True)
self.title_label.setText(metadata['title'].strip()) self.title_label.setText(metadata['title'].strip())
self.artist_label.setText(metadata['artists'].strip())
artist_text = ""
artists_list = metadata['artists'].strip().split(",")
if len(artists_list) > 1:
artist_text += "<b>Artists</b> " + metadata['artists'].strip()
else:
artist_text += "<b>Artist</b> " + metadata['artists'].strip()
if metadata.get('release_date'):
try:
date_obj = datetime.fromisoformat(metadata['release_date'].replace('Z', '+00:00'))
formatted_date = date_obj.strftime("%d-%m-%Y")
artist_text += f"<br><b>Released</b> {formatted_date}"
except:
if metadata['release_date']:
artist_text += f"<br><b>Released</b> {metadata['release_date']}"
if metadata.get('duration'):
duration_ms = metadata['duration']
minutes = int(duration_ms / 60000)
seconds = int((duration_ms % 60000) / 1000)
artist_text += f"<br><b>Duration</b> {minutes}:{seconds:02d}"
self.artist_label.setText(artist_text)
self.artist_label.setTextFormat(Qt.TextFormat.RichText)
self.image_downloader = ImageDownloader(metadata['cover']) self.image_downloader = ImageDownloader(metadata['cover'])
self.image_downloader.finished.connect(self.update_cover_art) self.image_downloader.finished.connect(self.update_cover_art)
self.image_downloader.start() self.image_downloader.start()
self.input_widget.hide() self.input_widget.hide()
self.track_widget.show() self.track_widget.show()
self.download_button.show() self.download_button.show()
self.cancel_button.show() self.cancel_button.show()
self.update_button.hide() self.update_button.hide()
self.status_label.clear() self.status_label.clear()
self.adjustWindowHeight()
def adjustWindowHeight(self):
title_height = self.title_label.sizeHint().height()
artist_height = self.artist_label.sizeHint().height()
base_height = 180
additional_height = max(0, (title_height + artist_height) - 40)
new_height = min(300, base_height + additional_height)
self.setFixedHeight(int(new_height))
def update_cover_art(self, image_data): def update_cover_art(self, image_data):
pixmap = QPixmap() pixmap = QPixmap()
-99
View File
@@ -1,99 +0,0 @@
import asyncio
import zendriver as zd
async def get_metadata(page, headless=True):
max_attempts = 40
attempts = 0
await asyncio.sleep(2)
await page.evaluate("""
window.downloadInfo = null;
const originalFetch = window.fetch;
window.fetch = async function(...args) {
const [url, config] = args;
if (url.includes('/api/load?url=%2Fapi%2Ffetch%2Fstream%2Fv2')) {
const payload = JSON.parse(config.body);
const title = document.querySelector('h1.svelte-6pt9ji').textContent.trim();
const artists = Array.from(document.querySelectorAll('h2.svelte-6pt9ji a.normal'))
.map(a => a.textContent.trim())
.join(', ');
const cover = document.querySelector('.svelte-6pt9ji .meta.svelte-6pt9ji a').href;
window.downloadInfo = {
url: payload.url,
cover: cover,
title: title,
artists: artists,
token: payload.token.primary,
expiry: payload.token.expiry
};
}
return originalFetch.apply(this, args);
};
""")
await page.evaluate("""
function waitForElement(selector) {
return new Promise(resolve => {
if (document.querySelector(selector)) {
return resolve(document.querySelector(selector));
}
const observer = new MutationObserver(mutations => {
if (document.querySelector(selector)) {
observer.disconnect();
resolve(document.querySelector(selector));
}
});
observer.observe(document.documentElement, {
childList: true,
subtree: true
});
});
}
(async () => {
if (!window.location.hostname.includes('lucida.')) return;
await Promise.race([
waitForElement('.d1-track button'),
waitForElement('button[class*="download-button"]')
]);
const clickDownloadButton = () => {
const button = document.querySelector('.d1-track button') ||
document.querySelector('button[class*="download-button"]');
if (button) button.click();
};
clickDownloadButton();
})();
""")
while attempts < max_attempts:
download_info = await page.evaluate("window.downloadInfo")
if download_info:
return download_info
await asyncio.sleep(0.5)
attempts += 1
raise TimeoutError("Timeout")
async def main(headless=True):
browser = await zd.start(headless=headless)
try:
track_id = "2plbrEY59IikOBgBGLjaoe"
url = f"https://lucida.to/?url=https%3A%2F%2Fopen.spotify.com%2Ftrack%2F{track_id}&country=auto&to=tidal"
page = await browser.get(url)
download_info = await get_metadata(page)
print(download_info)
return download_info
finally:
await browser.stop()
if __name__ == "__main__":
asyncio.run(main())
+19 -7
View File
@@ -2,7 +2,6 @@ import requests
import time import time
import os import os
import asyncio import asyncio
from getMetadata import main as get_metadata
class TrackDownloader: class TrackDownloader:
def __init__(self, use_fallback=False): def __init__(self, use_fallback=False):
@@ -14,6 +13,7 @@ class TrackDownloader:
self.filename_format = 'title_artist' self.filename_format = 'title_artist'
self.use_fallback = use_fallback self.use_fallback = use_fallback
self.base_domain = "lucida.su" if use_fallback else "lucida.to" self.base_domain = "lucida.su" if use_fallback else "lucida.to"
self.api_base = "https://apislucida.vercel.app"
def set_progress_callback(self, callback): def set_progress_callback(self, callback):
self.progress_callback = callback self.progress_callback = callback
@@ -28,9 +28,19 @@ class TrackDownloader:
filename = f"{metadata['title']} - {metadata['artists']}.flac" filename = f"{metadata['title']} - {metadata['artists']}.flac"
return self.sanitize_filename(filename) return self.sanitize_filename(filename)
async def get_track_info(self): async def get_track_info(self, track_id, service="amazon", use_fallback=None):
metadata = await get_metadata() if use_fallback is None:
return metadata use_fallback = self.use_fallback
fallback = "su" if use_fallback else "to"
api_url = f"{self.api_base}/{fallback}/{track_id}/{service}"
try:
response = requests.get(api_url)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise Exception(f"Failed to get track info: {str(e)}")
def sanitize_filename(self, filename): def sanitize_filename(self, filename):
invalid_chars = '<>:"/\\|?*' invalid_chars = '<>:"/\\|?*'
@@ -48,8 +58,8 @@ class TrackDownloader:
def download(self, metadata, output_dir): def download(self, metadata, output_dir):
track_url = metadata['url'] track_url = metadata['url']
primary_token = metadata['token'] primary_token = metadata['token']['primary']
expiry = metadata['expiry'] expiry = metadata['token']['expiry']
print(f"Starting download for: {track_url}") print(f"Starting download for: {track_url}")
@@ -131,9 +141,11 @@ class TrackDownloader:
async def main(): async def main():
downloader = TrackDownloader() downloader = TrackDownloader()
output_dir = "." output_dir = "."
track_id = "2plbrEY59IikOBgBGLjaoe"
service = "amazon"
try: try:
metadata = await downloader.get_track_info() metadata = await downloader.get_track_info(track_id, service)
downloaded_file = downloader.download(metadata, output_dir) downloaded_file = downloader.download(metadata, output_dir)
print(f"File downloaded successfully: {downloaded_file}") print(f"File downloaded successfully: {downloaded_file}")
except Exception as e: except Exception as e:
+1 -1
View File
@@ -1,3 +1,3 @@
{ {
"version": "1.7" "version": "1.8"
} }