v4.4
This commit is contained in:
+56
-2
@@ -23,6 +23,7 @@ from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException
|
||||
from qobuzDL import QobuzDownloader
|
||||
from tidalDL import TidalDownloader
|
||||
from deezerDL import DeezerDownloader
|
||||
from amazonDL import LucidaDownloader
|
||||
|
||||
@dataclass
|
||||
class Track:
|
||||
@@ -96,6 +97,8 @@ class DownloadWorker(QThread):
|
||||
downloader = TidalDownloader()
|
||||
elif self.service == "deezer":
|
||||
downloader = DeezerDownloader()
|
||||
elif self.service == "amazon":
|
||||
downloader = LucidaDownloader()
|
||||
else:
|
||||
downloader = TidalDownloader()
|
||||
|
||||
@@ -223,6 +226,21 @@ class DownloadWorker(QThread):
|
||||
raise Exception("Downloaded file not found")
|
||||
else:
|
||||
raise Exception("Deezer download failed")
|
||||
elif self.service == "amazon":
|
||||
self.progress.emit(f"Downloading from Amazon Music: {track.title} - {track.artists}", 0)
|
||||
|
||||
is_paused_callback = lambda: self.is_paused
|
||||
is_stopped_callback = lambda: self.is_stopped
|
||||
|
||||
downloaded_file = downloader.download(
|
||||
track.id,
|
||||
track_outpath,
|
||||
is_paused_callback=is_paused_callback,
|
||||
is_stopped_callback=is_stopped_callback
|
||||
)
|
||||
|
||||
if not downloaded_file or not os.path.exists(downloaded_file):
|
||||
raise Exception("Amazon Music download failed")
|
||||
else:
|
||||
track_id = track.id
|
||||
self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0)
|
||||
@@ -368,6 +386,19 @@ class DeezerStatusChecker(QThread):
|
||||
self.error.emit(f"Error checking Deezer status: {str(e)}")
|
||||
self.status_updated.emit(False)
|
||||
|
||||
class AmazonStatusChecker(QThread):
|
||||
status_updated = pyqtSignal(bool)
|
||||
error = pyqtSignal(str)
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
response = requests.get("https://lucida.to/api/load?url=%2Fapi%2Fcountries%3Fservice%3Damazon", timeout=5)
|
||||
is_online = response.status_code == 200
|
||||
self.status_updated.emit(is_online)
|
||||
except Exception as e:
|
||||
self.error.emit(f"Error checking Amazon Music status: {str(e)}")
|
||||
self.status_updated.emit(False)
|
||||
|
||||
class StatusIndicatorDelegate(QStyledItemDelegate):
|
||||
def paint(self, painter, option, index):
|
||||
item_data = index.data(Qt.ItemDataRole.UserRole)
|
||||
@@ -414,13 +445,23 @@ class ServiceComboBox(QComboBox):
|
||||
self.deezer_status_timer.timeout.connect(self.refresh_deezer_status)
|
||||
self.deezer_status_timer.start(60000)
|
||||
|
||||
self.amazon_status_checker = AmazonStatusChecker()
|
||||
self.amazon_status_checker.status_updated.connect(self.update_amazon_service_status)
|
||||
self.amazon_status_checker.error.connect(lambda e: print(f"Amazon Music status check error: {e}"))
|
||||
self.amazon_status_checker.start()
|
||||
|
||||
self.amazon_status_timer = QTimer(self)
|
||||
self.amazon_status_timer.timeout.connect(self.refresh_amazon_status)
|
||||
self.amazon_status_timer.start(60000)
|
||||
|
||||
def setup_items(self):
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
self.services = [
|
||||
{'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False},
|
||||
{'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False},
|
||||
{'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False}
|
||||
{'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False},
|
||||
{'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False}
|
||||
]
|
||||
|
||||
for service in self.services:
|
||||
@@ -476,6 +517,19 @@ class ServiceComboBox(QComboBox):
|
||||
self.deezer_status_checker.error.connect(lambda e: print(f"Deezer status check error: {e}"))
|
||||
self.deezer_status_checker.start()
|
||||
|
||||
def update_amazon_service_status(self, is_online):
|
||||
self.update_service_status('amazon', is_online)
|
||||
|
||||
def refresh_amazon_status(self):
|
||||
if hasattr(self, 'amazon_status_checker') and self.amazon_status_checker.isRunning():
|
||||
self.amazon_status_checker.quit()
|
||||
self.amazon_status_checker.wait()
|
||||
|
||||
self.amazon_status_checker = AmazonStatusChecker()
|
||||
self.amazon_status_checker.status_updated.connect(self.update_amazon_service_status)
|
||||
self.amazon_status_checker.error.connect(lambda e: print(f"Amazon Music status check error: {e}"))
|
||||
self.amazon_status_checker.start()
|
||||
|
||||
def currentData(self, role=Qt.ItemDataRole.UserRole + 1):
|
||||
return super().currentData(role)
|
||||
|
||||
@@ -573,7 +627,7 @@ class QobuzRegionComboBox(QComboBox):
|
||||
class SpotiFLACGUI(QWidget):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.current_version = "4.3"
|
||||
self.current_version = "4.4"
|
||||
self.tracks = []
|
||||
self.all_tracks = []
|
||||
self.reset_state()
|
||||
|
||||
BIN
Binary file not shown.
|
After Width: | Height: | Size: 82 KiB |
+123
@@ -0,0 +1,123 @@
|
||||
import requests
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
import base64
|
||||
import urllib3
|
||||
from urllib.parse import unquote
|
||||
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
def extract_data(html, patterns):
|
||||
for pattern in patterns:
|
||||
if match := re.search(pattern, html):
|
||||
return match.group(1)
|
||||
return None
|
||||
|
||||
def download_track(track_id, service="amazon", output_dir="."):
|
||||
client = requests.Session()
|
||||
client.verify = False
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
|
||||
|
||||
try:
|
||||
spotify_url = f"https://open.spotify.com/track/{track_id}"
|
||||
params = {"url": spotify_url, "country": "auto", "to": service}
|
||||
|
||||
response = client.get("https://lucida.to", params=params, headers=headers, timeout=30)
|
||||
html = response.text
|
||||
|
||||
token = extract_data(html, [r'token:"([^"]+)"', r'"token"\s*:\s*"([^"]+)"'])
|
||||
url = extract_data(html, [r'"url":"([^"]+)"', r'url:"([^"]+)"'])
|
||||
expiry = extract_data(html, [r'tokenExpiry:(\d+)', r'"tokenExpiry"\s*:\s*(\d+)'])
|
||||
|
||||
if not (token and url):
|
||||
raise Exception("Could not extract required data")
|
||||
|
||||
try:
|
||||
decoded_token = base64.b64decode(base64.b64decode(token).decode('latin1')).decode('latin1')
|
||||
except:
|
||||
decoded_token = token
|
||||
|
||||
clean_url = url.replace('\\/', '/')
|
||||
print(f"Starting download for: {clean_url}")
|
||||
|
||||
request_data = {
|
||||
"account": {"id": "auto", "type": "country"},
|
||||
"compat": "false", "downscale": "original", "handoff": True,
|
||||
"metadata": True, "private": True,
|
||||
"token": {"primary": decoded_token, "expiry": int(expiry) if expiry else None},
|
||||
"upload": {"enabled": False, "service": "pixeldrain"},
|
||||
"url": clean_url
|
||||
}
|
||||
|
||||
response = client.post("https://lucida.to/api/load?url=/api/fetch/stream/v2",
|
||||
json=request_data, headers=headers)
|
||||
|
||||
if csrf_token := response.cookies.get('csrf_token'):
|
||||
headers['X-CSRF-Token'] = csrf_token
|
||||
|
||||
data = response.json()
|
||||
if not data.get("success"):
|
||||
raise Exception(f"Request failed: {data.get('error', 'Unknown error')}")
|
||||
|
||||
completion_url = f"https://{data['server']}.lucida.to/api/fetch/request/{data['handoff']}"
|
||||
print("Processing track...")
|
||||
|
||||
while True:
|
||||
resp = client.get(completion_url, headers=headers).json()
|
||||
if resp["status"] == "completed":
|
||||
print("Processing completed!")
|
||||
break
|
||||
elif resp["status"] == "error":
|
||||
raise Exception(f"Processing failed: {resp.get('message', 'Unknown error')}")
|
||||
elif progress := resp.get("progress"):
|
||||
percent = int((progress.get("current", 0) / progress.get("total", 100)) * 100)
|
||||
print(f"Progress: {percent}%")
|
||||
time.sleep(1)
|
||||
|
||||
download_url = f"https://{data['server']}.lucida.to/api/fetch/request/{data['handoff']}/download"
|
||||
response = client.get(download_url, stream=True, headers=headers)
|
||||
|
||||
file_name = "track.flac"
|
||||
if content_disp := response.headers.get('content-disposition'):
|
||||
if match := re.search(r'filename[*]?=([^;]+)', content_disp):
|
||||
raw_name = match.group(1).strip('"\'')
|
||||
file_name = unquote(raw_name[7:] if raw_name.startswith("UTF-8''") else raw_name)
|
||||
for char in '<>:"/\\|?*':
|
||||
file_name = file_name.replace(char, '')
|
||||
file_name = file_name.strip()
|
||||
|
||||
file_path = os.path.join(output_dir, file_name)
|
||||
print(f"Downloading: {file_name}")
|
||||
|
||||
with open(file_path, 'wb') as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
|
||||
print(f"Download completed: {file_path}")
|
||||
return file_path
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {str(e)}")
|
||||
return None
|
||||
|
||||
class LucidaDownloader:
|
||||
def __init__(self):
|
||||
self.progress_callback = None
|
||||
|
||||
def set_progress_callback(self, callback):
|
||||
self.progress_callback = callback
|
||||
|
||||
def download(self, track_id, output_dir, is_paused_callback=None, is_stopped_callback=None):
|
||||
"""Download track using Lucida service"""
|
||||
try:
|
||||
return download_track(track_id, service="amazon", output_dir=output_dir)
|
||||
except Exception as e:
|
||||
raise Exception(f"Amazon Music download failed: {str(e)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
track_id = "2plbrEY59IikOBgBGLjaoe"
|
||||
service = "amazon"
|
||||
|
||||
download_track(track_id, service)
|
||||
Reference in New Issue
Block a user