Compare commits

..

13 Commits

Author SHA1 Message Date
afkarxyz 966536f127 v2.7 2025-05-10 20:13:01 +07:00
afkarxyz 21946321f5 Update README.md 2025-05-06 13:25:27 +07:00
afkarxyz 3e3cb0610d Update README.md 2025-05-06 12:45:25 +07:00
afkarxyz 160eba0987 Update README.md 2025-05-06 12:43:29 +07:00
afkarxyz 71a60ded47 Update README.md 2025-05-06 10:42:11 +07:00
afkarxyz e0a0514df9 v2.6 2025-05-06 10:41:11 +07:00
afkarxyz 1e7a48d263 v2.6 2025-05-06 10:38:01 +07:00
afkarxyz 0a83a0dd6e Update README.md 2025-05-06 10:35:17 +07:00
afkarxyz da429d9410 v2.5 2025-04-25 06:33:32 +07:00
afkarxyz 63211c726b v2.5 2025-04-25 06:30:14 +07:00
afkarxyz 055cb6991a Update v2.4 2025-04-08 13:10:12 +07:00
afkarxyz 222d681551 Update v2.4 2025-04-08 13:07:26 +07:00
afkarxyz 479c6ede2b Update v2.3 2025-03-20 05:47:20 +07:00
6 changed files with 757 additions and 158 deletions
+9 -9
View File
@@ -1,29 +1,29 @@
[![GitHub All Releases](https://img.shields.io/github/downloads/afkarxyz/SpotiFLAC/total?style=for-the-badge)](https://github.com/afkarxyz/SpotiFLAC/releases) [![GitHub All Releases](https://img.shields.io/github/downloads/afkarxyz/SpotiFLAC/total?style=for-the-badge)](https://github.com/afkarxyz/SpotiFLAC/releases)
![spotiflac](https://github.com/user-attachments/assets/a233a276-14a4-4f4c-b267-f182dd3912a0) ![SpotiFLAC](https://github.com/user-attachments/assets/b4c4f403-edbd-4a71-b74b-c7d433d47d06)
<div align="center"> <div align="center">
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music and Deezer with the help of Lucida. <b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music and Deezer with the help of Lucida.
</div> </div>
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v2.2/SpotiFLAC.exe) ### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v2.6/SpotiFLAC.exe)
# #
> [!WARNING] > [!Note]
Sometimes, the **download speed** from Lucida can be fast or slow; it varies unpredictably. **Download speed** from Lucida is unpredictable—sometimes fast, sometimes slow. Join their [Discord](https://discord.com/invite/dXEGRWqEbS) for updates.
## Screenshots ## Screenshots
![image](https://github.com/user-attachments/assets/611b8b52-6615-44fe-b6f8-905c07801c47) ![image](https://github.com/user-attachments/assets/70a5dceb-3374-4255-8f6a-4afb5ee534b0)
![image](https://github.com/user-attachments/assets/81e65977-11f0-4162-96f3-90730dd87e74) ![image](https://github.com/user-attachments/assets/9f0d6aa5-456b-4a90-b48a-7e0c22819ebd)
![image](https://github.com/user-attachments/assets/4dd37c0a-30e3-479a-9b3d-57fd360d87b3) ![image](https://github.com/user-attachments/assets/be9a79b5-260e-4948-9f72-10c735210ab7)
![image](https://github.com/user-attachments/assets/66f1ae70-e049-4b4c-ba81-1df5054d0e7d) ![image](https://github.com/user-attachments/assets/1feec621-f8bf-4b2a-ae73-afcb1fb1deba)
![image](https://github.com/user-attachments/assets/04954db9-e94a-4f9d-8eac-46d7ff7a4c33) ![image](https://github.com/user-attachments/assets/66cc3398-547d-4568-8d49-a05ad4997370)
> When **Fallback** is enabled, it will use the backup server `Lucida.su` > When **Fallback** is enabled, it will use the backup server `Lucida.su`
+297 -86
View File
@@ -5,19 +5,21 @@ from datetime import datetime
import requests import requests
import re import re
from packaging import version from packaging import version
import json
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit, QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit,
QLabel, QFileDialog, QListWidget, QTextEdit, QTabWidget, QButtonGroup, QRadioButton, QLabel, QFileDialog, QListWidget, QTextEdit, QTabWidget, QButtonGroup, QRadioButton,
QAbstractItemView, QSpacerItem, QSizePolicy, QProgressBar, QCheckBox, QDialog, QAbstractItemView, QSpacerItem, QSizePolicy, QProgressBar, QCheckBox, QDialog,
QDialogButtonBox, QComboBox, QStyledItemDelegate, QStyle QDialogButtonBox, QComboBox, QStyledItemDelegate
) )
from PyQt6.QtCore import Qt, QThread, pyqtSignal, QUrl, QTimer, QTime, QSettings, QSize from PyQt6.QtCore import Qt, QThread, pyqtSignal, QUrl, QTimer, QTime, QSettings, QSize
from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush, QPalette from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush
from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException
from getTracks import TrackDownloader from getTracks import TrackDownloader
import SquidWTF
@dataclass @dataclass
class Track: class Track:
@@ -29,13 +31,33 @@ class Track:
duration_ms: int duration_ms: int
id: str id: str
class MetadataFetchWorker(QThread):
finished = pyqtSignal(dict)
error = pyqtSignal(str)
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
try:
metadata = get_filtered_data(self.url)
if "error" in metadata:
self.error.emit(metadata["error"])
else:
self.finished.emit(metadata)
except SpotifyInvalidUrlException as e:
self.error.emit(str(e))
except Exception as e:
self.error.emit(f'Failed to fetch metadata: {str(e)}')
class DownloadWorker(QThread): class DownloadWorker(QThread):
finished = pyqtSignal(bool, str, list) finished = pyqtSignal(bool, str, list)
progress = pyqtSignal(str, int) progress = pyqtSignal(str, int)
def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False, def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False,
album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True, album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True,
use_album_subfolders=False, use_fallback=False, service="amazon"): use_album_subfolders=False, use_fallback=False, service="amazon", timeout=30):
super().__init__() super().__init__()
self.tracks = tracks self.tracks = tracks
self.outpath = outpath self.outpath = outpath
@@ -48,6 +70,7 @@ class DownloadWorker(QThread):
self.use_album_subfolders = use_album_subfolders self.use_album_subfolders = use_album_subfolders
self.use_fallback = use_fallback self.use_fallback = use_fallback
self.service = service self.service = service
self.timeout = timeout
self.is_paused = False self.is_paused = False
self.is_stopped = False self.is_stopped = False
self.failed_tracks = [] self.failed_tracks = []
@@ -61,7 +84,7 @@ class DownloadWorker(QThread):
def run(self): def run(self):
try: try:
downloader = TrackDownloader(self.use_fallback) downloader = TrackDownloader(self.use_fallback, self.timeout)
def progress_update(current, total): def progress_update(current, total):
if total > 0: if total > 0:
@@ -99,34 +122,138 @@ class DownloadWorker(QThread):
else: else:
track_outpath = self.outpath track_outpath = self.outpath
import asyncio if self.service == "qobuz":
metadata = asyncio.run(downloader.get_track_info(track_id, self.service)) self.progress.emit(f"Getting track metadata for: {track.title} - {track.artists}", 0)
self.progress.emit(f"Track info received, starting download process", 0) isrc = None
is_paused_callback = lambda: self.is_paused try:
is_stopped_callback = lambda: self.is_stopped from getMetadata import get_raw_spotify_data, parse_uri
downloaded_file = downloader.download( track_url = track.external_urls
metadata,
track_outpath, self.progress.emit(f"Fetching Spotify metadata for ISRC...", 0)
is_paused_callback=is_paused_callback, raw_data = get_raw_spotify_data(track_url)
is_stopped_callback=is_stopped_callback
) if raw_data and "external_ids" in raw_data and "isrc" in raw_data["external_ids"]:
isrc = raw_data["external_ids"]["isrc"]
if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers: self.progress.emit(f"Found ISRC from Spotify: {isrc}", 0)
new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}" except Exception as e:
self.progress.emit(f"Could not get ISRC from Spotify raw data: {str(e)}", 0)
if not isrc:
self.progress.emit(f"No ISRC found, searching by title and artist", 0)
search_query = f"{track.title} {track.artists}"
try:
self.progress.emit(f"Searching Qobuz for: {search_query}", 0)
qobuz_track_info = SquidWTF.search_track(track.title, track.artists, strict_match=True)
if qobuz_track_info:
qobuz_track_id = qobuz_track_info["id"]
self.progress.emit(f"Found track on Qobuz by title search: {qobuz_track_info['title']} - {qobuz_track_info['performer']['name']}", 0)
found_artist = qobuz_track_info['performer']['name'].lower()
expected_artist = track.artists.lower()
if expected_artist not in found_artist and found_artist not in expected_artist:
self.progress.emit(f"Warning: Artist mismatch! Expected: {track.artists}, Found: {qobuz_track_info['performer']['name']}", 0)
raise Exception(f"Artist mismatch: Expected '{track.artists}', found '{qobuz_track_info['performer']['name']}'")
else:
raise Exception(f"Could not find track on Qobuz: {track.title} - {track.artists}")
except Exception as e:
self.progress.emit(f"Search by title failed: {str(e)}", 0)
raise Exception(f"Could not find track on Qobuz: {track.title} - {track.artists}")
else:
self.progress.emit(f"Searching Qobuz with ISRC: {isrc}", 0)
qobuz_track_info = SquidWTF.get_track_info(isrc)
qobuz_track_id = qobuz_track_info["id"]
self.progress.emit(f"Found track on Qobuz: {qobuz_track_info['title']} - {qobuz_track_info['performer']['name']}", 0)
found_artist = qobuz_track_info['performer']['name'].lower()
expected_artist = track.artists.lower()
if expected_artist not in found_artist and found_artist not in expected_artist:
self.progress.emit(f"Warning: Artist mismatch! Expected: {track.artists}, Found: {qobuz_track_info['performer']['name']}", 0)
download_url = SquidWTF.get_download_url(qobuz_track_id)
os.makedirs(track_outpath, exist_ok=True)
temp_filename = os.path.join(track_outpath, f"temp_{qobuz_track_id}.flac")
self.progress.emit(f"Downloading from Qobuz...", 0)
def progress_callback(current, total):
if total > 0:
percent = (current / total) * 100
current_mb = current / (1024 * 1024)
total_mb = total / (1024 * 1024)
self.progress.emit(f"Download progress: {percent:.2f}% ({current_mb:.2f}MB/{total_mb:.2f}MB)",
int(percent))
try:
SquidWTF.download_file(download_url, temp_filename, progress_callback)
if not os.path.exists(temp_filename) or os.path.getsize(temp_filename) == 0:
raise Exception(f"Downloaded file is empty or does not exist: {temp_filename}")
self.progress.emit(f"Embedding metadata...", 0)
SquidWTF.embed_metadata(temp_filename, qobuz_track_info)
if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers:
new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}"
else:
new_filename = self.get_formatted_filename(track)
new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename)
new_filepath = os.path.join(track_outpath, new_filename)
if os.path.exists(new_filepath):
os.remove(new_filepath)
os.rename(temp_filename, new_filepath)
downloaded_file = new_filepath
self.progress.emit(f"File renamed to: {new_filename}", 0)
except Exception as e:
self.progress.emit(f"Error during download or processing: {str(e)}", 0)
if os.path.exists(temp_filename):
try:
os.remove(temp_filename)
self.progress.emit(f"Removed incomplete download file", 0)
except:
pass
raise Exception(f"Failed to download or process file: {str(e)}")
else: else:
new_filename = self.get_formatted_filename(track) import asyncio
metadata = asyncio.run(downloader.get_track_info(track_id, self.service))
new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename)
new_filepath = os.path.join(track_outpath, new_filename) self.progress.emit(f"Track info received, starting download process", 0)
if os.path.exists(downloaded_file) and downloaded_file != new_filepath: is_paused_callback = lambda: self.is_paused
if os.path.exists(new_filepath): is_stopped_callback = lambda: self.is_stopped
os.remove(new_filepath)
os.rename(downloaded_file, new_filepath) downloaded_file = downloader.download(
self.progress.emit(f"File renamed to: {new_filename}", 0) metadata,
track_outpath,
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
)
if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers:
new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}"
else:
new_filename = self.get_formatted_filename(track)
new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename)
new_filepath = os.path.join(track_outpath, new_filename)
if os.path.exists(downloaded_file) and downloaded_file != new_filepath:
if os.path.exists(new_filepath):
os.remove(new_filepath)
os.rename(downloaded_file, new_filepath)
self.progress.emit(f"File renamed to: {new_filename}", 0)
self.progress.emit(f"Successfully downloaded: {track.title} - {track.artists}", self.progress.emit(f"Successfully downloaded: {track.title} - {track.artists}",
int((i + 1) / total_tracks * 100)) int((i + 1) / total_tracks * 100))
@@ -197,23 +324,48 @@ class ServiceStatusChecker(QThread):
error = pyqtSignal(str) error = pyqtSignal(str)
def run(self): def run(self):
services_status = {
'amazon': False,
'tidal': False,
'deezer': False,
'qobuz': False
}
try: try:
response = requests.get("https://lucida.to/api/stats", timeout=5) response = requests.get("https://lucida.to/api/stats", timeout=5)
if response.status_code == 200: if response.status_code == 200:
data = response.json() try:
services_status = {} data = response.json()
current_services = data.get('all', {}).get('downloads', {}).get('current', {}).get('services', {})
current_services = data.get('all', {}).get('downloads', {}).get('current', {}).get('services', {})
services_status['amazon'] = current_services.get('amazon', 0) > 0
services_status['amazon'] = current_services.get('amazon', 0) > 0 services_status['tidal'] = current_services.get('tidal', 0) > 0
services_status['tidal'] = current_services.get('tidal', 0) > 0 services_status['deezer'] = current_services.get('deezer', 0) > 0
services_status['deezer'] = current_services.get('deezer', 0) > 0
print("Lucida services status check successful")
self.status_updated.emit(services_status) except json.JSONDecodeError as e:
print(f"Lucida API returned invalid JSON: {e}")
except Exception as e:
print(f"Error processing Lucida API response: {str(e)}")
else: else:
self.error.emit(f"Server returned status code: {response.status_code}") print(f"Lucida API returned status code: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Error connecting to Lucida API: {str(e)}")
except Exception as e: except Exception as e:
self.error.emit(f"Error checking service status: {str(e)}") print(f"Unexpected error checking Lucida services: {str(e)}")
try:
qobuz_response = requests.get("https://us.qobuz.squid.wtf", timeout=5)
services_status['qobuz'] = qobuz_response.status_code in [200, 304]
print(f"SquidWTF (Qobuz) status check: {qobuz_response.status_code} - {'Online' if services_status['qobuz'] else 'Offline'}")
except requests.exceptions.RequestException as e:
print(f"Error connecting to SquidWTF API (Qobuz): {str(e)}")
services_status['qobuz'] = False
except Exception as e:
print(f"Unexpected error checking SquidWTF (Qobuz): {str(e)}")
services_status['qobuz'] = False
self.status_updated.emit(services_status)
class StatusIndicatorDelegate(QStyledItemDelegate): class StatusIndicatorDelegate(QStyledItemDelegate):
def paint(self, painter, option, index): def paint(self, painter, option, index):
@@ -222,11 +374,6 @@ class StatusIndicatorDelegate(QStyledItemDelegate):
super().paint(painter, option, index) super().paint(painter, option, index)
if option.state & QStyle.StateFlag.State_Selected:
text_color = option.palette.color(QPalette.ColorGroup.Active, QPalette.ColorRole.HighlightedText)
else:
text_color = option.palette.color(QPalette.ColorGroup.Active, QPalette.ColorRole.Text)
indicator_color = Qt.GlobalColor.green if is_online else Qt.GlobalColor.red indicator_color = Qt.GlobalColor.green if is_online else Qt.GlobalColor.red
circle_size = 6 circle_size = 6
@@ -264,7 +411,8 @@ class ServiceComboBox(QComboBox):
self.services = [ self.services = [
{'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False}, {'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False},
{'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False}, {'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False},
{'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False} {'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False},
{'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False}
] ]
for service in self.services: for service in self.services:
@@ -310,7 +458,7 @@ class ServiceComboBox(QComboBox):
class SpotiFLACGUI(QWidget): class SpotiFLACGUI(QWidget):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.current_version = "2.3" self.current_version = "2.7"
self.tracks = [] self.tracks = []
self.reset_state() self.reset_state()
@@ -323,6 +471,7 @@ class SpotiFLACGUI(QWidget):
self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool) self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool)
self.use_fallback = self.settings.value('use_fallback', False, type=bool) self.use_fallback = self.settings.value('use_fallback', False, type=bool)
self.service = self.settings.value('service', 'amazon') self.service = self.settings.value('service', 'amazon')
self.timeout_value = self.settings.value('timeout_value', 30, type=int)
self.check_for_updates = self.settings.value('check_for_updates', True, type=bool) self.check_for_updates = self.settings.value('check_for_updates', True, type=bool)
self.elapsed_time = QTime(0, 0, 0) self.elapsed_time = QTime(0, 0, 0)
@@ -397,6 +546,7 @@ class SpotiFLACGUI(QWidget):
self.setup_tabs() self.setup_tabs()
self.setLayout(self.main_layout) self.setLayout(self.main_layout)
QTimer.singleShot(0, self.update_service_ui_visibility)
def setup_spotify_section(self): def setup_spotify_section(self):
spotify_layout = QHBoxLayout() spotify_layout = QHBoxLayout()
@@ -566,7 +716,7 @@ class SpotiFLACGUI(QWidget):
output_layout.setSpacing(5) output_layout.setSpacing(5)
output_label = QLabel('Output Directory') output_label = QLabel('Output Directory')
output_label.setStyleSheet("font-weight: bold; color: palette(text);") output_label.setStyleSheet("font-weight: bold;")
output_layout.addWidget(output_label) output_layout.addWidget(output_label)
output_dir_layout = QHBoxLayout() output_dir_layout = QHBoxLayout()
@@ -589,21 +739,18 @@ class SpotiFLACGUI(QWidget):
file_layout.setSpacing(5) file_layout.setSpacing(5)
file_label = QLabel('File Settings') file_label = QLabel('File Settings')
file_label.setStyleSheet("font-weight: bold; color: palette(text);") file_label.setStyleSheet("font-weight: bold;")
file_layout.addWidget(file_label) file_layout.addWidget(file_label)
format_layout = QHBoxLayout() format_layout = QHBoxLayout()
format_label = QLabel('Filename Format:') format_label = QLabel('Filename Format:')
format_label.setStyleSheet("color: palette(text);")
self.format_group = QButtonGroup(self) self.format_group = QButtonGroup(self)
self.title_artist_radio = QRadioButton('Title - Artist') self.title_artist_radio = QRadioButton('Title - Artist')
self.title_artist_radio.setStyleSheet("color: palette(text);")
self.title_artist_radio.setCursor(Qt.CursorShape.PointingHandCursor) self.title_artist_radio.setCursor(Qt.CursorShape.PointingHandCursor)
self.title_artist_radio.toggled.connect(self.save_filename_format) self.title_artist_radio.toggled.connect(self.save_filename_format)
self.artist_title_radio = QRadioButton('Artist - Title') self.artist_title_radio = QRadioButton('Artist - Title')
self.artist_title_radio.setStyleSheet("color: palette(text);")
self.artist_title_radio.setCursor(Qt.CursorShape.PointingHandCursor) self.artist_title_radio.setCursor(Qt.CursorShape.PointingHandCursor)
self.artist_title_radio.toggled.connect(self.save_filename_format) self.artist_title_radio.toggled.connect(self.save_filename_format)
@@ -624,14 +771,12 @@ class SpotiFLACGUI(QWidget):
checkbox_layout = QHBoxLayout() checkbox_layout = QHBoxLayout()
self.track_number_checkbox = QCheckBox('Add Track Numbers to Album Files') self.track_number_checkbox = QCheckBox('Add Track Numbers to Album Files')
self.track_number_checkbox.setStyleSheet("color: palette(text);")
self.track_number_checkbox.setCursor(Qt.CursorShape.PointingHandCursor) self.track_number_checkbox.setCursor(Qt.CursorShape.PointingHandCursor)
self.track_number_checkbox.setChecked(self.use_track_numbers) self.track_number_checkbox.setChecked(self.use_track_numbers)
self.track_number_checkbox.toggled.connect(self.save_track_numbering) self.track_number_checkbox.toggled.connect(self.save_track_numbering)
checkbox_layout.addWidget(self.track_number_checkbox) checkbox_layout.addWidget(self.track_number_checkbox)
self.album_subfolder_checkbox = QCheckBox('Create Album Subfolders for Playlist Downloads') self.album_subfolder_checkbox = QCheckBox('Create Album Subfolders for Playlist Downloads')
self.album_subfolder_checkbox.setStyleSheet("color: palette(text);")
self.album_subfolder_checkbox.setCursor(Qt.CursorShape.PointingHandCursor) self.album_subfolder_checkbox.setCursor(Qt.CursorShape.PointingHandCursor)
self.album_subfolder_checkbox.setChecked(self.use_album_subfolders) self.album_subfolder_checkbox.setChecked(self.use_album_subfolders)
self.album_subfolder_checkbox.toggled.connect(self.save_album_subfolder_setting) self.album_subfolder_checkbox.toggled.connect(self.save_album_subfolder_setting)
@@ -646,39 +791,67 @@ class SpotiFLACGUI(QWidget):
auth_layout = QVBoxLayout(auth_group) auth_layout = QVBoxLayout(auth_group)
auth_layout.setSpacing(5) auth_layout.setSpacing(5)
auth_label = QLabel('Lucida') auth_label = QLabel('Service Setting')
auth_label.setStyleSheet("font-weight: bold; color: palette(text);") auth_label.setStyleSheet("font-weight: bold;")
auth_layout.addWidget(auth_label) auth_layout.addWidget(auth_label)
service_fallback_layout = QHBoxLayout() source_fallback_layout = QHBoxLayout()
service_label = QLabel('Service:') service_label = QLabel('Source:')
service_label.setStyleSheet("color: palette(text);")
self.service_dropdown = ServiceComboBox() self.service_dropdown = ServiceComboBox()
self.service_dropdown.currentIndexChanged.connect(self.save_service_setting) self.service_dropdown.currentIndexChanged.connect(self.save_service_setting)
service_fallback_layout.addWidget(service_label) saved_service = self.service
service_fallback_layout.addWidget(self.service_dropdown) for i in range(self.service_dropdown.count()):
if self.service_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == saved_service:
self.service_dropdown.setCurrentIndex(i)
break
service_fallback_layout.addSpacing(20) source_fallback_layout.addWidget(service_label)
source_fallback_layout.addWidget(self.service_dropdown)
source_fallback_layout.addSpacing(20)
self.fallback_checkbox = QCheckBox('Fallback') self.fallback_checkbox = QCheckBox('Fallback')
self.fallback_checkbox.setStyleSheet("color: palette(text);")
self.fallback_checkbox.setCursor(Qt.CursorShape.PointingHandCursor) self.fallback_checkbox.setCursor(Qt.CursorShape.PointingHandCursor)
self.fallback_checkbox.setChecked(self.use_fallback) self.fallback_checkbox.setChecked(self.use_fallback)
self.fallback_checkbox.toggled.connect(self.save_fallback_setting) self.fallback_checkbox.toggled.connect(self.save_fallback_setting)
service_fallback_layout.addWidget(self.fallback_checkbox) source_fallback_layout.addWidget(self.fallback_checkbox)
service_fallback_layout.addStretch() source_fallback_layout.addSpacing(20)
auth_layout.addLayout(service_fallback_layout)
timeout_label = QLabel('Timeout:')
self.timeout_label = timeout_label
self.timeout_input = QLineEdit()
self.timeout_input.setText(str(self.timeout_value))
self.timeout_input.setFixedWidth(60)
self.timeout_input.textChanged.connect(self.save_timeout_setting)
source_fallback_layout.addWidget(timeout_label)
source_fallback_layout.addWidget(self.timeout_input)
source_fallback_layout.addStretch()
auth_layout.addLayout(source_fallback_layout)
settings_layout.addWidget(auth_group) settings_layout.addWidget(auth_group)
settings_layout.addStretch() settings_layout.addStretch()
settings_tab.setLayout(settings_layout) settings_tab.setLayout(settings_layout)
self.tab_widget.addTab(settings_tab, "Settings") self.tab_widget.addTab(settings_tab, "Settings")
def update_service_ui_visibility(self):
if hasattr(self, 'fallback_checkbox') and hasattr(self, 'timeout_input') and hasattr(self, 'timeout_label'):
service = self.service_dropdown.currentData() if hasattr(self, 'service_dropdown') else self.service
if service == "qobuz":
self.fallback_checkbox.setVisible(False)
self.timeout_input.setVisible(False)
self.timeout_label.setVisible(False)
else:
self.fallback_checkbox.setVisible(True)
self.timeout_input.setVisible(True)
self.timeout_label.setVisible(True)
def setup_about_tab(self): def setup_about_tab(self):
about_tab = QWidget() about_tab = QWidget()
about_layout = QVBoxLayout() about_layout = QVBoxLayout()
@@ -729,8 +902,8 @@ class SpotiFLACGUI(QWidget):
spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed) spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
about_layout.addItem(spacer) about_layout.addItem(spacer)
footer_label = QLabel("v2.3 | March 2025") footer_label = QLabel("v2.7 | May 2025")
footer_label.setStyleSheet("font-size: 12px; color: palette(text); margin-top: 10px;") footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;")
about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter) about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter)
about_tab.setLayout(about_layout) about_tab.setLayout(about_layout)
@@ -761,12 +934,29 @@ class SpotiFLACGUI(QWidget):
self.settings.sync() self.settings.sync()
self.log_output.append("Fallback setting saved successfully!") self.log_output.append("Fallback setting saved successfully!")
def save_timeout_setting(self):
try:
timeout = int(self.timeout_input.text())
if timeout > 0:
self.timeout_value = timeout
self.settings.setValue('timeout_value', self.timeout_value)
self.settings.sync()
self.log_output.append(f"Timeout setting saved: {self.timeout_value} seconds")
else:
self.timeout_input.setText(str(self.timeout_value))
self.log_output.append("Timeout must be a positive number")
except ValueError:
self.timeout_input.setText(str(self.timeout_value))
self.log_output.append("Timeout must be a valid number")
def save_service_setting(self): def save_service_setting(self):
service = self.service_dropdown.currentData() service = self.service_dropdown.currentData()
self.service = service self.service = service
self.settings.setValue('service', service) self.settings.setValue('service', service)
self.settings.sync() self.settings.sync()
self.log_output.append(f"Service setting saved: {self.service_dropdown.currentText()}") self.log_output.append(f"Service setting saved: {self.service_dropdown.currentText()}")
self.update_service_ui_visibility()
def save_settings(self): def save_settings(self):
self.settings.setValue('output_path', self.output_dir.text().strip()) self.settings.setValue('output_path', self.output_dir.text().strip())
@@ -788,11 +978,20 @@ class SpotiFLACGUI(QWidget):
self.reset_state() self.reset_state()
self.reset_ui() self.reset_ui()
metadata = get_filtered_data(url) self.log_output.append('Just a moment. Fetching metadata...')
if "error" in metadata: self.tab_widget.setCurrentWidget(self.process_tab)
raise Exception(metadata["error"])
self.metadata_worker = MetadataFetchWorker(url)
url_info = parse_uri(url) self.metadata_worker.finished.connect(self.on_metadata_fetched)
self.metadata_worker.error.connect(self.on_metadata_error)
self.metadata_worker.start()
except Exception as e:
self.log_output.append(f'Error: Failed to start metadata fetch: {str(e)}')
def on_metadata_fetched(self, metadata):
try:
url_info = parse_uri(self.spotify_url.text().strip())
if url_info["type"] == "track": if url_info["type"] == "track":
self.handle_track_metadata(metadata["track"]) self.handle_track_metadata(metadata["track"])
@@ -803,11 +1002,11 @@ class SpotiFLACGUI(QWidget):
self.update_button_states() self.update_button_states()
self.tab_widget.setCurrentIndex(0) self.tab_widget.setCurrentIndex(0)
except SpotifyInvalidUrlException as e:
self.log_output.append(f'Error: {str(e)}')
except Exception as e: except Exception as e:
self.log_output.append(f'Error: Failed to fetch metadata: {str(e)}') self.log_output.append(f'Error: {str(e)}')
def on_metadata_error(self, error_message):
self.log_output.append(f'Error: {error_message}')
def handle_track_metadata(self, track_data): def handle_track_metadata(self, track_data):
track_id = track_data["external_urls"].split("/")[-1] track_id = track_data["external_urls"].split("/")[-1]
@@ -921,10 +1120,21 @@ class SpotiFLACGUI(QWidget):
self.followers_label.hide() self.followers_label.hide()
if metadata.get('releaseDate'): if metadata.get('releaseDate'):
release_date = datetime.strptime(metadata['releaseDate'], "%Y-%m-%d") try:
formatted_date = release_date.strftime("%d-%m-%Y") release_date = metadata['releaseDate']
self.release_date_label.setText(f"<b>Released</b> {formatted_date}") if len(release_date) == 4:
self.release_date_label.show() date_obj = datetime.strptime(release_date, "%Y")
elif len(release_date) == 7:
date_obj = datetime.strptime(release_date, "%Y-%m")
else:
date_obj = datetime.strptime(release_date, "%Y-%m-%d")
formatted_date = date_obj.strftime("%d-%m-%Y")
self.release_date_label.setText(f"<b>Released</b> {formatted_date}")
self.release_date_label.show()
except ValueError:
self.release_date_label.setText(f"<b>Released</b> {metadata['releaseDate']}")
self.release_date_label.show()
else: else:
self.release_date_label.hide() self.release_date_label.hide()
@@ -1035,7 +1245,8 @@ class SpotiFLACGUI(QWidget):
self.use_track_numbers, self.use_track_numbers,
self.use_album_subfolders, self.use_album_subfolders,
self.use_fallback, self.use_fallback,
service service,
self.timeout_value
) )
self.worker.finished.connect(self.on_download_finished) self.worker.finished.connect(self.on_download_finished)
self.worker.progress.connect(self.update_progress) self.worker.progress.connect(self.update_progress)
+228
View File
@@ -0,0 +1,228 @@
import requests
from mutagen.flac import FLAC, Picture
from datetime import datetime
import sys
import os
def get_track_info(isrc):
print(f"Search: {isrc}")
url = f"https://us.qobuz.squid.wtf/api/get-music?q={isrc}&offset=0"
response = requests.get(url)
data = response.json()
if not data.get("success"):
raise Exception("Failed to get track info")
tracks = data["data"]["tracks"]["items"]
if not tracks:
print(f"Not Found: {isrc}")
raise Exception(f"No tracks found for ISRC: {isrc}")
track = None
for item in tracks:
if item["isrc"] == isrc:
track = item
break
if not track:
print(f"Not Found: {isrc}")
raise Exception(f"No track with matching ISRC: {isrc}")
print(f"Found: {track['title']} - {track['performer']['name']}")
return track
def search_track(title, artist, strict_match=False):
print(f"Search by title/artist: {title} - {artist}")
search_query = f"{title} {artist}".replace("feat.", "").replace("ft.", "")
url = f"https://us.qobuz.squid.wtf/api/get-music?q={search_query}&offset=0"
response = requests.get(url)
data = response.json()
if not data.get("success"):
raise Exception("Failed to search for track")
tracks = data["data"]["tracks"]["items"]
if not tracks:
print(f"Not Found: {title} - {artist}")
raise Exception(f"No tracks found for: {title} - {artist}")
best_match = None
title_lower = title.lower()
artist_lower = artist.lower()
for item in tracks:
item_title = item["title"].lower()
item_artist = item["performer"]["name"].lower()
if title_lower == item_title and (artist_lower in item_artist or item_artist in artist_lower):
best_match = item
print(f"Found exact title match with artist: {item['title']} - {item['performer']['name']}")
break
if not best_match and not strict_match:
for item in tracks:
item_title = item["title"].lower()
item_artist = item["performer"]["name"].lower()
if title_lower in item_title and (artist_lower in item_artist or item_artist in artist_lower):
best_match = item
print(f"Found partial match: {item['title']} - {item['performer']['name']}")
break
if strict_match and best_match:
item_artist = best_match["performer"]["name"].lower()
if artist_lower not in item_artist and item_artist not in artist_lower:
print(f"Artist mismatch in strict mode: Expected '{artist}', found '{best_match['performer']['name']}'")
best_match = None
if not best_match and not strict_match and tracks:
best_match = tracks[0]
print(f"No good match, using first result: {best_match['title']} - {best_match['performer']['name']}")
if not best_match:
print(f"Not Found: {title} - {artist}")
raise Exception(f"No suitable track found for: {title} - {artist}")
print(f"Found by title search: {best_match['title']} - {best_match['performer']['name']}")
return best_match
def get_download_url(track_id):
url = f"https://us.qobuz.squid.wtf/api/download-music?track_id={track_id}&quality=27"
response = requests.get(url)
data = response.json()
if not data.get("success"):
raise Exception("Failed to get download URL")
return data["data"]["url"]
def download_file(url, filename, progress_callback=None):
directory = os.path.dirname(filename)
if directory and not os.path.exists(directory):
try:
os.makedirs(directory, exist_ok=True)
print(f"Created directory: {directory}")
except Exception as e:
raise Exception(f"Failed to create directory {directory}: {str(e)}")
try:
with open(filename, 'wb') as test_file:
pass
except Exception as e:
raise Exception(f"Cannot write to file {filename}: {str(e)}")
try:
response = requests.get(url, stream=True)
if response.status_code != 200:
raise Exception(f"Failed to download file: {response.status_code}")
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0 and progress_callback:
progress_callback(downloaded, total_size)
elif total_size > 0:
progress = (downloaded / total_size) * 100
sys.stdout.write(f"\rProgress Download: {progress:.1f}%")
sys.stdout.flush()
if total_size > 0:
sys.stdout.write("\n")
if not os.path.exists(filename) or os.path.getsize(filename) == 0:
raise Exception(f"Download failed: File {filename} is empty or does not exist")
return filename
except Exception as e:
if os.path.exists(filename):
try:
os.remove(filename)
print(f"Removed incomplete file: {filename}")
except:
pass
raise Exception(f"Download failed: {str(e)}")
def embed_metadata(filename, track_info):
if not os.path.exists(filename):
raise Exception(f"Cannot embed metadata: File {filename} does not exist")
try:
print("Embedding Tags...")
audio = FLAC(filename)
audio.clear()
audio["TITLE"] = track_info["title"]
audio["ARTIST"] = track_info["performer"]["name"]
audio["ALBUM"] = track_info["album"]["title"]
audio["ALBUMARTIST"] = track_info["album"]["artist"]["name"]
audio["TRACKNUMBER"] = str(track_info["track_number"])
audio["LABEL"] = track_info["album"]["label"]["name"]
audio["GENRE"] = track_info["album"]["genre"]["name"]
release_date = datetime.fromtimestamp(track_info["album"]["released_at"]).strftime("%Y-%m-%d")
release_year = release_date.split("-")[0]
audio["DATE"] = release_date
audio["YEAR"] = release_year
audio["ISRC"] = track_info["isrc"]
audio["COPYRIGHT"] = track_info["copyright"]
if track_info["album"]["image"]["large"]:
try:
cover_data = download_cover_image(track_info["album"]["image"]["large"])
picture = Picture()
picture.type = 3
picture.mime = "image/jpeg"
picture.desc = ""
picture.data = cover_data
audio.add_picture(picture)
except Exception as e:
print(f"Warning: Could not add cover image: {str(e)}")
audio.save()
except Exception as e:
raise Exception(f"Failed to embed metadata: {str(e)}")
def download_cover_image(url):
response = requests.get(url)
if response.status_code != 200:
raise Exception(f"Failed to download cover image: {response.status_code}")
return response.content
def main():
try:
isrc = "USUM72409273"
track_info = get_track_info(isrc)
track_id = track_info["id"]
if track_info["isrc"] != isrc:
raise Exception(f"ISRC mismatch: {track_info['isrc']} != {isrc}")
download_url = get_download_url(track_id)
filename = f"{track_info['title']} - {track_info['performer']['name']}.flac"
filename = filename.replace('/', '_').replace('\\', '_')
download_file(download_url, filename)
embed_metadata(filename, track_info)
print("Downloaded Successfully!")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
+218 -59
View File
@@ -5,7 +5,7 @@ import json
import hmac import hmac
import time import time
import hashlib import hashlib
from typing import Tuple, Callable from typing import Tuple, Callable, Dict, Any, List
_TOTP_SECRET = bytearray([53,53,48,55,49,52,53,56,53,51,52,56,55,52,57,57,53,57,50,50,52,56,54,51,48,51,50,57,51,52,55]) _TOTP_SECRET = bytearray([53,53,48,55,49,52,53,56,53,51,52,56,55,52,57,57,53,57,50,50,52,56,54,51,48,51,50,57,51,52,55])
@@ -100,9 +100,7 @@ def get_json_from_api(api_url, access_token):
return req.json() return req.json()
def get_raw_spotify_data(spotify_url): def get_access_token():
url_info = parse_uri(spotify_url)
try: try:
totp, timestamp = generate_totp() totp, timestamp = generate_totp()
@@ -117,34 +115,116 @@ def get_raw_spotify_data(spotify_url):
req = requests.get(token_url, headers=headers, params=params, timeout=10) req = requests.get(token_url, headers=headers, params=params, timeout=10)
if req.status_code != 200: if req.status_code != 200:
return {"error": f"Failed to get access token. Status code: {req.status_code}"} return {"error": f"Failed to get access token. Status code: {req.status_code}"}
token = req.json() return req.json()
except Exception as e: except Exception as e:
return {"error": f"Failed to get access token: {str(e)}"} return {"error": f"Failed to get access token: {str(e)}"}
def fetch_tracks_in_batches(url: str, access_token: str, batch_size: int = 100, delay: float = 1.0) -> Tuple[List[Dict[str, Any]], int]:
all_tracks = []
current_batch = 0
while url:
print(f"Batch : {current_batch}")
url_parts = url.split("offset=")
if len(url_parts) > 1:
offset_part = url_parts[1].split("&")[0]
print(f"Offset : {offset_part}")
print("-------------")
track_data = get_json_from_api(url, access_token)
if not track_data:
break
items = track_data.get('items', [])
all_tracks.extend(items)
url = track_data.get('next')
if url and "&locale=" in url:
url = url.split("&locale=")[0]
if url and delay > 0:
sleep(delay)
current_batch += 1
return all_tracks, current_batch
def get_raw_spotify_data(spotify_url, batch: bool = False, delay: float = 1.0):
url_info = parse_uri(spotify_url)
token = get_access_token()
if "error" in token:
return token
access_token = token["accessToken"]
raw_data = {} raw_data = {}
if url_info['type'] == "playlist": if url_info['type'] == "playlist":
try: try:
playlist_data = get_json_from_api( playlist_data = get_json_from_api(
playlist_base_url.format(url_info["id"]), playlist_base_url.format(url_info["id"]),
token["accessToken"] access_token
) )
if not playlist_data: if not playlist_data:
return {"error": "Failed to get playlist data"} return {"error": "Failed to get playlist data"}
raw_data = playlist_data raw_data = playlist_data
total_tracks = playlist_data.get('tracks', {}).get('total', 0)
tracks = [] if batch:
tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100' tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100'
while tracks_url: tracks, num_batches = fetch_tracks_in_batches(tracks_url, access_token, 100, delay)
track_data = get_json_from_api(tracks_url, token["accessToken"]) raw_data['tracks']['items'] = tracks
if not track_data: raw_data['_batch_count'] = num_batches
break raw_data['_batch_enabled'] = True
tracks.extend(track_data['items']) if len(tracks) < total_tracks:
tracks_url = track_data.get('next') last_offset = len(tracks)
remaining_tracks = []
while last_offset < total_tracks:
print(f"Batch : {num_batches}")
print(f"Offset : {last_offset}")
print("-------------")
remainder_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?offset={last_offset}&limit=100'
track_data = get_json_from_api(remainder_url, access_token)
if not track_data or not track_data.get('items'):
break
items = track_data.get('items', [])
remaining_tracks.extend(items)
if len(items) < 100:
break
last_offset += len(items)
num_batches += 1
if delay > 0:
sleep(delay)
tracks.extend(remaining_tracks)
raw_data['tracks']['items'] = tracks
raw_data['_batch_count'] = num_batches
else:
tracks = []
tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100'
while tracks_url:
track_data = get_json_from_api(tracks_url, access_token)
if not track_data:
break
tracks.extend(track_data['items'])
tracks_url = track_data.get('next')
if tracks_url and "&locale=" in tracks_url:
tracks_url = tracks_url.split("&locale=")[0]
raw_data['tracks']['items'] = tracks
raw_data['_batch_enabled'] = False
raw_data['tracks']['items'] = tracks
except Exception as e: except Exception as e:
return {"error": f"Failed to get playlist data: {str(e)}"} return {"error": f"Failed to get playlist data: {str(e)}"}
@@ -152,25 +232,68 @@ def get_raw_spotify_data(spotify_url):
try: try:
album_data = get_json_from_api( album_data = get_json_from_api(
album_base_url.format(url_info["id"]), album_base_url.format(url_info["id"]),
token["accessToken"] access_token
) )
if not album_data: if not album_data:
return {"error": "Failed to get album data"} return {"error": "Failed to get album data"}
album_data['_token'] = token["accessToken"] album_data['_token'] = access_token
raw_data = album_data raw_data = album_data
total_tracks = album_data.get('total_tracks', 0)
tracks = [] if batch:
tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50' tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50'
while tracks_url: tracks, num_batches = fetch_tracks_in_batches(tracks_url, access_token, 50, delay)
track_data = get_json_from_api(tracks_url, token["accessToken"]) raw_data['tracks']['items'] = tracks
if not track_data: raw_data['_batch_count'] = num_batches
break raw_data['_batch_enabled'] = True
tracks.extend(track_data['items']) if len(tracks) < total_tracks:
tracks_url = track_data.get('next') last_offset = len(tracks)
remaining_tracks = []
while last_offset < total_tracks:
print(f"Batch : {num_batches}")
print(f"Offset : {last_offset}")
print("-------------")
remainder_url = f'{album_base_url.format(url_info["id"])}/tracks?offset={last_offset}&limit=50'
track_data = get_json_from_api(remainder_url, access_token)
if not track_data or not track_data.get('items'):
break
items = track_data.get('items', [])
remaining_tracks.extend(items)
if len(items) < 50:
break
last_offset += len(items)
num_batches += 1
if delay > 0:
sleep(delay)
tracks.extend(remaining_tracks)
raw_data['tracks']['items'] = tracks
raw_data['_batch_count'] = num_batches
else:
tracks = []
tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50'
while tracks_url:
track_data = get_json_from_api(tracks_url, access_token)
if not track_data:
break
tracks.extend(track_data['items'])
tracks_url = track_data.get('next')
if tracks_url and "&locale=" in tracks_url:
tracks_url = tracks_url.split("&locale=")[0]
raw_data['tracks']['items'] = tracks
raw_data['_batch_enabled'] = False
raw_data['tracks']['items'] = tracks
except Exception as e: except Exception as e:
return {"error": f"Failed to get album data: {str(e)}"} return {"error": f"Failed to get album data: {str(e)}"}
@@ -178,7 +301,7 @@ def get_raw_spotify_data(spotify_url):
try: try:
track_data = get_json_from_api( track_data = get_json_from_api(
track_base_url.format(url_info["id"]), track_base_url.format(url_info["id"]),
token["accessToken"] access_token
) )
if not track_data: if not track_data:
return {"error": "Failed to get track data"} return {"error": "Failed to get track data"}
@@ -191,10 +314,12 @@ def get_raw_spotify_data(spotify_url):
def format_track_data(track_data): def format_track_data(track_data):
artists = [] artists = []
for artist in track_data['artists']: for artist in track_data.get('artists', []):
artists.append(artist['name']) artists.append(artist['name'])
image_url = track_data.get('album', {}).get('images', [{}])[0].get('url', '') image_url = track_data.get('album', {}).get('images', [{}])[0].get('url', '') if track_data.get('album', {}).get('images') else ''
isrc = track_data.get('external_ids', {}).get('isrc', '')
return { return {
"track": { "track": {
@@ -205,22 +330,37 @@ def format_track_data(track_data):
"images": image_url, "images": image_url,
"release_date": track_data.get('album', {}).get('release_date', ''), "release_date": track_data.get('album', {}).get('release_date', ''),
"track_number": track_data.get('track_number', 0), "track_number": track_data.get('track_number', 0),
"external_urls": track_data.get('external_urls', {}).get('spotify', '') "external_urls": track_data.get('external_urls', {}).get('spotify', ''),
"isrc": isrc
} }
} }
def format_album_data(album_data): def format_album_data(album_data):
artists = [] artists = []
for artist in album_data['artists']: for artist in album_data.get('artists', []):
artists.append(artist['name']) artists.append(artist['name'])
image_url = album_data.get('images', [{}])[0].get('url', '') image_url = album_data.get('images', [{}])[0].get('url', '') if album_data.get('images') else ''
track_list = [] track_list = []
for track in album_data.get('tracks', {}).get('items', []): for track in album_data.get('tracks', {}).get('items', []):
track_artists = [] track_artists = []
for artist in track.get('artists', []): for artist in track.get('artists', []):
track_artists.append(artist['name']) track_artists.append(artist['name'])
track_id = track.get('id', '')
track_isrc = ''
if track_id and album_data.get('_token'):
try:
full_track_data = get_json_from_api(
track_base_url.format(track_id),
album_data.get('_token')
)
if full_track_data:
track_isrc = full_track_data.get('external_ids', {}).get('isrc', '')
except:
pass
track_list.append({ track_list.append({
"artists": ", ".join(track_artists), "artists": ", ".join(track_artists),
@@ -230,31 +370,44 @@ def format_album_data(album_data):
"images": image_url, "images": image_url,
"release_date": album_data.get('release_date', ''), "release_date": album_data.get('release_date', ''),
"track_number": track.get('track_number', 0), "track_number": track.get('track_number', 0),
"external_urls": track.get('external_urls', {}).get('spotify', '') "external_urls": track.get('external_urls', {}).get('spotify', ''),
"isrc": track_isrc
}) })
album_info = {
"total_tracks": album_data.get('total_tracks', 0),
"name": album_data.get('name', ''),
"release_date": album_data.get('release_date', ''),
"artists": ", ".join(artists),
"images": image_url
}
if album_data.get('_batch_enabled', False):
album_info["batch"] = f"{album_data.get('_batch_count', 1)}"
return { return {
"album_info": { "album_info": album_info,
"total_tracks": album_data.get('total_tracks', 0),
"name": album_data.get('name', ''),
"release_date": album_data.get('release_date', ''),
"artists": ", ".join(artists),
"images": image_url
},
"track_list": track_list "track_list": track_list
} }
def format_playlist_data(playlist_data): def format_playlist_data(playlist_data):
image_url = playlist_data.get('images', [{}])[0].get('url', '') image_url = playlist_data.get('images', [{}])[0].get('url', '') if playlist_data.get('images') else ''
track_list = [] track_list = []
for item in playlist_data.get('tracks', {}).get('items', []): for item in playlist_data.get('tracks', {}).get('items', []):
track = item.get('track', {}) track = item.get('track', {})
if not track:
continue
artists = [] artists = []
for artist in track.get('artists', []): for artist in track.get('artists', []):
artists.append(artist['name']) artists.append(artist['name'])
track_image = track.get('album', {}).get('images', [{}])[0].get('url', '') track_image = ''
if track.get('album', {}).get('images'):
track_image = track.get('album', {}).get('images', [{}])[0].get('url', '')
track_isrc = track.get('external_ids', {}).get('isrc', '')
track_list.append({ track_list.append({
"artists": ", ".join(artists), "artists": ", ".join(artists),
@@ -264,19 +417,25 @@ def format_playlist_data(playlist_data):
"images": track_image, "images": track_image,
"release_date": track.get('album', {}).get('release_date', ''), "release_date": track.get('album', {}).get('release_date', ''),
"track_number": track.get('track_number', 0), "track_number": track.get('track_number', 0),
"external_urls": track.get('external_urls', {}).get('spotify', '') "external_urls": track.get('external_urls', {}).get('spotify', ''),
"isrc": track_isrc
}) })
playlist_info = {
"tracks": {"total": playlist_data.get('tracks', {}).get('total', 0)},
"followers": {"total": playlist_data.get('followers', {}).get('total', 0)},
"owner": {
"display_name": playlist_data.get('owner', {}).get('display_name', ''),
"name": playlist_data.get('name', ''),
"images": image_url
}
}
if playlist_data.get('_batch_enabled', False):
playlist_info["batch"] = f"{playlist_data.get('_batch_count', 1)}"
return { return {
"playlist_info": { "playlist_info": playlist_info,
"tracks": {"total": playlist_data.get('tracks', {}).get('total', 0)},
"followers": {"total": playlist_data.get('followers', {}).get('total', 0)},
"owner": {
"display_name": playlist_data.get('owner', {}).get('display_name', ''),
"name": playlist_data.get('name', ''),
"images": image_url
}
},
"track_list": track_list "track_list": track_list
} }
@@ -296,8 +455,8 @@ def process_spotify_data(raw_data, data_type):
except Exception as e: except Exception as e:
return {"error": f"Error processing data: {str(e)}"} return {"error": f"Error processing data: {str(e)}"}
def get_filtered_data(spotify_url): def get_filtered_data(spotify_url, batch=False, delay=1.0):
raw_data = get_raw_spotify_data(spotify_url) raw_data = get_raw_spotify_data(spotify_url, batch=batch, delay=delay)
if raw_data and "error" not in raw_data: if raw_data and "error" not in raw_data:
url_info = parse_uri(spotify_url) url_info = parse_uri(spotify_url)
filtered_data = process_spotify_data(raw_data, url_info['type']) filtered_data = process_spotify_data(raw_data, url_info['type'])
@@ -306,10 +465,10 @@ def get_filtered_data(spotify_url):
if __name__ == '__main__': if __name__ == '__main__':
playlist = "https://open.spotify.com/playlist/37i9dQZEVXbNG2KDcFcKOF" playlist = "https://open.spotify.com/playlist/37i9dQZEVXbNG2KDcFcKOF"
album = "https://open.spotify.com/album/7kFyd5oyJdVX2pIi6P4iHE" album = "https://open.spotify.com/album/6J84szYCnMfzEcvIcfWMFL"
song = "https://open.spotify.com/track/4wJ5Qq0jBN4ajy7ouZIV1c" song = "https://open.spotify.com/track/7so0lgd0zP2Sbgs2d7a1SZ"
filtered_playlist = get_filtered_data(playlist) filtered_playlist = get_filtered_data(playlist, batch=True, delay=0.1)
print(json.dumps(filtered_playlist, indent=2)) print(json.dumps(filtered_playlist, indent=2))
filtered_album = get_filtered_data(album) filtered_album = get_filtered_data(album)
+4 -3
View File
@@ -6,13 +6,14 @@ import re
import base64 import base64
class TrackDownloader: class TrackDownloader:
def __init__(self, use_fallback=False): def __init__(self, use_fallback=False, timeout=30):
self.client = requests.Session() self.client = requests.Session()
self.headers = { self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
} }
self.progress_callback = None self.progress_callback = None
self.use_fallback = use_fallback self.use_fallback = use_fallback
self.timeout = timeout
self.base_domain = "lucida.su" if use_fallback else "lucida.to" self.base_domain = "lucida.su" if use_fallback else "lucida.to"
def set_progress_callback(self, callback): def set_progress_callback(self, callback):
@@ -73,7 +74,7 @@ class TrackDownloader:
base_url, base_url,
params=request_params, params=request_params,
headers=headers, headers=headers,
timeout=30 timeout=self.timeout
) )
html_content = response.text html_content = response.text
@@ -294,7 +295,7 @@ async def main():
output_dir = "." output_dir = "."
track_id = "2plbrEY59IikOBgBGLjaoe" track_id = "2plbrEY59IikOBgBGLjaoe"
service = "amazon" service = "tidal"
def progress_update(current, total): def progress_update(current, total):
if total > 0: if total > 0:
+1 -1
View File
@@ -1,3 +1,3 @@
{ {
"version": "2.2" "version": "2.6"
} }