Compare commits

...

18 Commits

Author SHA1 Message Date
afkarxyz c13855fadd v2.9 2025-05-30 22:22:14 +07:00
afkarxyz 2b12684960 v2.8 2025-05-23 16:47:38 +07:00
afkarxyz 4bc164cc56 v2.8 2025-05-23 16:43:45 +07:00
afkarxyz 46cb65665e Update README.md 2025-05-19 10:33:04 +07:00
afkarxyz 276b3b4951 Update README.md 2025-05-13 20:13:37 +07:00
afkarxyz e15aadbd61 Update README.md 2025-05-13 20:11:35 +07:00
afkarxyz d7639bae8f v2.7 2025-05-13 20:11:32 +07:00
afkarxyz 1af7ab65c9 v2.7 2025-05-13 20:07:19 +07:00
afkarxyz c5240596cb Revert 2025-05-13 12:06:55 +07:00
afkarxyz c4a9042adc v2.9 2025-05-12 00:08:55 +07:00
afkarxyz 45ac08ecbd v2.9 2025-05-12 00:05:34 +07:00
afkarxyz 0add305d9c v2.8 2025-05-11 18:34:58 +07:00
afkarxyz 9b6b43c0a4 v2.8 2025-05-11 18:31:28 +07:00
afkarxyz 60d20cbebe Revert 2025-05-11 17:03:23 +07:00
afkarxyz 626d58667e v2.8 2025-05-11 16:32:26 +07:00
afkarxyz 4dd1a7ea12 v2.8 2025-05-11 15:58:46 +07:00
afkarxyz 67964e4acb Update README.md 2025-05-11 04:40:22 +07:00
afkarxyz 1486fb13df v2.7 2025-05-10 20:33:17 +07:00
5 changed files with 1177 additions and 506 deletions
+5 -3
View File
@@ -3,10 +3,10 @@
![SpotiFLAC](https://github.com/user-attachments/assets/b4c4f403-edbd-4a71-b74b-c7d433d47d06)
<div align="center">
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music and Deezer with the help of Lucida.
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music, and Deezer <code>(via Lucida)</code>, as well as Qobuz <code>(via SquidWTF)</code>.
</div>
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v2.6/SpotiFLAC.exe)
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v2.8/SpotiFLAC.exe)
#
@@ -21,9 +21,11 @@
![image](https://github.com/user-attachments/assets/be9a79b5-260e-4948-9f72-10c735210ab7)
![image](https://github.com/user-attachments/assets/c4403934-9003-447e-a27b-fc74cab23454)
![image](https://github.com/user-attachments/assets/1feec621-f8bf-4b2a-ae73-afcb1fb1deba)
![image](https://github.com/user-attachments/assets/66cc3398-547d-4568-8d49-a05ad4997370)
![image](https://github.com/user-attachments/assets/c64b9a08-c99a-4d3a-ae8b-5f834623915b)
> When **Fallback** is enabled, it will use the backup server `Lucida.su`
+376 -223
View File
@@ -5,7 +5,6 @@ from datetime import datetime
import requests
import re
from packaging import version
import json
from PyQt6.QtWidgets import (
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit,
@@ -18,8 +17,7 @@ from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush
from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException
from getTracks import TrackDownloader
import SquidWTF
from getTracks import LucidaDownloader, SquidWTFDownloader, TidalDownloader
@dataclass
class Track:
@@ -30,6 +28,7 @@ class Track:
track_number: int
duration_ms: int
id: str
isrc: str = ""
class MetadataFetchWorker(QThread):
finished = pyqtSignal(dict)
@@ -57,7 +56,7 @@ class DownloadWorker(QThread):
def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False,
album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True,
use_album_subfolders=False, use_fallback=False, service="amazon", timeout=30):
use_album_subfolders=False, use_fallback=False, service="amazon", timeout=30, qobuz_region="us"):
super().__init__()
self.tracks = tracks
self.outpath = outpath
@@ -71,6 +70,7 @@ class DownloadWorker(QThread):
self.use_fallback = use_fallback
self.service = service
self.timeout = timeout
self.qobuz_region = qobuz_region
self.is_paused = False
self.is_stopped = False
self.failed_tracks = []
@@ -84,7 +84,12 @@ class DownloadWorker(QThread):
def run(self):
try:
downloader = TrackDownloader(self.use_fallback, self.timeout)
if self.service == "qobuz":
downloader = SquidWTFDownloader(self.qobuz_region, self.timeout)
elif self.service == "tidal_api":
downloader = TidalDownloader(timeout=self.timeout)
else:
downloader = LucidaDownloader(self.use_fallback, self.timeout)
def progress_update(current, total):
if total > 0:
@@ -112,9 +117,6 @@ class DownloadWorker(QThread):
int((i) / total_tracks * 100))
try:
track_id = track.id
self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0)
if self.is_playlist and self.use_album_subfolders:
album_folder = re.sub(r'[<>:"/\\|?*]', '_', track.album)
track_outpath = os.path.join(self.outpath, album_folder)
@@ -122,113 +124,88 @@ class DownloadWorker(QThread):
else:
track_outpath = self.outpath
if self.service == "qobuz":
self.progress.emit(f"Getting track metadata for: {track.title} - {track.artists}", 0)
isrc = None
try:
from getMetadata import get_raw_spotify_data, parse_uri
track_url = track.external_urls
self.progress.emit(f"Fetching Spotify metadata for ISRC...", 0)
raw_data = get_raw_spotify_data(track_url)
if raw_data and "external_ids" in raw_data and "isrc" in raw_data["external_ids"]:
isrc = raw_data["external_ids"]["isrc"]
self.progress.emit(f"Found ISRC from Spotify: {isrc}", 0)
except Exception as e:
self.progress.emit(f"Could not get ISRC from Spotify raw data: {str(e)}", 0)
if not isrc:
self.progress.emit(f"No ISRC found, searching by title and artist", 0)
search_query = f"{track.title} {track.artists}"
try:
self.progress.emit(f"Searching Qobuz for: {search_query}", 0)
qobuz_track_info = SquidWTF.search_track(track.title, track.artists, strict_match=True)
if qobuz_track_info:
qobuz_track_id = qobuz_track_info["id"]
self.progress.emit(f"Found track on Qobuz by title search: {qobuz_track_info['title']} - {qobuz_track_info['performer']['name']}", 0)
found_artist = qobuz_track_info['performer']['name'].lower()
expected_artist = track.artists.lower()
if expected_artist not in found_artist and found_artist not in expected_artist:
self.progress.emit(f"Warning: Artist mismatch! Expected: {track.artists}, Found: {qobuz_track_info['performer']['name']}", 0)
raise Exception(f"Artist mismatch: Expected '{track.artists}', found '{qobuz_track_info['performer']['name']}'")
else:
raise Exception(f"Could not find track on Qobuz: {track.title} - {track.artists}")
except Exception as e:
self.progress.emit(f"Search by title failed: {str(e)}", 0)
raise Exception(f"Could not find track on Qobuz: {track.title} - {track.artists}")
else:
self.progress.emit(f"Searching Qobuz with ISRC: {isrc}", 0)
qobuz_track_info = SquidWTF.get_track_info(isrc)
qobuz_track_id = qobuz_track_info["id"]
self.progress.emit(f"Found track on Qobuz: {qobuz_track_info['title']} - {qobuz_track_info['performer']['name']}", 0)
found_artist = qobuz_track_info['performer']['name'].lower()
expected_artist = track.artists.lower()
if expected_artist not in found_artist and found_artist not in expected_artist:
self.progress.emit(f"Warning: Artist mismatch! Expected: {track.artists}, Found: {qobuz_track_info['performer']['name']}", 0)
download_url = SquidWTF.get_download_url(qobuz_track_id)
os.makedirs(track_outpath, exist_ok=True)
temp_filename = os.path.join(track_outpath, f"temp_{qobuz_track_id}.flac")
self.progress.emit(f"Downloading from Qobuz...", 0)
def progress_callback(current, total):
if total > 0:
percent = (current / total) * 100
current_mb = current / (1024 * 1024)
total_mb = total / (1024 * 1024)
self.progress.emit(f"Download progress: {percent:.2f}% ({current_mb:.2f}MB/{total_mb:.2f}MB)",
int(percent))
try:
SquidWTF.download_file(download_url, temp_filename, progress_callback)
if not os.path.exists(temp_filename) or os.path.getsize(temp_filename) == 0:
raise Exception(f"Downloaded file is empty or does not exist: {temp_filename}")
self.progress.emit(f"Embedding metadata...", 0)
SquidWTF.embed_metadata(temp_filename, qobuz_track_info)
if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers:
new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}"
else:
new_filename = self.get_formatted_filename(track)
new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename)
new_filepath = os.path.join(track_outpath, new_filename)
if os.path.exists(new_filepath):
os.remove(new_filepath)
os.rename(temp_filename, new_filepath)
downloaded_file = new_filepath
self.progress.emit(f"File renamed to: {new_filename}", 0)
except Exception as e:
self.progress.emit(f"Error during download or processing: {str(e)}", 0)
if os.path.exists(temp_filename):
try:
os.remove(temp_filename)
self.progress.emit(f"Removed incomplete download file", 0)
except:
pass
raise Exception(f"Failed to download or process file: {str(e)}")
if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers:
new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}"
else:
import asyncio
metadata = asyncio.run(downloader.get_track_info(track_id, self.service))
new_filename = self.get_formatted_filename(track)
new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename)
new_filepath = os.path.join(track_outpath, new_filename)
if os.path.exists(new_filepath) and os.path.getsize(new_filepath) > 0:
self.progress.emit(f"File already exists: {new_filename}. Skipping download.", 0)
self.progress.emit(f"Skipped: {track.title} - {track.artists}",
int((i + 1) / total_tracks * 100))
continue
if self.service == "qobuz":
if not track.isrc:
self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0)
self.failed_tracks.append((track.title, track.artists, "No ISRC available"))
continue
self.progress.emit(f"Getting track from Qobuz with ISRC: {track.isrc}", 0)
is_paused_callback = lambda: self.is_paused
is_stopped_callback = lambda: self.is_stopped
downloaded_file = downloader.download(
track.isrc,
track_outpath,
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
)
elif self.service == "tidal_api":
if not track.isrc:
self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0)
self.failed_tracks.append((track.title, track.artists, "No ISRC available"))
continue
self.progress.emit(f"Searching and downloading from Tidal (API) for ISRC: {track.isrc} - {track.title} - {track.artists}", 0)
import asyncio
is_paused_callback = lambda: self.is_paused
is_stopped_callback = lambda: self.is_stopped
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
download_result_details = loop.run_until_complete(downloader.download(
query=f"{track.title} {track.artists}",
isrc=track.isrc,
output_dir=track_outpath,
quality="LOSSLESS",
embed_metadata=True,
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
))
if isinstance(download_result_details, str) and os.path.exists(download_result_details):
downloaded_file = download_result_details
elif isinstance(download_result_details, dict) and download_result_details.get("success") == False and download_result_details.get("error") == "Download stopped by user":
self.progress.emit(f"Download stopped by user for: {track.title}",0)
return
elif isinstance(download_result_details, dict) and download_result_details.get("success") == False:
raise Exception(download_result_details.get("error", "Tidal API download failed"))
elif isinstance(download_result_details, dict) and download_result_details.get("status") == "all_skipped" or download_result_details.get("status") == "skipped_exists":
self.progress.emit(f"File already exists or skipped: {new_filename}",0)
downloaded_file = new_filepath
else:
downloaded_file = None
raise Exception(f"Tidal API download failed or returned unexpected result: {download_result_details}")
else:
track_id = track.id
self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0)
metadata = downloader.get_track_info(track_id, self.service)
self.progress.emit(f"Track info received, starting download process", 0)
is_paused_callback = lambda: self.is_paused
@@ -240,20 +217,21 @@ class DownloadWorker(QThread):
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
)
if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers:
new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}"
else:
new_filename = self.get_formatted_filename(track)
new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename)
new_filepath = os.path.join(track_outpath, new_filename)
if os.path.exists(downloaded_file) and downloaded_file != new_filepath:
if os.path.exists(new_filepath):
os.remove(new_filepath)
os.rename(downloaded_file, new_filepath)
self.progress.emit(f"File renamed to: {new_filename}", 0)
if self.is_stopped:
return
if downloaded_file == new_filepath:
self.progress.emit(f"File already exists: {new_filename}", 0)
self.progress.emit(f"Skipped: {track.title} - {track.artists}",
int((i + 1) / total_tracks * 100))
continue
if os.path.exists(downloaded_file) and downloaded_file != new_filepath:
if os.path.exists(new_filepath):
os.remove(new_filepath)
os.rename(downloaded_file, new_filepath)
self.progress.emit(f"File renamed to: {new_filename}", 0)
self.progress.emit(f"Successfully downloaded: {track.title} - {track.artists}",
int((i + 1) / total_tracks * 100))
@@ -324,48 +302,50 @@ class ServiceStatusChecker(QThread):
error = pyqtSignal(str)
def run(self):
services_status = {
'amazon': False,
'tidal': False,
'deezer': False,
'qobuz': False
}
try:
response = requests.get("https://lucida.to/api/stats", timeout=5)
services_status = {}
if response.status_code == 200:
try:
data = response.json()
current_services = data.get('all', {}).get('downloads', {}).get('current', {}).get('services', {})
services_status['amazon'] = current_services.get('amazon', 0) > 0
services_status['tidal'] = current_services.get('tidal', 0) > 0
services_status['deezer'] = current_services.get('deezer', 0) > 0
print("Lucida services status check successful")
except json.JSONDecodeError as e:
print(f"Lucida API returned invalid JSON: {e}")
except Exception as e:
print(f"Error processing Lucida API response: {str(e)}")
data = response.json()
current_services = data.get('all', {}).get('downloads', {}).get('current', {}).get('services', {})
services_status['amazon'] = current_services.get('amazon', 0) > 0
services_status['tidal'] = current_services.get('tidal', 0) > 0
services_status['deezer'] = current_services.get('deezer', 0) > 0
else:
print(f"Lucida API returned status code: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Error connecting to Lucida API: {str(e)}")
self.error.emit(f"Lucida API error: {response.status_code}")
self.status_updated.emit(services_status)
except Exception as e:
print(f"Unexpected error checking Lucida services: {str(e)}")
self.error.emit(f"Error checking Lucida service status: {str(e)}")
class TidalStatusChecker(QThread):
status_updated = pyqtSignal(bool)
error = pyqtSignal(str)
def run(self):
try:
qobuz_response = requests.get("https://us.qobuz.squid.wtf", timeout=5)
services_status['qobuz'] = qobuz_response.status_code in [200, 304]
print(f"SquidWTF (Qobuz) status check: {qobuz_response.status_code} - {'Online' if services_status['qobuz'] else 'Offline'}")
except requests.exceptions.RequestException as e:
print(f"Error connecting to SquidWTF API (Qobuz): {str(e)}")
services_status['qobuz'] = False
response = requests.get("https://tidal.401658.xyz", timeout=5)
is_online = response.status_code == 200
self.status_updated.emit(is_online)
except Exception as e:
print(f"Unexpected error checking SquidWTF (Qobuz): {str(e)}")
services_status['qobuz'] = False
self.status_updated.emit(services_status)
self.error.emit(f"Error checking Tidal (API) status: {str(e)}")
self.status_updated.emit(False)
class QobuzStatusChecker(QThread):
status_updated = pyqtSignal(bool)
error = pyqtSignal(str)
def __init__(self, region="us"):
super().__init__()
self.region = region
def run(self):
try:
response = requests.get(f"https://{self.region}.qobuz.squid.wtf", timeout=5)
self.status_updated.emit(response.status_code == 200)
except Exception as e:
self.error.emit(f"Error checking Qobuz status: {str(e)}")
self.status_updated.emit(False)
class StatusIndicatorDelegate(QStyledItemDelegate):
def paint(self, painter, option, index):
@@ -398,12 +378,21 @@ class ServiceComboBox(QComboBox):
self.status_checker = ServiceStatusChecker()
self.status_checker.status_updated.connect(self.update_service_status)
self.status_checker.error.connect(lambda e: print(f"Status check error: {e}"))
self.status_checker.error.connect(lambda e: print(f"General status check error: {e}"))
self.status_checker.start()
self.status_timer = QTimer(self)
self.status_timer.timeout.connect(self.refresh_status)
self.status_timer.start(5000)
self.status_timer.start(5000)
self.tidal_api_status_checker = TidalStatusChecker()
self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status)
self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}"))
self.tidal_api_status_checker.start()
self.tidal_api_status_timer = QTimer(self)
self.tidal_api_status_timer.timeout.connect(self.refresh_tidal_api_status)
self.tidal_api_status_timer.start(6000)
def setup_items(self):
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -412,7 +401,8 @@ class ServiceComboBox(QComboBox):
{'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False},
{'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False},
{'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False},
{'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False}
{'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False},
{'id': 'tidal_api', 'name': 'Tidal (API)', 'icon': 'tidal.png', 'online': False}
]
for service in self.services:
@@ -433,12 +423,12 @@ class ServiceComboBox(QComboBox):
pixmap.save(path)
def update_service_status(self, status_dict):
self.services_status = status_dict
self.services_status.update(status_dict)
for i in range(self.count()):
service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if service_id in self.services_status:
if service_id in self.services_status:
service_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(service_data, dict):
service_data['online'] = self.services_status[service_id]
@@ -449,8 +439,110 @@ class ServiceComboBox(QComboBox):
def refresh_status(self):
self.status_checker = ServiceStatusChecker()
self.status_checker.status_updated.connect(self.update_service_status)
self.status_checker.error.connect(lambda e: print(f"Status check error: {e}"))
self.status_checker.error.connect(lambda e: print(f"General status check error: {e}"))
self.status_checker.start()
def update_tidal_api_service_status(self, is_online):
for i in range(self.count()):
service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if service_id == 'tidal_api':
service_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(service_data, dict):
service_data['online'] = is_online
self.setItemData(i, service_data, Qt.ItemDataRole.UserRole)
break
self.update()
def refresh_tidal_api_status(self):
self.tidal_api_status_checker = TidalStatusChecker()
self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status)
self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}"))
self.tidal_api_status_checker.start()
def currentData(self, role=Qt.ItemDataRole.UserRole + 1):
return super().currentData(role)
def update_qobuz_status(self, region_id, is_online):
for i in range(self.count()):
service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if service_id == 'qobuz':
service_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(service_data, dict):
if is_online or service_data.get('online', False):
service_data['online'] = True
self.setItemData(i, service_data, Qt.ItemDataRole.UserRole)
break
self.update()
class QobuzRegionComboBox(QComboBox):
status_updated = pyqtSignal(str, bool)
def __init__(self, parent=None):
super().__init__(parent)
self.setIconSize(QSize(16, 16))
self.setItemDelegate(StatusIndicatorDelegate())
self.setup_items()
self.status_checkers = {}
self.check_status()
self.status_timer = QTimer(self)
self.status_timer.timeout.connect(self.check_status)
self.status_timer.start(10000)
def setup_items(self):
current_dir = os.path.dirname(os.path.abspath(__file__))
self.regions = [
{'id': 'eu', 'name': 'Europe', 'icon': 'eu.svg', 'online': False},
{'id': 'us', 'name': 'North America', 'icon': 'us.svg', 'online': False}
]
for region in self.regions:
icon_path = os.path.join(current_dir, region['icon'])
if not os.path.exists(icon_path):
self.create_placeholder_icon(icon_path)
icon = QIcon(icon_path)
self.addItem(icon, region['name'])
item_index = self.count() - 1
self.setItemData(item_index, region['id'], Qt.ItemDataRole.UserRole + 1)
self.setItemData(item_index, region, Qt.ItemDataRole.UserRole)
def create_placeholder_icon(self, path):
pixmap = QPixmap(16, 16)
pixmap.fill(Qt.GlobalColor.transparent)
pixmap.save(path)
def update_region_status(self, region_id, is_online):
for i in range(self.count()):
current_region_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if current_region_id == region_id:
region_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(region_data, dict):
region_data['online'] = is_online
self.setItemData(i, region_data, Qt.ItemDataRole.UserRole)
break
self.update()
def check_status(self):
for region in self.regions:
region_id = region['id']
checker = QobuzStatusChecker(region_id)
checker.status_updated.connect(lambda status, rid=region_id: self.handle_status_update(rid, status))
checker.start()
self.status_checkers[region_id] = checker
def handle_status_update(self, region_id, is_online):
self.update_region_status(region_id, is_online)
self.status_updated.emit(region_id, is_online)
def currentData(self, role=Qt.ItemDataRole.UserRole + 1):
return super().currentData(role)
@@ -458,7 +550,7 @@ class ServiceComboBox(QComboBox):
class SpotiFLACGUI(QWidget):
def __init__(self):
super().__init__()
self.current_version = "2.7"
self.current_version = "2.9"
self.tracks = []
self.reset_state()
@@ -471,6 +563,7 @@ class SpotiFLACGUI(QWidget):
self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool)
self.use_fallback = self.settings.value('use_fallback', False, type=bool)
self.service = self.settings.value('service', 'amazon')
self.qobuz_region = self.settings.value('qobuz_region', 'us')
self.timeout_value = self.settings.value('timeout_value', 30, type=int)
self.check_for_updates = self.settings.value('check_for_updates', True, type=bool)
@@ -546,7 +639,6 @@ class SpotiFLACGUI(QWidget):
self.setup_tabs()
self.setLayout(self.main_layout)
QTimer.singleShot(0, self.update_service_ui_visibility)
def setup_spotify_section(self):
spotify_layout = QHBoxLayout()
@@ -730,6 +822,7 @@ class SpotiFLACGUI(QWidget):
output_dir_layout.addWidget(self.output_dir)
output_dir_layout.addWidget(self.output_browse)
output_layout.addLayout(output_dir_layout)
settings_layout.addWidget(output_group)
@@ -791,67 +884,67 @@ class SpotiFLACGUI(QWidget):
auth_layout = QVBoxLayout(auth_group)
auth_layout.setSpacing(5)
auth_label = QLabel('Service Setting')
auth_label = QLabel('Service Settings')
auth_label.setStyleSheet("font-weight: bold;")
auth_layout.addWidget(auth_label)
source_fallback_layout = QHBoxLayout()
service_fallback_layout = QHBoxLayout()
service_label = QLabel('Source:')
service_label = QLabel('Service:')
self.service_dropdown = ServiceComboBox()
self.service_dropdown.currentIndexChanged.connect(self.save_service_setting)
self.service_dropdown.currentIndexChanged.connect(self.on_service_changed)
saved_service = self.service
for i in range(self.service_dropdown.count()):
if self.service_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == saved_service:
self.service_dropdown.setCurrentIndex(i)
break
source_fallback_layout.addWidget(service_label)
source_fallback_layout.addWidget(self.service_dropdown)
source_fallback_layout.addSpacing(20)
service_fallback_layout.addWidget(service_label)
service_fallback_layout.addWidget(self.service_dropdown)
service_fallback_layout.addSpacing(10)
self.fallback_checkbox = QCheckBox('Fallback')
self.fallback_checkbox.setCursor(Qt.CursorShape.PointingHandCursor)
self.fallback_checkbox.setChecked(self.use_fallback)
self.fallback_checkbox.toggled.connect(self.save_fallback_setting)
source_fallback_layout.addWidget(self.fallback_checkbox)
source_fallback_layout.addSpacing(20)
service_fallback_layout.addWidget(self.fallback_checkbox)
timeout_label = QLabel('Timeout:')
self.timeout_label = timeout_label
self.timeout_input = QLineEdit()
self.timeout_input.setText(str(self.timeout_value))
self.timeout_input.setFixedWidth(60)
self.timeout_input.setFixedWidth(35)
self.timeout_input.textChanged.connect(self.save_timeout_setting)
source_fallback_layout.addWidget(timeout_label)
source_fallback_layout.addWidget(self.timeout_input)
source_fallback_layout.addStretch()
auth_layout.addLayout(source_fallback_layout)
service_fallback_layout.addWidget(timeout_label)
service_fallback_layout.addWidget(self.timeout_input)
region_label = QLabel('Region:')
self.qobuz_region_dropdown = QobuzRegionComboBox()
self.qobuz_region_dropdown.currentIndexChanged.connect(self.save_qobuz_region_setting)
service_fallback_layout.addWidget(region_label)
service_fallback_layout.addWidget(self.qobuz_region_dropdown)
region_label.hide()
self.qobuz_region_dropdown.hide()
service_fallback_layout.addStretch()
auth_layout.addLayout(service_fallback_layout)
settings_layout.addWidget(auth_group)
settings_layout.addStretch()
settings_tab.setLayout(settings_layout)
self.tab_widget.addTab(settings_tab, "Settings")
def update_service_ui_visibility(self):
if hasattr(self, 'fallback_checkbox') and hasattr(self, 'timeout_input') and hasattr(self, 'timeout_label'):
service = self.service_dropdown.currentData() if hasattr(self, 'service_dropdown') else self.service
if service == "qobuz":
self.fallback_checkbox.setVisible(False)
self.timeout_input.setVisible(False)
self.timeout_label.setVisible(False)
else:
self.fallback_checkbox.setVisible(True)
self.timeout_input.setVisible(True)
self.timeout_label.setVisible(True)
for i in range(self.service_dropdown.count()):
if self.service_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == self.service:
self.service_dropdown.setCurrentIndex(i)
break
for i in range(self.qobuz_region_dropdown.count()):
if self.qobuz_region_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == self.qobuz_region:
self.qobuz_region_dropdown.setCurrentIndex(i)
break
self.qobuz_region_dropdown.status_updated.connect(
lambda region_id, is_online: self.service_dropdown.update_qobuz_status(region_id, is_online)
)
def setup_about_tab(self):
about_tab = QWidget()
about_layout = QVBoxLayout()
@@ -861,7 +954,7 @@ class SpotiFLACGUI(QWidget):
sections = [
("Check for Updates", "https://github.com/afkarxyz/SpotiFLAC/releases"),
("Report an Issue", "https://github.com/afkarxyz/SpotiFLAC/issues"),
("Lucida Site", "https://lucida.to/stats")
("Lucida Status", "https://status.lucida.to")
]
for title, url in sections:
@@ -902,13 +995,60 @@ class SpotiFLACGUI(QWidget):
spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
about_layout.addItem(spacer)
footer_label = QLabel("v2.7 | May 2025")
footer_label = QLabel("v2.9 | May 2025")
footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;")
about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter)
about_tab.setLayout(about_layout)
self.tab_widget.addTab(about_tab, "About")
def on_service_changed(self, index):
service = self.service_dropdown.currentData()
self.service = service
self.settings.setValue('service', service)
self.settings.sync()
timeout_label = None
for widget in self.timeout_input.parentWidget().children():
if isinstance(widget, QLabel) and widget.text() == "Timeout:":
timeout_label = widget
break
region_label = None
for widget in self.qobuz_region_dropdown.parentWidget().children():
if isinstance(widget, QLabel) and widget.text() == "Region:":
region_label = widget
break
if service == "qobuz":
self.fallback_checkbox.hide()
self.timeout_input.hide()
if timeout_label:
timeout_label.hide()
if region_label:
region_label.show()
self.qobuz_region_dropdown.show()
elif service == "tidal_api":
self.fallback_checkbox.hide()
self.timeout_input.hide()
if timeout_label:
timeout_label.hide()
if region_label:
region_label.hide()
self.qobuz_region_dropdown.hide()
else:
self.fallback_checkbox.show()
self.timeout_input.show()
if timeout_label:
timeout_label.show()
if region_label:
region_label.hide()
self.qobuz_region_dropdown.hide()
self.log_output.append(f"Service changed to: {self.service_dropdown.currentText()}")
def save_url(self):
self.settings.setValue('spotify_url', self.spotify_url.text().strip())
self.settings.sync()
@@ -949,14 +1089,12 @@ class SpotiFLACGUI(QWidget):
self.timeout_input.setText(str(self.timeout_value))
self.log_output.append("Timeout must be a valid number")
def save_service_setting(self):
service = self.service_dropdown.currentData()
self.service = service
self.settings.setValue('service', service)
def save_qobuz_region_setting(self):
region = self.qobuz_region_dropdown.currentData()
self.qobuz_region = region
self.settings.setValue('qobuz_region', region)
self.settings.sync()
self.log_output.append(f"Service setting saved: {self.service_dropdown.currentText()}")
self.update_service_ui_visibility()
self.log_output.append(f"Qobuz region setting saved: {self.qobuz_region_dropdown.currentText()}")
def save_settings(self):
self.settings.setValue('output_path', self.output_dir.text().strip())
@@ -1018,7 +1156,8 @@ class SpotiFLACGUI(QWidget):
album=track_data["album_name"],
track_number=1,
duration_ms=track_data.get("duration_ms", 0),
id=track_id
id=track_id,
isrc=track_data.get("isrc", "")
)]
self.is_single_track = True
self.is_album = self.is_playlist = False
@@ -1047,7 +1186,8 @@ class SpotiFLACGUI(QWidget):
album=self.album_or_playlist_name,
track_number=track["track_number"],
duration_ms=track.get("duration_ms", 0),
id=track_id
id=track_id,
isrc=track.get("isrc", "")
))
self.is_album = True
@@ -1076,7 +1216,8 @@ class SpotiFLACGUI(QWidget):
album=track["album_name"],
track_number=len(self.tracks) + 1,
duration_ms=track.get("duration_ms", 0),
id=track_id
id=track_id,
isrc=track.get("isrc", "")
))
self.is_playlist = True
@@ -1233,6 +1374,7 @@ class SpotiFLACGUI(QWidget):
def start_download_worker(self, tracks_to_download, outpath):
service = self.service_dropdown.currentData()
qobuz_region = self.qobuz_region_dropdown.currentData() if service == "qobuz" else "us"
self.worker = DownloadWorker(
tracks_to_download,
@@ -1246,7 +1388,8 @@ class SpotiFLACGUI(QWidget):
self.use_album_subfolders,
self.use_fallback,
service,
self.timeout_value
self.timeout_value,
qobuz_region
)
self.worker.finished.connect(self.on_download_finished)
self.worker.progress.connect(self.update_progress)
@@ -1286,7 +1429,7 @@ class SpotiFLACGUI(QWidget):
else:
self.log_output.append(message)
if percentage > 0:
if percentage > 0 and not "Download progress:" in message:
self.progress_bar.setValue(percentage)
def stop_download(self):
@@ -1361,6 +1504,16 @@ class SpotiFLACGUI(QWidget):
self.time_label.hide()
if __name__ == '__main__':
try:
if sys.platform == "win32":
import os
os.system("chcp 65001 > nul")
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
except Exception as e:
print(f"Warning: Could not set UTF-8 encoding: {e}")
app = QApplication(sys.argv)
ex = SpotiFLACGUI()
ex.show()
-228
View File
@@ -1,228 +0,0 @@
import requests
from mutagen.flac import FLAC, Picture
from datetime import datetime
import sys
import os
def get_track_info(isrc):
print(f"Search: {isrc}")
url = f"https://us.qobuz.squid.wtf/api/get-music?q={isrc}&offset=0"
response = requests.get(url)
data = response.json()
if not data.get("success"):
raise Exception("Failed to get track info")
tracks = data["data"]["tracks"]["items"]
if not tracks:
print(f"Not Found: {isrc}")
raise Exception(f"No tracks found for ISRC: {isrc}")
track = None
for item in tracks:
if item["isrc"] == isrc:
track = item
break
if not track:
print(f"Not Found: {isrc}")
raise Exception(f"No track with matching ISRC: {isrc}")
print(f"Found: {track['title']} - {track['performer']['name']}")
return track
def search_track(title, artist, strict_match=False):
print(f"Search by title/artist: {title} - {artist}")
search_query = f"{title} {artist}".replace("feat.", "").replace("ft.", "")
url = f"https://us.qobuz.squid.wtf/api/get-music?q={search_query}&offset=0"
response = requests.get(url)
data = response.json()
if not data.get("success"):
raise Exception("Failed to search for track")
tracks = data["data"]["tracks"]["items"]
if not tracks:
print(f"Not Found: {title} - {artist}")
raise Exception(f"No tracks found for: {title} - {artist}")
best_match = None
title_lower = title.lower()
artist_lower = artist.lower()
for item in tracks:
item_title = item["title"].lower()
item_artist = item["performer"]["name"].lower()
if title_lower == item_title and (artist_lower in item_artist or item_artist in artist_lower):
best_match = item
print(f"Found exact title match with artist: {item['title']} - {item['performer']['name']}")
break
if not best_match and not strict_match:
for item in tracks:
item_title = item["title"].lower()
item_artist = item["performer"]["name"].lower()
if title_lower in item_title and (artist_lower in item_artist or item_artist in artist_lower):
best_match = item
print(f"Found partial match: {item['title']} - {item['performer']['name']}")
break
if strict_match and best_match:
item_artist = best_match["performer"]["name"].lower()
if artist_lower not in item_artist and item_artist not in artist_lower:
print(f"Artist mismatch in strict mode: Expected '{artist}', found '{best_match['performer']['name']}'")
best_match = None
if not best_match and not strict_match and tracks:
best_match = tracks[0]
print(f"No good match, using first result: {best_match['title']} - {best_match['performer']['name']}")
if not best_match:
print(f"Not Found: {title} - {artist}")
raise Exception(f"No suitable track found for: {title} - {artist}")
print(f"Found by title search: {best_match['title']} - {best_match['performer']['name']}")
return best_match
def get_download_url(track_id):
url = f"https://us.qobuz.squid.wtf/api/download-music?track_id={track_id}&quality=27"
response = requests.get(url)
data = response.json()
if not data.get("success"):
raise Exception("Failed to get download URL")
return data["data"]["url"]
def download_file(url, filename, progress_callback=None):
directory = os.path.dirname(filename)
if directory and not os.path.exists(directory):
try:
os.makedirs(directory, exist_ok=True)
print(f"Created directory: {directory}")
except Exception as e:
raise Exception(f"Failed to create directory {directory}: {str(e)}")
try:
with open(filename, 'wb') as test_file:
pass
except Exception as e:
raise Exception(f"Cannot write to file {filename}: {str(e)}")
try:
response = requests.get(url, stream=True)
if response.status_code != 200:
raise Exception(f"Failed to download file: {response.status_code}")
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0 and progress_callback:
progress_callback(downloaded, total_size)
elif total_size > 0:
progress = (downloaded / total_size) * 100
sys.stdout.write(f"\rProgress Download: {progress:.1f}%")
sys.stdout.flush()
if total_size > 0:
sys.stdout.write("\n")
if not os.path.exists(filename) or os.path.getsize(filename) == 0:
raise Exception(f"Download failed: File {filename} is empty or does not exist")
return filename
except Exception as e:
if os.path.exists(filename):
try:
os.remove(filename)
print(f"Removed incomplete file: {filename}")
except:
pass
raise Exception(f"Download failed: {str(e)}")
def embed_metadata(filename, track_info):
if not os.path.exists(filename):
raise Exception(f"Cannot embed metadata: File {filename} does not exist")
try:
print("Embedding Tags...")
audio = FLAC(filename)
audio.clear()
audio["TITLE"] = track_info["title"]
audio["ARTIST"] = track_info["performer"]["name"]
audio["ALBUM"] = track_info["album"]["title"]
audio["ALBUMARTIST"] = track_info["album"]["artist"]["name"]
audio["TRACKNUMBER"] = str(track_info["track_number"])
audio["LABEL"] = track_info["album"]["label"]["name"]
audio["GENRE"] = track_info["album"]["genre"]["name"]
release_date = datetime.fromtimestamp(track_info["album"]["released_at"]).strftime("%Y-%m-%d")
release_year = release_date.split("-")[0]
audio["DATE"] = release_date
audio["YEAR"] = release_year
audio["ISRC"] = track_info["isrc"]
audio["COPYRIGHT"] = track_info["copyright"]
if track_info["album"]["image"]["large"]:
try:
cover_data = download_cover_image(track_info["album"]["image"]["large"])
picture = Picture()
picture.type = 3
picture.mime = "image/jpeg"
picture.desc = ""
picture.data = cover_data
audio.add_picture(picture)
except Exception as e:
print(f"Warning: Could not add cover image: {str(e)}")
audio.save()
except Exception as e:
raise Exception(f"Failed to embed metadata: {str(e)}")
def download_cover_image(url):
response = requests.get(url)
if response.status_code != 200:
raise Exception(f"Failed to download cover image: {response.status_code}")
return response.content
def main():
try:
isrc = "USUM72409273"
track_info = get_track_info(isrc)
track_id = track_info["id"]
if track_info["isrc"] != isrc:
raise Exception(f"ISRC mismatch: {track_info['isrc']} != {isrc}")
download_url = get_download_url(track_id)
filename = f"{track_info['title']} - {track_info['performer']['name']}.flac"
filename = filename.replace('/', '_').replace('\\', '_')
download_file(download_url, filename)
embed_metadata(filename, track_info)
print("Downloaded Successfully!")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
+795 -51
View File
@@ -4,17 +4,35 @@ import os
import asyncio
import re
import base64
import json
import tempfile
import httpx
import aiofiles
from datetime import datetime
from mutagen.flac import FLAC, Picture
from mutagen.id3 import PictureType
class TrackDownloader:
def __init__(self, use_fallback=False, timeout=30):
class ProgressCallback:
def __call__(self, current, total):
if total > 0:
percent = (current / total) * 100
print(f"\r{percent:.2f}% ({current}/{total})", end="")
else:
print(f"\r{current / (1024 * 1024):.2f} MB", end="")
class LucidaDownloader:
def __init__(self, domain="to", timeout=30):
self.client = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.progress_callback = None
self.use_fallback = use_fallback
self.progress_callback = ProgressCallback()
self.timeout = timeout
self.base_domain = "lucida.su" if use_fallback else "lucida.to"
if domain not in ["to", "su"]:
raise ValueError("Domain must be either 'to' or 'su'")
self.base_domain = f"lucida.{domain}"
def set_progress_callback(self, callback):
self.progress_callback = callback
@@ -22,37 +40,34 @@ class TrackDownloader:
def generate_filename(self, track_id, service):
return f"{track_id}_{service}.flac"
async def get_track_info(self, track_id, service="amazon", use_fallback=None):
if use_fallback is None:
use_fallback = self.use_fallback
async def get_track_info(self, track_id, service="tidal"):
if service not in ["tidal", "amazon", "deezer"]:
raise ValueError("Service must be one of 'tidal', 'amazon', or 'deezer'")
domain_type = "su" if use_fallback else "to"
spotify_url = f"https://open.spotify.com/track/{track_id}"
result = self.convert_spotify_link(spotify_url, service, domain_type)
result = self._convert_spotify_link(spotify_url, service)
if "error" in result:
raise Exception(f"Failed to get track info: {result['error']}")
raise Exception(f"Error: {result['error']}")
result["track_id"] = track_id
return result
def convert_spotify_link(self, spotify_url, target_service="amazon", domain_type="to"):
def _convert_spotify_link(self, spotify_url, target_service="tidal"):
track_id_match = re.search(r'track/([a-zA-Z0-9]+)', spotify_url)
if not track_id_match:
return {"error": "Invalid Spotify URL"}
domain = "lucida.to" if domain_type == "to" else "lucida.su"
base_url = f"https://{domain}"
base_url = f"https://{self.base_domain}"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "id-ID,id;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": domain,
"Host": self.base_domain,
"Pragma": "no-cache",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
@@ -142,17 +157,26 @@ class TrackDownloader:
except Exception as error:
return {"error": str(error)}
def download(self, metadata, output_dir, is_paused_callback=None, is_stopped_callback=None):
def download(self, metadata, output_dir=".", is_paused_callback=None, is_stopped_callback=None):
track_url = metadata['url']
primary_token = metadata['token']['primary']
expiry = metadata['token']['expiry']
track_id = metadata['track_id']
service = metadata['service']
print(f"Starting download for: {track_url}")
print(f"Starting download: track ID {track_id}")
if is_stopped_callback and is_stopped_callback():
raise Exception("Download stopped by user")
raise Exception("Download stopped")
file_name = self.generate_filename(track_id, service)
file_path = os.path.join(output_dir, file_name)
if os.path.exists(file_path):
file_size = os.path.getsize(file_path)
if file_size > 0:
print(f"File already exists: {file_path} ({file_size / (1024 * 1024):.2f} MB)")
return file_path
initial_request = {
"account": {"id": "auto", "type": "country"},
@@ -180,7 +204,7 @@ class TrackDownloader:
initial_response = response.json()
if not initial_response.get("success", False):
raise Exception(f"Initial request failed: {initial_response.get('error', 'Unknown error')}")
raise Exception(f"Request failed: {initial_response.get('error', 'Unknown error')}")
handoff = initial_response["handoff"]
server = initial_response["server"]
@@ -189,24 +213,24 @@ class TrackDownloader:
completion_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}"
print("Waiting for track processing to complete")
print("Waiting for processing...")
while True:
if is_stopped_callback and is_stopped_callback():
raise Exception("Download stopped by user")
raise Exception("Download stopped")
while is_paused_callback and is_paused_callback():
time.sleep(0.1)
if is_stopped_callback and is_stopped_callback():
raise Exception("Download stopped by user")
raise Exception("Download stopped")
completion_response = self.client.get(completion_url, headers=self.headers).json()
status = completion_response["status"]
if status == "completed":
print("Processing completed: 100%")
print("Processing: 100%")
break
elif status == "error":
raise Exception(f"API request failed: {completion_response.get('message', 'Unknown error')}")
raise Exception(f"API error: {completion_response.get('message', 'Unknown error')}")
else:
progress = completion_response.get("progress", {})
if progress:
@@ -214,13 +238,13 @@ class TrackDownloader:
total = progress.get("total", 100)
percent = int((current / total) * 100) if total > 0 else 0
action = progress.get("action", "Processing")
print(f"Progress: {percent}% - {action} ({current}/{total})")
print(f"{percent}% - {action}")
if action.lower() == "metadata":
if self.progress_callback:
self.progress_callback(0, 0)
else:
print(f"Status: {status} - Waiting for progress information...")
print(f"Status: {status}")
if status.lower() == "metadata":
if self.progress_callback:
self.progress_callback(0, 0)
@@ -228,7 +252,7 @@ class TrackDownloader:
time.sleep(1)
download_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}/download"
print(f"Starting download of: {file_name}")
print(f"Downloading file...")
response = self.client.get(download_url, stream=True, headers=self.headers)
total_size = int(response.headers.get('content-length', 0))
@@ -246,7 +270,7 @@ class TrackDownloader:
file.close()
if os.path.exists(file_path):
os.remove(file_path)
raise Exception("Download stopped by user")
raise Exception("Download stopped")
while is_paused_callback and is_paused_callback():
time.sleep(0.1)
@@ -254,7 +278,7 @@ class TrackDownloader:
file.close()
if os.path.exists(file_path):
os.remove(file_path)
raise Exception("Download stopped by user")
raise Exception("Download stopped")
if chunk:
file.write(chunk)
@@ -266,9 +290,9 @@ class TrackDownloader:
progress_percent = (downloaded_size / total_size) * 100
elapsed_time = current_time - start_time
speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0
print(f"Download progress: {progress_percent:.2f}% ({downloaded_size}/{total_size}) - {speed:.2f} MB/s")
print(f"{progress_percent:.2f}% - {speed:.2f} MB/s")
else:
print(f"Downloaded {downloaded_size / (1024 * 1024):.2f} MB")
print(f"{downloaded_size / (1024 * 1024):.2f} MB")
last_update_time = current_time
@@ -276,9 +300,9 @@ class TrackDownloader:
self.progress_callback(downloaded_size, total_size)
if downloaded_size == 0:
raise Exception("No data received from server")
raise Exception("No data received")
print(f"Download completed: {file_path}")
print(f"Complete. File saved: {file_path}")
return file_path
except Exception as e:
@@ -289,30 +313,750 @@ class TrackDownloader:
pass
raise e
async def main():
use_fallback = False
downloader = TrackDownloader(use_fallback)
class SquidWTFDownloader:
def __init__(self, region="us", timeout=30):
if region not in ["eu", "us"]:
raise ValueError("Region must be either 'us' or 'eu'")
self.region = region
self.timeout = timeout
self.session = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.base_api_url = f"https://{region}.qobuz.squid.wtf/api"
self.download_chunk_size = 256 * 1024
self.progress_callback = ProgressCallback()
def set_progress_callback(self, callback):
self.progress_callback = callback
def sanitize_filename(self, filename):
if not filename:
return "Unknown Track"
sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename))
return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track"
def get_track_info(self, isrc):
print(f"Fetching: {isrc}")
search_url = f"{self.base_api_url}/get-music"
params = {'q': isrc, 'offset': 0, 'limit': 10}
try:
response = self.session.get(search_url, params=params, timeout=self.timeout)
response.raise_for_status()
data = response.json()
selected_track = None
if data and data.get("success"):
items = data.get("data", {}).get("tracks", {}).get("items", [])
priority = {24: 1, 16: 2}
for track in items:
if track.get("isrc") == isrc:
current_prio = priority.get(track.get("maximum_bit_depth"), 3)
if selected_track is None or current_prio < priority.get(selected_track.get("maximum_bit_depth"), 3):
selected_track = track
if current_prio == 1:
break
if not selected_track:
raise Exception(f"Track not found: {isrc}")
title = selected_track.get('title', 'Unknown')
bit_depth = selected_track.get('maximum_bit_depth', 'Unknown')
print(f"Found: {title} ({bit_depth}b)")
return selected_track
except requests.exceptions.RequestException as e:
raise Exception(f"Request error: {e}")
except Exception as e:
raise Exception(f"Error: {e}")
def get_download_url(self, track_id):
print("Fetching URL...")
download_api_url = f"{self.base_api_url}/download-music"
params = {'track_id': track_id, 'quality': 27}
try:
response = self.session.get(download_api_url, params=params, timeout=self.timeout)
response.raise_for_status()
data = response.json()
if data and data.get("success") and data.get("data", {}).get("url"):
download_url = data["data"]["url"]
print("URL found")
return download_url
else:
error_msg = data.get('error', {}).get('message', 'Unknown API error')
raise Exception(f"API error: {error_msg}")
except requests.exceptions.RequestException as e:
raise Exception(f"Request error: {e}")
except Exception as e:
raise Exception(f"Error: {e}")
def download(self, isrc, output_dir=".", is_paused_callback=None, is_stopped_callback=None):
if output_dir != ".":
try:
os.makedirs(output_dir, exist_ok=True)
except OSError as e:
raise Exception(f"Directory error: {e}")
track_info = self.get_track_info(isrc)
track_id = track_info.get("id")
if not track_id:
raise Exception("No track ID found")
artist_name = self.sanitize_filename(track_info.get('performer', {}).get('name'))
track_title = self.sanitize_filename(track_info.get('title'))
output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac")
if os.path.exists(output_filename):
file_size = os.path.getsize(output_filename)
if file_size > 0:
print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)")
return output_filename
download_url = self.get_download_url(track_id)
temp_filename = output_filename + ".part"
print(f"Downloading...")
try:
with self.session.get(download_url, stream=True, timeout=900) as response, \
open(temp_filename, 'wb') as f:
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
start_time = time.time()
last_update_time = start_time
for chunk in response.iter_content(chunk_size=self.download_chunk_size):
if is_stopped_callback and is_stopped_callback():
f.close()
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception("Download stopped")
while is_paused_callback and is_paused_callback():
time.sleep(0.1)
if is_stopped_callback and is_stopped_callback():
f.close()
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception("Download stopped")
f.write(chunk)
downloaded_size += len(chunk)
current_time = time.time()
if current_time - last_update_time >= 1:
if total_size > 0:
progress_percent = (downloaded_size / total_size) * 100
elapsed_time = current_time - start_time
speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0
print(f"{progress_percent:.2f}% - {speed:.2f} MB/s")
else:
print(f"{downloaded_size / (1024 * 1024):.2f} MB")
last_update_time = current_time
if self.progress_callback:
self.progress_callback(downloaded_size, total_size)
os.rename(temp_filename, output_filename)
print("Download complete")
except requests.exceptions.RequestException as e:
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception(f"Download failed: {e}")
except Exception as e:
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception(f"File error: {e}")
print("Adding metadata...")
try:
self._embed_metadata(output_filename, track_info)
print("Metadata saved")
except Exception as e:
print(f"Tagging failed: {e}")
print(f"Done")
return output_filename
def _embed_metadata(self, filename, track_info):
try:
audio = FLAC(filename)
audio.delete()
audio.clear_pictures()
album_info = track_info.get('album', {})
artist = track_info.get('performer', {}).get('name')
if track_info.get('title'):
audio['TITLE'] = track_info['title']
if artist:
audio['ARTIST'] = artist
if album_info.get('title'):
audio['ALBUM'] = album_info['title']
if album_info.get('artist', {}).get('name', artist):
audio['ALBUMARTIST'] = album_info.get('artist', {}).get('name', artist)
if track_info.get('track_number'):
audio['TRACKNUMBER'] = str(track_info['track_number'])
if track_info.get('release_date_original'):
audio['DATE'] = track_info['release_date_original']
try:
audio['YEAR'] = str(datetime.strptime(track_info['release_date_original'], '%Y-%m-%d').year)
except ValueError:
pass
if album_info.get('genre', {}).get('name'):
audio['GENRE'] = album_info['genre']['name']
if track_info.get('copyright'):
audio['COPYRIGHT'] = track_info['copyright']
if track_info.get('isrc'):
audio['ISRC'] = track_info['isrc']
if album_info.get('label', {}).get('name'):
audio['ORGANIZATION'] = album_info['label']['name']
img_info = album_info.get('image', {})
cover_url = img_info.get('large') or img_info.get('small') or img_info.get('thumbnail')
if cover_url:
try:
img_response = self.session.get(cover_url, timeout=30)
img_response.raise_for_status()
mime_type = img_response.headers.get('Content-Type', 'image/jpeg').lower()
if mime_type in ['image/jpeg', 'image/png']:
picture = Picture()
picture.data = img_response.content
picture.type = PictureType.COVER_FRONT
picture.mime = mime_type
audio.add_picture(picture)
print("Cover added")
except Exception as e:
print(f"Cover error: {str(e)}")
audio.save()
except Exception as e:
raise Exception(f"Metadata error: {e}")
class TidalDownloader:
def __init__(self, client_id="zU4XHVVkc2tDPo4t", client_secret="VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4=", timeout=30):
self.client_id = client_id
self.client_secret = client_secret
self.timeout = timeout
self.progress_callback = ProgressCallback()
self.temp_dir = tempfile.gettempdir()
self.token_path = os.path.join(self.temp_dir, "tidal_token.json")
self.access_token = None
if os.path.exists(self.token_path):
try:
with open(self.token_path, "r") as tok:
token = json.loads(tok.read())
self.access_token = token.get("access_token")
except:
pass
output_dir = "."
def set_progress_callback(self, callback):
self.progress_callback = callback
async def get_access_token(self):
refresh_url = "https://auth.tidal.com/v1/oauth2/token"
payload = {
"client_id": self.client_id,
"grant_type": "client_credentials",
}
async with httpx.AsyncClient(http2=True) as client:
try:
response = await client.post(
url=refresh_url,
data=payload,
auth=(self.client_id, self.client_secret),
)
if response.status_code == 200:
token_data = response.json()
new_token = token_data.get("access_token")
try:
with open(self.token_path, "w") as f:
json.dump({
"access_token": new_token
}, f)
except:
pass
self.access_token = new_token
return new_token
return None
except:
return None
async def search_tracks(self, query):
try:
tidal_token = self.access_token or await self.get_access_token()
if not tidal_token:
return {"error": "Failed to get access token"}
search_url = f"https://api.tidal.com/v1/search/tracks?query={query}&limit=25&offset=0&countryCode=US"
header = {"authorization": f"Bearer {tidal_token}"}
async with httpx.AsyncClient(http2=True) as client:
search_data = await client.get(url=search_url, headers=header)
response_data = search_data.json()
filtered_items = [{
"id": item.get("id"),
"title": item.get("title"),
"url": item.get("url"),
"isrc": item.get("isrc"),
"audioQuality": item.get("audioQuality"),
"mediaMetadata": item.get("mediaMetadata"),
"album": item.get("album", {}),
"artists": item.get("artists", []),
"artist": item.get("artist", {}),
"trackNumber": item.get("trackNumber"),
"volumeNumber": item.get("volumeNumber"),
"duration": item.get("duration"),
"copyright": item.get("copyright"),
"explicit": item.get("explicit")
} for item in response_data.get("items", [])]
return {
"limit": response_data.get("limit"),
"offset": response_data.get("offset"),
"totalNumberOfItems": response_data.get("totalNumberOfItems"),
"items": filtered_items
}
except Exception as e:
return {"error": f"Error: {str(e)}"}
async def filter_by_isrc(self, query, isrc=None):
try:
result = await self.search_tracks(query)
if "error" in result:
return result
if isrc:
isrc_items = [item for item in result["items"] if item.get("isrc") == isrc]
if len(isrc_items) > 1:
hires_items = []
for item in isrc_items:
media_metadata = item.get("mediaMetadata", {})
tags = media_metadata.get("tags", []) if media_metadata else []
if "HIRES_LOSSLESS" in tags:
hires_items.append(item)
if hires_items:
result["items"] = hires_items
else:
result["items"] = isrc_items
else:
result["items"] = isrc_items
result["totalNumberOfItems"] = len(result["items"])
return result
except Exception as e:
return {"error": f"Error: {str(e)}"}
async def get_track_download_info(self, track_id, quality="LOSSLESS"):
try:
download_api_url = f"https://tidal.401658.xyz/track/?id={track_id}&quality={quality}"
async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client:
response = await client.get(download_api_url)
if response.status_code == 200:
data = response.json()
for item in data:
if "OriginalTrackUrl" in item:
return {
"success": True,
"download_url": item["OriginalTrackUrl"],
"track_info": data[0] if data else {}
}
return {"success": False, "error": "OriginalTrackUrl not found in response"}
else:
return {"success": False, "error": f"API returned status code: {response.status_code}"}
except Exception as e:
return {"success": False, "error": f"Error getting download info: {str(e)}"}
async def download_album_art(self, album_id, size="1280x1280"):
try:
art_url = f"https://resources.tidal.com/images/{album_id.replace('-', '/')}/{size}.jpg"
async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client:
response = await client.get(art_url)
if response.status_code == 200:
return response.content
else:
print(f"Failed to download album art: HTTP {response.status_code}")
return None
except Exception as e:
print(f"Error downloading album art: {str(e)}")
return None
async def embed_metadata(self, filepath, track_info, search_info=None):
try:
audio = FLAC(filepath)
audio.clear()
if track_info.get("title"):
audio["TITLE"] = track_info["title"]
artists_list = []
if search_info and search_info.get("artists"):
for artist in search_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif search_info and search_info.get("artist") and search_info["artist"].get("name"):
artists_list.append(search_info["artist"]["name"])
elif track_info.get("artists"):
for artist in track_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif track_info.get("artist") and track_info["artist"].get("name"):
artists_list.append(track_info["artist"]["name"])
if artists_list:
audio["ARTIST"] = artists_list[0]
if len(artists_list) > 1:
audio["ALBUMARTIST"] = "; ".join(artists_list)
else:
audio["ALBUMARTIST"] = artists_list[0]
album_info = search_info.get("album", {}) if search_info else track_info.get("album", {})
if album_info.get("title"):
audio["ALBUM"] = album_info["title"]
if search_info and search_info.get("trackNumber"):
audio["TRACKNUMBER"] = str(search_info["trackNumber"])
elif track_info.get("trackNumber"):
audio["TRACKNUMBER"] = str(track_info["trackNumber"])
if search_info and search_info.get("volumeNumber"):
audio["DISCNUMBER"] = str(search_info["volumeNumber"])
elif track_info.get("volumeNumber"):
audio["DISCNUMBER"] = str(track_info["volumeNumber"])
isrc = search_info.get("isrc") if search_info else track_info.get("isrc")
if isrc:
audio["ISRC"] = isrc
copyright_info = search_info.get("copyright") if search_info else track_info.get("copyright")
if copyright_info:
audio["COPYRIGHT"] = copyright_info
release_date = None
if search_info and search_info.get("streamStartDate"):
release_date = search_info["streamStartDate"]
elif track_info.get("streamStartDate"):
release_date = track_info["streamStartDate"]
if release_date:
if "T" in release_date:
date_part = release_date.split("T")[0]
audio["DATE"] = date_part
else:
audio["DATE"] = release_date
if track_info.get("genre"):
audio["GENRE"] = track_info["genre"]
if album_info.get("cover"):
album_art = await self.download_album_art(album_info["cover"])
if album_art:
picture = Picture()
picture.data = album_art
picture.type = PictureType.COVER_FRONT
picture.mime = "image/jpeg"
picture.desc = "Cover"
audio.add_picture(picture)
print("Album art embedded successfully")
audio.save()
print(f"Metadata embedded successfully for: {track_info.get('title', 'Unknown')}")
return True
except Exception as e:
print(f"Error embedding metadata: {str(e)}")
return False
async def download_file(self, url, filename, max_retries=3, is_paused_callback=None, is_stopped_callback=None):
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(http2=True, timeout=60.0) as client:
async with client.stream('GET', url) as response:
if response.status_code == 200:
total_size_in_bytes = int(response.headers.get('content-length', 0))
bytes_downloaded = 0
async with aiofiles.open(filename, 'wb') as f:
async for chunk in response.aiter_bytes(chunk_size=8192):
if is_stopped_callback and is_stopped_callback():
print("\\nDownload stopped.")
if os.path.exists(filename):
try:
os.remove(filename)
except OSError as e:
print(f"Error removing partial file: {e}")
return {"success": False, "error": "Download stopped by user"}
while is_paused_callback and is_paused_callback():
print("\\nDownload paused. Waiting...")
await asyncio.sleep(1)
await f.write(chunk)
bytes_downloaded += len(chunk)
if total_size_in_bytes > 0:
if self.progress_callback:
self.progress_callback(bytes_downloaded, total_size_in_bytes)
if total_size_in_bytes > 0 and bytes_downloaded == total_size_in_bytes:
print()
print(f"Successfully downloaded: {filename} ({bytes_downloaded} bytes)")
return {"success": True, "size": bytes_downloaded}
else:
print(f"\\nFailed to download {filename}. HTTP Status: {response.status_code}")
if os.path.exists(filename):
try:
os.remove(filename)
except OSError as e:
print(f"Error removing partial file after server error: {e}")
return {"success": False, "error": f"HTTP {response.status_code}"}
except Exception as e:
print()
if os.path.exists(filename):
try:
os.remove(filename)
except OSError as ose:
print(f"Error removing partial file after exception: {ose}")
if attempt < max_retries - 1:
print(f"Download attempt {attempt + 1} failed, retrying...")
await asyncio.sleep(2)
else:
return {"success": False, "error": f"Download failed after {max_retries} attempts: {str(e)}"}
async def download_track(self, track_ids, search_results, output_dir=".", quality="LOSSLESS", embed_meta=True, is_paused_callback=None, is_stopped_callback=None):
if not isinstance(track_ids, list):
track_ids = [track_ids]
if output_dir != ".":
os.makedirs(output_dir, exist_ok=True)
search_map = {}
if search_results and search_results.get("items"):
for item in search_results["items"]:
search_map[item["id"]] = item
all_skipped = True
skipped_files = []
for i, track_id in enumerate(track_ids):
download_info = await self.get_track_download_info(track_id, quality)
if not download_info["success"]:
print(f"Failed to get download info for track {track_id}: {download_info['error']}")
continue
download_url = download_info["download_url"]
track_info = download_info["track_info"]
search_info = search_map.get(track_id)
title = track_info.get("title", f"track_{track_id}")
artists_list = []
if search_info and search_info.get("artists"):
for artist in search_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif search_info and search_info.get("artist") and search_info["artist"].get("name"):
artists_list.append(search_info["artist"]["name"])
elif track_info.get("artists"):
for artist in track_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif track_info.get("artist") and track_info["artist"].get("name"):
artists_list.append(track_info["artist"]["name"])
artist_names = ", ".join(artists_list) if artists_list else ""
safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_', '.')).rstrip()
safe_artists = "".join(c for c in artist_names if c.isalnum() or c in (' ', '-', '_', ',', '.')).rstrip()
if safe_artists:
filename = f"{safe_title} - {safe_artists}.flac"
else:
filename = f"{safe_title}.flac"
filepath = os.path.join(output_dir, filename)
if os.path.exists(filepath):
print(f"File {filename} already exists. Skipping download.")
skipped_files.append(filename)
if len(track_ids) == 1:
return {
"success": True,
"status": "skipped_exists",
"track_id": track_id,
"filename": filename,
"filepath": filepath,
"message": f"File {filename} already exists."
}
continue
all_skipped = False
print(f"Downloading: {filename}")
download_result = await self.download_file(download_url, filepath, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback)
if download_result["success"]:
print(f"Successfully downloaded track {track_id}")
if embed_meta:
print("Embedding metadata...")
await self.embed_metadata(filepath, track_info, search_info)
return {
"success": True,
"track_id": track_id,
"filename": filename,
"filepath": filepath,
"size": download_result["size"],
"track_info": track_info,
"metadata_embedded": embed_meta
}
else:
print(f"Failed to download track {track_id}: {download_result['error']}")
if os.path.exists(filepath):
try:
os.remove(filepath)
except:
pass
if download_result.get("error") == "Download stopped by user":
return {"success": False, "error": "Download stopped by user", "track_id": track_id}
if all_skipped and skipped_files:
return {
"success": True,
"status": "all_skipped",
"message": f"All files already exist: {', '.join(skipped_files)}"
}
return {"success": False, "error": "All track IDs failed to download or were stopped"}
async def search_and_download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None):
print(f"Searching for: {query}")
if isrc:
print(f"ISRC: {isrc}")
search_result = await self.filter_by_isrc(query, isrc)
if "error" in search_result:
print(f"Search error: {search_result['error']}")
return {"success": False, "error": search_result['error']}
if not search_result["items"]:
print("No tracks found")
return {"success": False, "error": "No tracks found"}
track_ids = [item["id"] for item in search_result["items"]]
print(f"Found {len(track_ids)} track(s): {track_ids}")
download_result = await self.download_track(track_ids, search_result, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback)
return download_result
async def download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", embed_metadata=True, is_paused_callback=None, is_stopped_callback=None):
result = await self.search_and_download(query, isrc, output_dir, quality, embed_metadata, is_paused_callback=is_paused_callback, is_stopped_callback=is_stopped_callback)
if result["success"]:
if result.get("status") == "all_skipped":
print(f"Skipped: {result['message']}")
if "filepath" in result:
return result["filepath"]
return output_dir
elif result.get("status") == "skipped_exists":
print(f"Skipped: {result['message']}")
return result["filepath"]
else:
print("Download completed!")
return result["filepath"]
else:
print(f"Download failed: {result['error']}")
if result.get("error") == "Download stopped by user":
raise Exception("Download stopped by user")
raise Exception(result["error"])
async def main():
print("=== LucidaDownloader ===")
lucida = LucidaDownloader(domain="to")
track_id = "2plbrEY59IikOBgBGLjaoe"
service = "tidal"
output_dir = "."
try:
print(f"Getting track: {track_id} from {service}")
metadata = await lucida.get_track_info(track_id, service)
print("Starting download")
downloaded_file = await lucida.download(metadata, output_dir)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
print("\n\n=== SquidWTFDownloader ===")
squid = SquidWTFDownloader(region="us")
def progress_update(current, total):
if total > 0:
percent = (current / total) * 100
print(f"\rDownload progress: {percent:.2f}% ({current}/{total})", end="")
downloader.set_progress_callback(progress_update)
isrc = "USAT22409172"
output_dir = "."
try:
print(f"Getting track info for ID: {track_id} from {service}")
metadata = await downloader.get_track_info(track_id, service)
print(f"Track info received, starting download process")
downloaded_file = downloader.download(metadata, output_dir)
print(f"\nFile downloaded successfully: {downloaded_file}")
downloaded_file = squid.download(isrc, output_dir)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"An error occurred: {str(e)}")
print(f"Error: {str(e)}")
print("\n\n=== TidalDownloader ===")
tidal = TidalDownloader()
query = "APT."
isrc = "USAT22409172"
output_dir = "."
try:
downloaded_file = await tidal.download(query, isrc, output_dir, quality="LOSSLESS", embed_metadata=True)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
try:
import sys
if sys.platform == "win32":
import os
os.system("chcp 65001 > nul")
try:
sys.stdout.reconfigure(encoding='utf-8')
except:
pass
except:
pass
asyncio.run(main())
+1 -1
View File
@@ -1,3 +1,3 @@
{
"version": "2.6"
"version": "2.8"
}