This commit is contained in:
afkarxyz
2025-05-13 12:06:55 +07:00
parent c4a9042adc
commit c5240596cb
5 changed files with 101 additions and 648 deletions
+7 -5
View File
@@ -3,15 +3,15 @@
![SpotiFLAC](https://github.com/user-attachments/assets/b4c4f403-edbd-4a71-b74b-c7d433d47d06)
<div align="center">
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music, and Deezer (via Lucida), as well as Qobuz (via SquidWTF).
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music and Deezer with the help of Lucida.
</div>
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v2.9/SpotiFLAC.exe)
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v2.6/SpotiFLAC.exe)
#
> [!Note]
**Download speed** from Lucida is unpredictable—sometimes fast, sometimes slow. Join their [Discord](https://discord.com/invite/dXEGRWqEbS) for updates.
> [!WARNING]
Sometimes, the **download speed** from Lucida can be fast or slow; it varies unpredictably.
## Screenshots
@@ -21,9 +21,11 @@
![image](https://github.com/user-attachments/assets/be9a79b5-260e-4948-9f72-10c735210ab7)
![image](https://github.com/user-attachments/assets/c4403934-9003-447e-a27b-fc74cab23454)
![image](https://github.com/user-attachments/assets/1feec621-f8bf-4b2a-ae73-afcb1fb1deba)
![image](https://github.com/user-attachments/assets/e535592a-83da-4b7e-8c4b-dac10884ed4e)
![image](https://github.com/user-attachments/assets/66cc3398-547d-4568-8d49-a05ad4997370)
> When **Fallback** is enabled, it will use the backup server `Lucida.su`
+51 -302
View File
@@ -5,22 +5,19 @@ from datetime import datetime
import requests
import re
from packaging import version
import json
import asyncio
from PyQt6.QtWidgets import (
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit,
QLabel, QFileDialog, QListWidget, QTextEdit, QTabWidget, QButtonGroup, QRadioButton,
QAbstractItemView, QSpacerItem, QSizePolicy, QProgressBar, QCheckBox, QDialog,
QDialogButtonBox, QComboBox, QStyledItemDelegate
QDialogButtonBox, QComboBox, QStyledItemDelegate, QStyle
)
from PyQt6.QtCore import Qt, QThread, pyqtSignal, QUrl, QTimer, QTime, QSettings, QSize
from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush
from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush, QPalette
from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException, get_raw_spotify_data
from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException
from getTracks import TrackDownloader
import SquidWTF
@dataclass
class Track:
@@ -58,8 +55,7 @@ class DownloadWorker(QThread):
def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False,
album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True,
use_album_subfolders=False, use_fallback=False, service="amazon", timeout=30,
qobuz_region="us"):
use_album_subfolders=False, use_fallback=False, service="amazon", timeout=30):
super().__init__()
self.tracks = tracks
self.outpath = outpath
@@ -73,7 +69,6 @@ class DownloadWorker(QThread):
self.use_fallback = use_fallback
self.service = service
self.timeout = timeout
self.qobuz_region = qobuz_region
self.is_paused = False
self.is_stopped = False
self.failed_tracks = []
@@ -89,15 +84,17 @@ class DownloadWorker(QThread):
try:
downloader = TrackDownloader(self.use_fallback, self.timeout)
def progress_update_lucida(current, total, current_overall_progress):
def progress_update(current, total):
if total > 0:
percent = (current / total) * 100
current_mb = current / (1024 * 1024)
total_mb = total / (1024 * 1024)
self.progress.emit(f"Download progress: {percent:.2f}% ({current_mb:.2f}MB/{total_mb:.2f}MB)",
current_overall_progress)
int(percent))
else:
self.progress.emit(f"Processing metadata...", current_overall_progress)
self.progress.emit(f"Processing metadata...", 0)
downloader.set_progress_callback(progress_update)
total_tracks = len(self.tracks)
@@ -109,15 +106,12 @@ class DownloadWorker(QThread):
if self.is_stopped:
return
current_overall_progress = int((i / total_tracks) * 100)
next_overall_progress = int(((i + 1) / total_tracks) * 100)
self.progress.emit(f"Starting download ({i+1}/{total_tracks}): {track.title} - {track.artists}",
current_overall_progress)
int((i) / total_tracks * 100))
try:
track_id = track.id
self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", current_overall_progress)
self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0)
if self.is_playlist and self.use_album_subfolders:
album_folder = re.sub(r'[<>:"/\\|?*]', '_', track.album)
@@ -126,106 +120,14 @@ class DownloadWorker(QThread):
else:
track_outpath = self.outpath
if self.service == "qobuz":
self.progress.emit(f"Getting track metadata for: {track.title} - {track.artists}", current_overall_progress)
isrc = None
try:
track_url = track.external_urls
self.progress.emit(f"Fetching Spotify metadata for ISRC...", current_overall_progress)
raw_data = get_raw_spotify_data(track_url)
if raw_data and "external_ids" in raw_data and "isrc" in raw_data["external_ids"]:
isrc = raw_data["external_ids"]["isrc"]
self.progress.emit(f"Found ISRC from Spotify: {isrc}", current_overall_progress)
except Exception as e:
self.progress.emit(f"Could not get ISRC from Spotify raw data: {str(e)}", current_overall_progress)
if not isrc:
self.progress.emit(f"No ISRC found, searching by title and artist", current_overall_progress)
search_query = f"{track.title} {track.artists}"
try:
self.progress.emit(f"Searching Qobuz for: {search_query}", current_overall_progress)
qobuz_track_info = SquidWTF.search_track(track.title, track.artists, strict_match=True, region=self.qobuz_region)
if qobuz_track_info:
qobuz_track_id = qobuz_track_info["id"]
self.progress.emit(f"Found track on Qobuz by title search: {qobuz_track_info['title']} - {qobuz_track_info['performer']['name']}", current_overall_progress)
found_artist = qobuz_track_info['performer']['name'].lower()
expected_artist = track.artists.lower()
if expected_artist not in found_artist and found_artist not in expected_artist:
self.progress.emit(f"Warning: Artist mismatch! Expected: {track.artists}, Found: {qobuz_track_info['performer']['name']}", current_overall_progress)
raise Exception(f"Artist mismatch: Expected '{track.artists}', found '{qobuz_track_info['performer']['name']}'")
else:
raise Exception(f"Could not find track on Qobuz: {track.title} - {track.artists}")
except Exception as e:
self.progress.emit(f"Search by title failed: {str(e)}", current_overall_progress)
raise Exception(f"Could not find track on Qobuz: {track.title} - {track.artists}")
else:
self.progress.emit(f"Searching Qobuz with ISRC: {isrc}", current_overall_progress)
qobuz_track_info = SquidWTF.get_track_info(isrc, region=self.qobuz_region)
qobuz_track_id = qobuz_track_info["id"]
self.progress.emit(f"Found track on Qobuz: {qobuz_track_info['title']} - {qobuz_track_info['performer']['name']}", current_overall_progress)
found_artist = qobuz_track_info['performer']['name'].lower()
expected_artist = track.artists.lower()
if expected_artist not in found_artist and found_artist not in expected_artist:
self.progress.emit(f"Warning: Artist mismatch! Expected: {track.artists}, Found: {qobuz_track_info['performer']['name']}", current_overall_progress)
download_url = SquidWTF.get_download_url(qobuz_track_id, region=self.qobuz_region)
os.makedirs(track_outpath, exist_ok=True)
temp_filename = os.path.join(track_outpath, f"temp_{qobuz_track_id}.flac")
self.progress.emit(f"Downloading from Qobuz...", current_overall_progress)
def progress_callback_qobuz(current, total):
if total > 0:
percent = (current / total) * 100
current_mb = current / (1024 * 1024)
total_mb = total / (1024 * 1024)
self.progress.emit(f"Download progress: {percent:.2f}% ({current_mb:.2f}MB/{total_mb:.2f}MB)",
current_overall_progress)
try:
SquidWTF.download_file(download_url, temp_filename, progress_callback_qobuz)
if not os.path.exists(temp_filename) or os.path.getsize(temp_filename) == 0:
raise Exception(f"Downloaded file is empty or does not exist: {temp_filename}")
self.progress.emit(f"Embedding metadata...", current_overall_progress)
SquidWTF.embed_metadata(temp_filename, qobuz_track_info)
if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers:
new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}"
else:
new_filename = self.get_formatted_filename(track)
new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename)
new_filepath = os.path.join(track_outpath, new_filename)
if os.path.exists(new_filepath):
os.remove(new_filepath)
os.rename(temp_filename, new_filepath)
downloaded_file = new_filepath
self.progress.emit(f"File renamed to: {new_filename}", current_overall_progress)
except Exception as e:
self.progress.emit(f"Error during download or processing: {str(e)}", current_overall_progress)
if os.path.exists(temp_filename):
try:
os.remove(temp_filename)
self.progress.emit(f"Removed incomplete download file", current_overall_progress)
except:
pass
raise Exception(f"Failed to download or process file: {str(e)}")
else:
import asyncio
metadata = asyncio.run(downloader.get_track_info(track_id, self.service))
self.progress.emit(f"Track info received, starting download process", current_overall_progress)
self.progress.emit(f"Track info received, starting download process", 0)
is_paused_callback = lambda: self.is_paused
is_stopped_callback = lambda: self.is_stopped
downloader.set_progress_callback(
lambda current, total: progress_update_lucida(current, total, current_overall_progress)
)
downloaded_file = downloader.download(
metadata,
track_outpath,
@@ -245,14 +147,14 @@ class DownloadWorker(QThread):
if os.path.exists(new_filepath):
os.remove(new_filepath)
os.rename(downloaded_file, new_filepath)
self.progress.emit(f"File renamed to: {new_filename}", current_overall_progress)
self.progress.emit(f"File renamed to: {new_filename}", 0)
self.progress.emit(f"Successfully downloaded: {track.title} - {track.artists}",
next_overall_progress)
int((i + 1) / total_tracks * 100))
except Exception as e:
self.failed_tracks.append((track.title, track.artists, str(e)))
self.progress.emit(f"Failed to download: {track.title} - {track.artists}\nError: {str(e)}",
next_overall_progress)
int((i + 1) / total_tracks * 100))
continue
if not self.is_stopped:
@@ -316,51 +218,23 @@ class ServiceStatusChecker(QThread):
error = pyqtSignal(str)
def run(self):
services_status = {
'amazon': False,
'tidal': False,
'deezer': False,
'qobuz': False
}
try:
response = requests.get("https://lucida.to/api/stats", timeout=5)
if response.status_code == 200:
try:
data = response.json()
services_status = {}
current_services = data.get('all', {}).get('downloads', {}).get('current', {}).get('services', {})
services_status['amazon'] = current_services.get('amazon', 0) > 0
services_status['tidal'] = current_services.get('tidal', 0) > 0
services_status['deezer'] = current_services.get('deezer', 0) > 0
except json.JSONDecodeError:
pass
except Exception:
pass
except requests.exceptions.RequestException:
pass
except Exception:
pass
eu_online = False
us_online = False
try:
eu_response = requests.get("https://eu.qobuz.squid.wtf", timeout=5)
eu_online = eu_response.status_code in [200, 304]
except Exception:
pass
try:
us_response = requests.get("https://us.qobuz.squid.wtf", timeout=5)
us_online = us_response.status_code in [200, 304]
except Exception:
pass
services_status['qobuz'] = eu_online or us_online
self.status_updated.emit(services_status)
else:
self.error.emit(f"Server returned status code: {response.status_code}")
except Exception as e:
self.error.emit(f"Error checking service status: {str(e)}")
class StatusIndicatorDelegate(QStyledItemDelegate):
def paint(self, painter, option, index):
@@ -369,8 +243,10 @@ class StatusIndicatorDelegate(QStyledItemDelegate):
super().paint(painter, option, index)
if item_data and item_data.get('online') is None:
return
if option.state & QStyle.StateFlag.State_Selected:
text_color = option.palette.color(QPalette.ColorGroup.Active, QPalette.ColorRole.HighlightedText)
else:
text_color = option.palette.color(QPalette.ColorGroup.Active, QPalette.ColorRole.Text)
indicator_color = Qt.GlobalColor.green if is_online else Qt.GlobalColor.red
@@ -409,8 +285,7 @@ class ServiceComboBox(QComboBox):
self.services = [
{'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False},
{'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False},
{'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False},
{'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False}
{'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False}
]
for service in self.services:
@@ -453,90 +328,10 @@ class ServiceComboBox(QComboBox):
def currentData(self, role=Qt.ItemDataRole.UserRole + 1):
return super().currentData(role)
class QobuzRegionComboBox(QComboBox):
def __init__(self, parent=None):
super().__init__(parent)
self.setIconSize(QSize(16, 16))
self.regions_status = {}
self.setItemDelegate(StatusIndicatorDelegate())
self.setup_items()
self.status_checker = QThread()
self.status_checker.run = self.check_status
self.status_checker.finished.connect(self.update_region_status)
self.status_checker.start()
self.status_timer = QTimer(self)
self.status_timer.timeout.connect(self.refresh_status)
self.status_timer.start(5000)
def setup_items(self):
current_dir = os.path.dirname(os.path.abspath(__file__))
self.regions = [
{'id': 'eu', 'name': 'Europe', 'icon': 'eu.svg', 'online': False, 'url': 'https://eu.qobuz.squid.wtf'},
{'id': 'us', 'name': 'North America', 'icon': 'us.svg', 'online': False, 'url': 'https://us.qobuz.squid.wtf'}
]
for region in self.regions:
icon_path = os.path.join(current_dir, region['icon'])
if not os.path.exists(icon_path):
self.create_placeholder_icon(icon_path)
icon = QIcon(icon_path)
self.addItem(icon, region['name'])
item_index = self.count() - 1
self.setItemData(item_index, region['id'], Qt.ItemDataRole.UserRole + 1)
self.setItemData(item_index, region, Qt.ItemDataRole.UserRole)
def create_placeholder_icon(self, path):
pixmap = QPixmap(16, 16)
pixmap.fill(Qt.GlobalColor.transparent)
pixmap.save(path)
def check_status(self):
regions_status = {}
for region in self.regions:
try:
response = requests.get(region['url'], timeout=5)
regions_status[region['id']] = response.status_code in [200, 304]
except requests.exceptions.RequestException:
regions_status[region['id']] = False
except Exception:
regions_status[region['id']] = False
self.regions_status = regions_status
def update_region_status(self):
for i in range(self.count()):
region_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if region_id in self.regions_status:
region_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(region_data, dict):
region_data['online'] = self.regions_status[region_id]
self.setItemData(i, region_data, Qt.ItemDataRole.UserRole)
self.update()
def refresh_status(self):
self.status_checker = QThread()
self.status_checker.run = self.check_status
self.status_checker.finished.connect(self.update_region_status)
self.status_checker.start()
def currentData(self, role=Qt.ItemDataRole.UserRole + 1):
return super().currentData(role)
class SpotiFLACGUI(QWidget):
def __init__(self):
super().__init__()
self.current_version = "2.9"
self.current_version = "2.6"
self.tracks = []
self.reset_state()
@@ -549,7 +344,6 @@ class SpotiFLACGUI(QWidget):
self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool)
self.use_fallback = self.settings.value('use_fallback', False, type=bool)
self.service = self.settings.value('service', 'amazon')
self.qobuz_region = self.settings.value('qobuz_region', 'us')
self.timeout_value = self.settings.value('timeout_value', 30, type=int)
self.check_for_updates = self.settings.value('check_for_updates', True, type=bool)
@@ -583,8 +377,8 @@ class SpotiFLACGUI(QWidget):
if result == QDialog.DialogCode.Accepted:
QDesktopServices.openUrl(QUrl("https://github.com/afkarxyz/SpotiFLAC/releases"))
except Exception:
pass
except Exception as e:
print(f"Error checking for updates: {e}")
@staticmethod
def format_duration(ms):
@@ -625,7 +419,6 @@ class SpotiFLACGUI(QWidget):
self.setup_tabs()
self.setLayout(self.main_layout)
QTimer.singleShot(0, self.update_service_ui_visibility)
def setup_spotify_section(self):
spotify_layout = QHBoxLayout()
@@ -870,59 +663,40 @@ class SpotiFLACGUI(QWidget):
auth_layout = QVBoxLayout(auth_group)
auth_layout.setSpacing(5)
auth_label = QLabel('Service Setting')
auth_label = QLabel('Lucida Settings')
auth_label.setStyleSheet("font-weight: bold;")
auth_layout.addWidget(auth_label)
source_fallback_layout = QHBoxLayout()
service_fallback_layout = QHBoxLayout()
service_label = QLabel('Source:')
service_label = QLabel('Service:')
self.service_dropdown = ServiceComboBox()
self.service_dropdown.currentIndexChanged.connect(self.save_service_setting)
saved_service = self.service
for i in range(self.service_dropdown.count()):
if self.service_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == saved_service:
self.service_dropdown.setCurrentIndex(i)
break
service_fallback_layout.addWidget(service_label)
service_fallback_layout.addWidget(self.service_dropdown)
source_fallback_layout.addWidget(service_label)
source_fallback_layout.addWidget(self.service_dropdown)
self.qobuz_region_dropdown = QobuzRegionComboBox()
self.qobuz_region_dropdown.setFixedWidth(150)
self.qobuz_region_dropdown.currentIndexChanged.connect(self.save_qobuz_region_setting)
self.qobuz_region_dropdown.setVisible(False)
source_fallback_layout.addWidget(self.qobuz_region_dropdown)
saved_region = self.qobuz_region
for i in range(self.qobuz_region_dropdown.count()):
if self.qobuz_region_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == saved_region:
self.qobuz_region_dropdown.setCurrentIndex(i)
break
source_fallback_layout.addSpacing(20)
service_fallback_layout.addSpacing(20)
self.fallback_checkbox = QCheckBox('Fallback')
self.fallback_checkbox.setCursor(Qt.CursorShape.PointingHandCursor)
self.fallback_checkbox.setChecked(self.use_fallback)
self.fallback_checkbox.toggled.connect(self.save_fallback_setting)
source_fallback_layout.addWidget(self.fallback_checkbox)
service_fallback_layout.addWidget(self.fallback_checkbox)
source_fallback_layout.addSpacing(20)
service_fallback_layout.addSpacing(20)
timeout_label = QLabel('Timeout:')
self.timeout_label = timeout_label
self.timeout_input = QLineEdit()
self.timeout_input.setText(str(self.timeout_value))
self.timeout_input.setFixedWidth(60)
self.timeout_input.textChanged.connect(self.save_timeout_setting)
source_fallback_layout.addWidget(timeout_label)
source_fallback_layout.addWidget(self.timeout_input)
service_fallback_layout.addWidget(timeout_label)
service_fallback_layout.addWidget(self.timeout_input)
source_fallback_layout.addStretch()
auth_layout.addLayout(source_fallback_layout)
service_fallback_layout.addStretch()
auth_layout.addLayout(service_fallback_layout)
settings_layout.addWidget(auth_group)
@@ -930,23 +704,6 @@ class SpotiFLACGUI(QWidget):
settings_tab.setLayout(settings_layout)
self.tab_widget.addTab(settings_tab, "Settings")
def update_service_ui_visibility(self):
if hasattr(self, 'fallback_checkbox') and hasattr(self, 'timeout_input') and hasattr(self, 'timeout_label'):
service = self.service_dropdown.currentData() if hasattr(self, 'service_dropdown') else self.service
if service == "qobuz":
self.fallback_checkbox.setVisible(False)
self.timeout_input.setVisible(False)
self.timeout_label.setVisible(False)
if hasattr(self, 'qobuz_region_dropdown'):
self.qobuz_region_dropdown.setVisible(True)
else:
self.fallback_checkbox.setVisible(True)
self.timeout_input.setVisible(True)
self.timeout_label.setVisible(True)
if hasattr(self, 'qobuz_region_dropdown'):
self.qobuz_region_dropdown.setVisible(False)
def setup_about_tab(self):
about_tab = QWidget()
about_layout = QVBoxLayout()
@@ -997,7 +754,7 @@ class SpotiFLACGUI(QWidget):
spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
about_layout.addItem(spacer)
footer_label = QLabel("v2.9 | May 2025")
footer_label = QLabel("v2.6 | May 2025")
footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;")
about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter)
@@ -1051,18 +808,6 @@ class SpotiFLACGUI(QWidget):
self.settings.sync()
self.log_output.append(f"Service setting saved: {self.service_dropdown.currentText()}")
if hasattr(self, 'qobuz_region_dropdown'):
self.qobuz_region_dropdown.setVisible(service == "qobuz")
self.update_service_ui_visibility()
def save_qobuz_region_setting(self):
region = self.qobuz_region_dropdown.currentData()
self.qobuz_region = region
self.settings.setValue('qobuz_region', region)
self.settings.sync()
self.log_output.append(f"Qobuz region setting saved: {self.qobuz_region_dropdown.currentText()}")
def save_settings(self):
self.settings.setValue('output_path', self.output_dir.text().strip())
self.settings.sync()
@@ -1338,7 +1083,6 @@ class SpotiFLACGUI(QWidget):
def start_download_worker(self, tracks_to_download, outpath):
service = self.service_dropdown.currentData()
qobuz_region_val = self.qobuz_region_dropdown.currentData() if service == "qobuz" else self.qobuz_region
self.worker = DownloadWorker(
tracks_to_download,
@@ -1352,8 +1096,7 @@ class SpotiFLACGUI(QWidget):
self.use_album_subfolders,
self.use_fallback,
service,
self.timeout_value,
qobuz_region=qobuz_region_val
self.timeout_value
)
self.worker.finished.connect(self.on_download_finished)
self.worker.progress.connect(self.update_progress)
@@ -1374,12 +1117,17 @@ class SpotiFLACGUI(QWidget):
def update_progress(self, message, percentage):
if "Download progress:" in message or "Processing metadata..." in message:
current_text = self.log_output.toPlainText()
if current_text:
lines = current_text.split('\n')
if lines and ("Download progress:" in lines[-1] or "Processing metadata..." in lines[-1]):
if "Download progress:" in lines[-1] or "Processing metadata..." in lines[-1]:
lines[-1] = message
new_text = '\n'.join(lines)
self.log_output.setPlainText(new_text)
self.log_output.moveCursor(QTextCursor.MoveOperation.End)
else:
self.log_output.append(message)
@@ -1388,6 +1136,7 @@ class SpotiFLACGUI(QWidget):
else:
self.log_output.append(message)
if percentage > 0:
self.progress_bar.setValue(percentage)
def stop_download(self):
-277
View File
@@ -1,277 +0,0 @@
import requests
from mutagen.flac import FLAC, Picture
from datetime import datetime
import sys
import os
def _safe_print(*args, **kwargs):
if sys.stdout:
try:
print(*args, **kwargs)
except UnicodeEncodeError:
encoding = getattr(sys.stdout, 'encoding', None) or 'ascii'
processed_args = []
for arg in args:
processed_args.append(str(arg).encode(encoding, 'replace').decode(encoding))
processed_kwargs = {}
for k, v in kwargs.items():
if isinstance(v, str):
processed_kwargs[k] = v.encode(encoding, 'replace').decode(encoding)
else:
processed_kwargs[k] = v
try:
print(*processed_args, **processed_kwargs)
except Exception:
pass
except Exception:
pass
def _safe_stdout_write(data_to_write):
if sys.stdout:
try:
sys.stdout.write(data_to_write)
except UnicodeEncodeError:
encoding = getattr(sys.stdout, 'encoding', None) or 'ascii'
safe_data = data_to_write.encode(encoding, 'replace').decode(encoding)
try:
sys.stdout.write(safe_data)
except Exception:
pass
except Exception:
pass
def _safe_flush():
if sys.stdout:
try:
sys.stdout.flush()
except Exception:
pass
def get_track_info(isrc, region="us"):
_safe_print(f"Search: {isrc}")
base_url = f"https://{region}.qobuz.squid.wtf"
url = f"{base_url}/api/get-music?q={isrc}&offset=0"
response = requests.get(url)
data = response.json()
if not data.get("success"):
raise Exception("Failed to get track info")
tracks = data["data"]["tracks"]["items"]
if not tracks:
_safe_print(f"Not Found: {isrc}")
raise Exception(f"No tracks found for ISRC: {isrc}")
track = None
for item in tracks:
if item["isrc"] == isrc:
track = item
break
if not track:
_safe_print(f"Not Found: {isrc}")
raise Exception(f"No track with matching ISRC: {isrc}")
_safe_print(f"Found: {track['title']} - {track['performer']['name']}")
return track
def search_track(title, artist, strict_match=False, region="us"):
_safe_print(f"Search by title/artist: {title} - {artist}")
search_query = f"{title} {artist}".replace("feat.", "").replace("ft.", "")
base_url = f"https://{region}.qobuz.squid.wtf"
url = f"{base_url}/api/get-music?q={search_query}&offset=0"
response = requests.get(url)
data = response.json()
if not data.get("success"):
raise Exception("Failed to search for track")
tracks = data["data"]["tracks"]["items"]
if not tracks:
_safe_print(f"Not Found: {title} - {artist}")
raise Exception(f"No tracks found for: {title} - {artist}")
best_match = None
title_lower = title.lower()
artist_lower = artist.lower()
for item in tracks:
item_title = item["title"].lower()
item_artist = item["performer"]["name"].lower()
if title_lower == item_title and (artist_lower in item_artist or item_artist in artist_lower):
best_match = item
_safe_print(f"Found exact title match with artist: {item['title']} - {item['performer']['name']}")
break
if not best_match and not strict_match:
for item in tracks:
item_title = item["title"].lower()
item_artist = item["performer"]["name"].lower()
if title_lower in item_title and (artist_lower in item_artist or item_artist in artist_lower):
best_match = item
_safe_print(f"Found partial match: {item['title']} - {item['performer']['name']}")
break
if strict_match and best_match:
item_artist = best_match["performer"]["name"].lower()
if artist_lower not in item_artist and item_artist not in artist_lower:
_safe_print(f"Artist mismatch in strict mode: Expected '{artist}', found '{best_match['performer']['name']}'")
best_match = None
if not best_match and not strict_match and tracks:
best_match = tracks[0]
_safe_print(f"No good match, using first result: {best_match['title']} - {best_match['performer']['name']}")
if not best_match:
_safe_print(f"Not Found: {title} - {artist}")
raise Exception(f"No suitable track found for: {title} - {artist}")
_safe_print(f"Found by title search: {best_match['title']} - {best_match['performer']['name']}")
return best_match
def get_download_url(track_id, region="us"):
base_url = f"https://{region}.qobuz.squid.wtf"
url = f"{base_url}/api/download-music?track_id={track_id}&quality=27"
response = requests.get(url)
data = response.json()
if not data.get("success"):
raise Exception("Failed to get download URL")
return data["data"]["url"]
def download_file(url, filename, progress_callback=None):
directory = os.path.dirname(filename)
if directory and not os.path.exists(directory):
try:
os.makedirs(directory, exist_ok=True)
_safe_print(f"Created directory: {directory}")
except Exception as e:
raise Exception(f"Failed to create directory {directory}: {str(e)}")
try:
with open(filename, 'wb') as test_file:
pass
except Exception as e:
raise Exception(f"Cannot write to file {filename}: {str(e)}")
try:
response = requests.get(url, stream=True)
if response.status_code != 200:
raise Exception(f"Failed to download file: {response.status_code}")
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0 and progress_callback:
progress_callback(downloaded, total_size)
elif total_size > 0:
progress = (downloaded / total_size) * 100
_safe_stdout_write(f"\rProgress Download: {progress:.1f}%")
_safe_flush()
if total_size > 0 and not progress_callback:
_safe_stdout_write("\n")
if not os.path.exists(filename) or os.path.getsize(filename) == 0:
raise Exception(f"Download failed: File {filename} is empty or does not exist")
return filename
except Exception as e:
if os.path.exists(filename):
try:
os.remove(filename)
_safe_print(f"Removed incomplete file: {filename}")
except:
pass
raise Exception(f"Download failed: {str(e)}")
def embed_metadata(filename, track_info):
if not os.path.exists(filename):
raise Exception(f"Cannot embed metadata: File {filename} does not exist")
try:
_safe_print("Embedding Tags...")
audio = FLAC(filename)
audio.clear()
audio["TITLE"] = track_info["title"]
audio["ARTIST"] = track_info["performer"]["name"]
audio["ALBUM"] = track_info["album"]["title"]
audio["ALBUMARTIST"] = track_info["album"]["artist"]["name"]
audio["TRACKNUMBER"] = str(track_info["track_number"])
audio["LABEL"] = track_info["album"]["label"]["name"]
audio["GENRE"] = track_info["album"]["genre"]["name"]
release_date = datetime.fromtimestamp(track_info["album"]["released_at"]).strftime("%Y-%m-%d")
release_year = release_date.split("-")[0]
audio["DATE"] = release_date
audio["YEAR"] = release_year
audio["ISRC"] = track_info["isrc"]
audio["COPYRIGHT"] = track_info["copyright"]
if track_info["album"]["image"]["large"]:
try:
cover_data = download_cover_image(track_info["album"]["image"]["large"])
picture = Picture()
picture.type = 3
picture.mime = "image/jpeg"
picture.desc = ""
picture.data = cover_data
audio.add_picture(picture)
except Exception as e:
_safe_print(f"Warning: Could not add cover image: {str(e)}")
audio.save()
except Exception as e:
raise Exception(f"Failed to embed metadata: {str(e)}")
def download_cover_image(url):
response = requests.get(url)
if response.status_code != 200:
raise Exception(f"Failed to download cover image: {response.status_code}")
return response.content
def main():
try:
isrc = "USQX92500261"
region = "us"
track_info = get_track_info(isrc, region)
track_id = track_info["id"]
if track_info["isrc"] != isrc:
raise Exception(f"ISRC mismatch: {track_info['isrc']} != {isrc}")
download_url = get_download_url(track_id, region)
filename = f"{track_info['title']} - {track_info['performer']['name']}.flac"
filename = filename.replace('/', '_').replace('\\', '_')
download_file(download_url, filename)
embed_metadata(filename, track_info)
print("Downloaded Successfully!")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
+6 -27
View File
@@ -319,8 +319,6 @@ def format_track_data(track_data):
image_url = track_data.get('album', {}).get('images', [{}])[0].get('url', '') if track_data.get('album', {}).get('images') else ''
isrc = track_data.get('external_ids', {}).get('isrc', '')
return {
"track": {
"artists": ", ".join(artists),
@@ -330,8 +328,7 @@ def format_track_data(track_data):
"images": image_url,
"release_date": track_data.get('album', {}).get('release_date', ''),
"track_number": track_data.get('track_number', 0),
"external_urls": track_data.get('external_urls', {}).get('spotify', ''),
"isrc": isrc
"external_urls": track_data.get('external_urls', {}).get('spotify', '')
}
}
@@ -348,20 +345,6 @@ def format_album_data(album_data):
for artist in track.get('artists', []):
track_artists.append(artist['name'])
track_id = track.get('id', '')
track_isrc = ''
if track_id and album_data.get('_token'):
try:
full_track_data = get_json_from_api(
track_base_url.format(track_id),
album_data.get('_token')
)
if full_track_data:
track_isrc = full_track_data.get('external_ids', {}).get('isrc', '')
except:
pass
track_list.append({
"artists": ", ".join(track_artists),
"name": track.get('name', ''),
@@ -370,8 +353,7 @@ def format_album_data(album_data):
"images": image_url,
"release_date": album_data.get('release_date', ''),
"track_number": track.get('track_number', 0),
"external_urls": track.get('external_urls', {}).get('spotify', ''),
"isrc": track_isrc
"external_urls": track.get('external_urls', {}).get('spotify', '')
})
album_info = {
@@ -407,8 +389,6 @@ def format_playlist_data(playlist_data):
if track.get('album', {}).get('images'):
track_image = track.get('album', {}).get('images', [{}])[0].get('url', '')
track_isrc = track.get('external_ids', {}).get('isrc', '')
track_list.append({
"artists": ", ".join(artists),
"name": track.get('name', ''),
@@ -417,8 +397,7 @@ def format_playlist_data(playlist_data):
"images": track_image,
"release_date": track.get('album', {}).get('release_date', ''),
"track_number": track.get('track_number', 0),
"external_urls": track.get('external_urls', {}).get('spotify', ''),
"isrc": track_isrc
"external_urls": track.get('external_urls', {}).get('spotify', '')
})
playlist_info = {
@@ -464,9 +443,9 @@ def get_filtered_data(spotify_url, batch=False, delay=1.0):
return {"error": "Failed to get raw data"}
if __name__ == '__main__':
playlist = "https://open.spotify.com/playlist/37i9dQZEVXbNG2KDcFcKOF"
album = "https://open.spotify.com/album/6J84szYCnMfzEcvIcfWMFL"
song = "https://open.spotify.com/track/7so0lgd0zP2Sbgs2d7a1SZ"
playlist = "https://open.spotify.com/playlist/5Qvz8wZIRYbEUUFoPueKI5"
album = "https://open.spotify.com/album/7kFyd5oyJdVX2pIi6P4iHE"
song = "https://open.spotify.com/track/4wJ5Qq0jBN4ajy7ouZIV1c"
filtered_playlist = get_filtered_data(playlist, batch=True, delay=0.1)
print(json.dumps(filtered_playlist, indent=2))
+1 -1
View File
@@ -1,3 +1,3 @@
{
"version": "2.9"
"version": "2.6"
}