Compare commits

...

42 Commits

Author SHA1 Message Date
afkarxyz 2306b1f8d2 v3.3 2025-06-21 05:03:32 +07:00
afkarxyz 1b0d67702d v3.2 2025-06-11 05:11:58 +07:00
afkarxyz 00e369677f v3.2 2025-06-11 05:07:20 +07:00
afkarxyz c3e1607ca6 v3.1 2025-06-02 13:13:10 +07:00
afkarxyz 59428e7679 v3.1 2025-06-02 13:09:18 +07:00
afkarxyz 33c4698286 v3.0 2025-05-31 19:37:36 +07:00
afkarxyz 3ac4c34d73 v3.0 2025-05-31 19:33:16 +07:00
afkarxyz 88e303cbe4 v2.9 2025-05-30 22:28:24 +07:00
afkarxyz c13855fadd v2.9 2025-05-30 22:22:14 +07:00
afkarxyz 2b12684960 v2.8 2025-05-23 16:47:38 +07:00
afkarxyz 4bc164cc56 v2.8 2025-05-23 16:43:45 +07:00
afkarxyz 46cb65665e Update README.md 2025-05-19 10:33:04 +07:00
afkarxyz 276b3b4951 Update README.md 2025-05-13 20:13:37 +07:00
afkarxyz e15aadbd61 Update README.md 2025-05-13 20:11:35 +07:00
afkarxyz d7639bae8f v2.7 2025-05-13 20:11:32 +07:00
afkarxyz 1af7ab65c9 v2.7 2025-05-13 20:07:19 +07:00
afkarxyz c5240596cb Revert 2025-05-13 12:06:55 +07:00
afkarxyz c4a9042adc v2.9 2025-05-12 00:08:55 +07:00
afkarxyz 45ac08ecbd v2.9 2025-05-12 00:05:34 +07:00
afkarxyz 0add305d9c v2.8 2025-05-11 18:34:58 +07:00
afkarxyz 9b6b43c0a4 v2.8 2025-05-11 18:31:28 +07:00
afkarxyz 60d20cbebe Revert 2025-05-11 17:03:23 +07:00
afkarxyz 626d58667e v2.8 2025-05-11 16:32:26 +07:00
afkarxyz 4dd1a7ea12 v2.8 2025-05-11 15:58:46 +07:00
afkarxyz 67964e4acb Update README.md 2025-05-11 04:40:22 +07:00
afkarxyz 1486fb13df v2.7 2025-05-10 20:33:17 +07:00
afkarxyz 966536f127 v2.7 2025-05-10 20:13:01 +07:00
afkarxyz 21946321f5 Update README.md 2025-05-06 13:25:27 +07:00
afkarxyz 3e3cb0610d Update README.md 2025-05-06 12:45:25 +07:00
afkarxyz 160eba0987 Update README.md 2025-05-06 12:43:29 +07:00
afkarxyz 71a60ded47 Update README.md 2025-05-06 10:42:11 +07:00
afkarxyz e0a0514df9 v2.6 2025-05-06 10:41:11 +07:00
afkarxyz 1e7a48d263 v2.6 2025-05-06 10:38:01 +07:00
afkarxyz 0a83a0dd6e Update README.md 2025-05-06 10:35:17 +07:00
afkarxyz da429d9410 v2.5 2025-04-25 06:33:32 +07:00
afkarxyz 63211c726b v2.5 2025-04-25 06:30:14 +07:00
afkarxyz 055cb6991a Update v2.4 2025-04-08 13:10:12 +07:00
afkarxyz 222d681551 Update v2.4 2025-04-08 13:07:26 +07:00
afkarxyz 479c6ede2b Update v2.3 2025-03-20 05:47:20 +07:00
afkarxyz ceb727adb9 Update v2.3 2025-03-20 05:41:28 +07:00
afkarxyz bbea8ca493 Update README.md 2025-03-20 05:36:38 +07:00
afkarxyz f567dd19bf Update v2.2 2025-03-18 08:01:11 +07:00
5 changed files with 1537 additions and 227 deletions
+13 -9
View File
@@ -1,27 +1,31 @@
[![GitHub All Releases](https://img.shields.io/github/downloads/afkarxyz/SpotiFLAC/total?style=for-the-badge)](https://github.com/afkarxyz/SpotiFLAC/releases) [![GitHub All Releases](https://img.shields.io/github/downloads/afkarxyz/SpotiFLAC/total?style=for-the-badge)](https://github.com/afkarxyz/SpotiFLAC/releases)
![spotiflac](https://github.com/user-attachments/assets/a233a276-14a4-4f4c-b267-f182dd3912a0) ![SpotiFLAC](https://github.com/user-attachments/assets/b4c4f403-edbd-4a71-b74b-c7d433d47d06)
<div align="center"> <div align="center">
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music and Deezer with the help of Lucida. <b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music, and Deezer <code>(via Lucida)</code>, as well as Qobuz <code>(via SquidWTF)</code>.
</div> </div>
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v2.2/SpotiFLAC.exe) ### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v3.2/SpotiFLAC.exe)
# #
> [!WARNING] > [!Note]
Sometimes, the **download speed** from Lucida can be fast or slow; it varies unpredictably. **Download speed** from Lucida is unpredictable—sometimes fast, sometimes slow. Join their [Discord](https://discord.com/invite/dXEGRWqEbS) for updates.
## Screenshots ## Screenshots
![image](https://github.com/user-attachments/assets/7fa82a25-0fe8-4b87-ba5c-2dd5933f211b) ![image](https://github.com/user-attachments/assets/70a5dceb-3374-4255-8f6a-4afb5ee534b0)
![image](https://github.com/user-attachments/assets/81e65977-11f0-4162-96f3-90730dd87e74) ![image](https://github.com/user-attachments/assets/9f0d6aa5-456b-4a90-b48a-7e0c22819ebd)
![image](https://github.com/user-attachments/assets/4dd37c0a-30e3-479a-9b3d-57fd360d87b3) ![image](https://github.com/user-attachments/assets/be9a79b5-260e-4948-9f72-10c735210ab7)
![image](https://github.com/user-attachments/assets/04954db9-e94a-4f9d-8eac-46d7ff7a4c33) ![image](https://github.com/user-attachments/assets/c4403934-9003-447e-a27b-fc74cab23454)
![image](https://github.com/user-attachments/assets/1feec621-f8bf-4b2a-ae73-afcb1fb1deba)
![image](https://github.com/user-attachments/assets/c64b9a08-c99a-4d3a-ae8b-5f834623915b)
> When **Fallback** is enabled, it will use the backup server `Lucida.su` > When **Fallback** is enabled, it will use the backup server `Lucida.su`
+475 -80
View File
@@ -10,14 +10,14 @@ from PyQt6.QtWidgets import (
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit, QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit,
QLabel, QFileDialog, QListWidget, QTextEdit, QTabWidget, QButtonGroup, QRadioButton, QLabel, QFileDialog, QListWidget, QTextEdit, QTabWidget, QButtonGroup, QRadioButton,
QAbstractItemView, QSpacerItem, QSizePolicy, QProgressBar, QCheckBox, QDialog, QAbstractItemView, QSpacerItem, QSizePolicy, QProgressBar, QCheckBox, QDialog,
QDialogButtonBox, QComboBox, QStyledItemDelegate, QStyle QDialogButtonBox, QComboBox, QStyledItemDelegate
) )
from PyQt6.QtCore import Qt, QThread, pyqtSignal, QUrl, QTimer, QTime, QSettings, QSize from PyQt6.QtCore import Qt, QThread, pyqtSignal, QUrl, QTimer, QTime, QSettings, QSize
from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush, QPalette from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush
from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException
from getTracks import TrackDownloader from getTracks import LucidaDownloader, SquidWTFDownloader, TidalDownloader
@dataclass @dataclass
class Track: class Track:
@@ -28,6 +28,27 @@ class Track:
track_number: int track_number: int
duration_ms: int duration_ms: int
id: str id: str
isrc: str = ""
class MetadataFetchWorker(QThread):
finished = pyqtSignal(dict)
error = pyqtSignal(str)
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
try:
metadata = get_filtered_data(self.url)
if "error" in metadata:
self.error.emit(metadata["error"])
else:
self.finished.emit(metadata)
except SpotifyInvalidUrlException as e:
self.error.emit(str(e))
except Exception as e:
self.error.emit(f'Failed to fetch metadata: {str(e)}')
class DownloadWorker(QThread): class DownloadWorker(QThread):
finished = pyqtSignal(bool, str, list) finished = pyqtSignal(bool, str, list)
@@ -35,7 +56,7 @@ class DownloadWorker(QThread):
def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False, def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False,
album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True, album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True,
use_album_subfolders=False, use_fallback=False, service="amazon"): use_album_subfolders=False, use_fallback=False, service="amazon", timeout=30, qobuz_region="us"):
super().__init__() super().__init__()
self.tracks = tracks self.tracks = tracks
self.outpath = outpath self.outpath = outpath
@@ -48,6 +69,8 @@ class DownloadWorker(QThread):
self.use_album_subfolders = use_album_subfolders self.use_album_subfolders = use_album_subfolders
self.use_fallback = use_fallback self.use_fallback = use_fallback
self.service = service self.service = service
self.timeout = timeout
self.qobuz_region = qobuz_region
self.is_paused = False self.is_paused = False
self.is_stopped = False self.is_stopped = False
self.failed_tracks = [] self.failed_tracks = []
@@ -61,18 +84,24 @@ class DownloadWorker(QThread):
def run(self): def run(self):
try: try:
downloader = TrackDownloader(self.use_fallback) if self.service == "qobuz":
downloader = SquidWTFDownloader(self.qobuz_region, self.timeout)
elif self.service == "tidal_api":
downloader = TidalDownloader(timeout=self.timeout)
else:
downloader = LucidaDownloader(self.use_fallback, self.timeout)
def progress_update(current, total): def progress_update(current, total):
if total > 0: if total > 0:
percent = (current / total) * 100 percent = (current / total) * 100
self.progress.emit(f"Download progress: {percent:.2f}% ({current}/{total})", current_mb = current / (1024 * 1024)
total_mb = total / (1024 * 1024)
self.progress.emit(f"Download progress: {percent:.2f}% ({current_mb:.2f}MB/{total_mb:.2f}MB)",
int(percent)) int(percent))
else: else:
self.progress.emit(f"Processing metadata...", 0) self.progress.emit(f"Processing metadata...", 0)
downloader.set_progress_callback(progress_update) downloader.set_progress_callback(progress_update)
downloader.set_filename_format(self.filename_format)
total_tracks = len(self.tracks) total_tracks = len(self.tracks)
@@ -88,9 +117,6 @@ class DownloadWorker(QThread):
int((i) / total_tracks * 100)) int((i) / total_tracks * 100))
try: try:
track_id = track.id
self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0)
if self.is_playlist and self.use_album_subfolders: if self.is_playlist and self.use_album_subfolders:
album_folder = re.sub(r'[<>:"/\\|?*]', '_', track.album) album_folder = re.sub(r'[<>:"/\\|?*]', '_', track.album)
track_outpath = os.path.join(self.outpath, album_folder) track_outpath = os.path.join(self.outpath, album_folder)
@@ -98,12 +124,6 @@ class DownloadWorker(QThread):
else: else:
track_outpath = self.outpath track_outpath = self.outpath
import asyncio
metadata = asyncio.run(downloader.get_track_info(track_id, self.service))
self.progress.emit(f"Track info received, starting download process", 0)
downloaded_file = downloader.download(metadata, track_outpath)
if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers: if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers:
new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}" new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}"
else: else:
@@ -112,6 +132,101 @@ class DownloadWorker(QThread):
new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename) new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename)
new_filepath = os.path.join(track_outpath, new_filename) new_filepath = os.path.join(track_outpath, new_filename)
if os.path.exists(new_filepath) and os.path.getsize(new_filepath) > 0:
self.progress.emit(f"File already exists: {new_filename}. Skipping download.", 0)
self.progress.emit(f"Skipped: {track.title} - {track.artists}",
int((i + 1) / total_tracks * 100))
continue
if self.service == "qobuz":
if not track.isrc:
self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0)
self.failed_tracks.append((track.title, track.artists, "No ISRC available"))
continue
self.progress.emit(f"Getting track from Qobuz with ISRC: {track.isrc}", 0)
is_paused_callback = lambda: self.is_paused
is_stopped_callback = lambda: self.is_stopped
downloaded_file = downloader.download(
track.isrc,
track_outpath,
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
)
elif self.service == "tidal_api":
if not track.isrc:
self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0)
self.failed_tracks.append((track.title, track.artists, "No ISRC available"))
continue
self.progress.emit(f"Searching and downloading from Tidal (API) for ISRC: {track.isrc} - {track.title} - {track.artists}", 0)
import asyncio
is_paused_callback = lambda: self.is_paused
is_stopped_callback = lambda: self.is_stopped
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
download_result_details = loop.run_until_complete(downloader.download(
query=f"{track.title} {track.artists}",
isrc=track.isrc,
output_dir=track_outpath,
quality="LOSSLESS",
embed_metadata=True,
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
))
if isinstance(download_result_details, str) and os.path.exists(download_result_details):
downloaded_file = download_result_details
elif isinstance(download_result_details, dict) and download_result_details.get("success") == False and download_result_details.get("error") == "Download stopped by user":
self.progress.emit(f"Download stopped by user for: {track.title}",0)
return
elif isinstance(download_result_details, dict) and download_result_details.get("success") == False:
raise Exception(download_result_details.get("error", "Tidal API download failed"))
elif isinstance(download_result_details, dict) and download_result_details.get("status") == "all_skipped" or download_result_details.get("status") == "skipped_exists":
self.progress.emit(f"File already exists or skipped: {new_filename}",0)
downloaded_file = new_filepath
else:
downloaded_file = None
raise Exception(f"Tidal API download failed or returned unexpected result: {download_result_details}")
else:
track_id = track.id
self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0)
metadata = downloader.get_track_info(track_id, self.service)
self.progress.emit(f"Track info received, starting download process", 0)
is_paused_callback = lambda: self.is_paused
is_stopped_callback = lambda: self.is_stopped
downloaded_file = downloader.download(
metadata,
track_outpath,
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
)
if self.is_stopped:
return
if downloaded_file == new_filepath:
self.progress.emit(f"File already exists: {new_filename}", 0)
self.progress.emit(f"Skipped: {track.title} - {track.artists}",
int((i + 1) / total_tracks * 100))
continue
if os.path.exists(downloaded_file) and downloaded_file != new_filepath: if os.path.exists(downloaded_file) and downloaded_file != new_filepath:
if os.path.exists(new_filepath): if os.path.exists(new_filepath):
os.remove(new_filepath) os.remove(new_filepath)
@@ -189,21 +304,48 @@ class ServiceStatusChecker(QThread):
def run(self): def run(self):
try: try:
response = requests.get("https://lucida.to/api/stats", timeout=5) response = requests.get("https://lucida.to/api/stats", timeout=5)
services_status = {}
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
services_status = {}
current_services = data.get('all', {}).get('downloads', {}).get('current', {}).get('services', {}) current_services = data.get('all', {}).get('downloads', {}).get('current', {}).get('services', {})
services_status['amazon'] = current_services.get('amazon', 0) > 0 services_status['amazon'] = current_services.get('amazon', 0) > 0
services_status['tidal'] = current_services.get('tidal', 0) > 0 services_status['tidal'] = current_services.get('tidal', 0) > 0
services_status['deezer'] = current_services.get('deezer', 0) > 0 services_status['deezer'] = current_services.get('deezer', 0) > 0
self.status_updated.emit(services_status)
else: else:
self.error.emit(f"Server returned status code: {response.status_code}") self.error.emit(f"Lucida API error: {response.status_code}")
self.status_updated.emit(services_status)
except Exception as e: except Exception as e:
self.error.emit(f"Error checking service status: {str(e)}") self.error.emit(f"Error checking Lucida service status: {str(e)}")
class TidalStatusChecker(QThread):
status_updated = pyqtSignal(bool)
error = pyqtSignal(str)
def run(self):
try:
response = requests.get("https://hifi.401658.xyz", timeout=5)
is_online = response.status_code == 200
self.status_updated.emit(is_online)
except Exception as e:
self.error.emit(f"Error checking Tidal (API) status: {str(e)}")
self.status_updated.emit(False)
class QobuzStatusChecker(QThread):
status_updated = pyqtSignal(bool)
error = pyqtSignal(str)
def __init__(self, region="us"):
super().__init__()
self.region = region
def run(self):
try:
response = requests.get(f"https://{self.region}.qobuz.squid.wtf", timeout=5)
self.status_updated.emit(response.status_code == 200)
except Exception as e:
self.error.emit(f"Error checking Qobuz status: {str(e)}")
self.status_updated.emit(False)
class StatusIndicatorDelegate(QStyledItemDelegate): class StatusIndicatorDelegate(QStyledItemDelegate):
def paint(self, painter, option, index): def paint(self, painter, option, index):
@@ -212,11 +354,6 @@ class StatusIndicatorDelegate(QStyledItemDelegate):
super().paint(painter, option, index) super().paint(painter, option, index)
if option.state & QStyle.StateFlag.State_Selected:
text_color = option.palette.color(QPalette.ColorGroup.Active, QPalette.ColorRole.HighlightedText)
else:
text_color = option.palette.color(QPalette.ColorGroup.Active, QPalette.ColorRole.Text)
indicator_color = Qt.GlobalColor.green if is_online else Qt.GlobalColor.red indicator_color = Qt.GlobalColor.green if is_online else Qt.GlobalColor.red
circle_size = 6 circle_size = 6
@@ -241,12 +378,21 @@ class ServiceComboBox(QComboBox):
self.status_checker = ServiceStatusChecker() self.status_checker = ServiceStatusChecker()
self.status_checker.status_updated.connect(self.update_service_status) self.status_checker.status_updated.connect(self.update_service_status)
self.status_checker.error.connect(lambda e: print(f"Status check error: {e}")) self.status_checker.error.connect(lambda e: print(f"General status check error: {e}"))
self.status_checker.start() self.status_checker.start()
self.status_timer = QTimer(self) self.status_timer = QTimer(self)
self.status_timer.timeout.connect(self.refresh_status) self.status_timer.timeout.connect(self.refresh_status)
self.status_timer.start(5000) self.status_timer.start(5000)
self.tidal_api_status_checker = TidalStatusChecker()
self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status)
self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}"))
self.tidal_api_status_checker.start()
self.tidal_api_status_timer = QTimer(self)
self.tidal_api_status_timer.timeout.connect(self.refresh_tidal_api_status)
self.tidal_api_status_timer.start(6000)
def setup_items(self): def setup_items(self):
current_dir = os.path.dirname(os.path.abspath(__file__)) current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -254,7 +400,9 @@ class ServiceComboBox(QComboBox):
self.services = [ self.services = [
{'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False}, {'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False},
{'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False}, {'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False},
{'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False} {'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False},
{'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False},
{'id': 'tidal_api', 'name': 'Tidal (API)', 'icon': 'tidal.png', 'online': False}
] ]
for service in self.services: for service in self.services:
@@ -275,12 +423,12 @@ class ServiceComboBox(QComboBox):
pixmap.save(path) pixmap.save(path)
def update_service_status(self, status_dict): def update_service_status(self, status_dict):
self.services_status = status_dict self.services_status.update(status_dict)
for i in range(self.count()): for i in range(self.count()):
service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1) service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if service_id in self.services_status: if service_id in self.services_status:
service_data = self.itemData(i, Qt.ItemDataRole.UserRole) service_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(service_data, dict): if isinstance(service_data, dict):
service_data['online'] = self.services_status[service_id] service_data['online'] = self.services_status[service_id]
@@ -291,8 +439,110 @@ class ServiceComboBox(QComboBox):
def refresh_status(self): def refresh_status(self):
self.status_checker = ServiceStatusChecker() self.status_checker = ServiceStatusChecker()
self.status_checker.status_updated.connect(self.update_service_status) self.status_checker.status_updated.connect(self.update_service_status)
self.status_checker.error.connect(lambda e: print(f"Status check error: {e}")) self.status_checker.error.connect(lambda e: print(f"General status check error: {e}"))
self.status_checker.start() self.status_checker.start()
def update_tidal_api_service_status(self, is_online):
for i in range(self.count()):
service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if service_id == 'tidal_api':
service_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(service_data, dict):
service_data['online'] = is_online
self.setItemData(i, service_data, Qt.ItemDataRole.UserRole)
break
self.update()
def refresh_tidal_api_status(self):
self.tidal_api_status_checker = TidalStatusChecker()
self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status)
self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}"))
self.tidal_api_status_checker.start()
def currentData(self, role=Qt.ItemDataRole.UserRole + 1):
return super().currentData(role)
def update_qobuz_status(self, region_id, is_online):
for i in range(self.count()):
service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if service_id == 'qobuz':
service_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(service_data, dict):
if is_online or service_data.get('online', False):
service_data['online'] = True
self.setItemData(i, service_data, Qt.ItemDataRole.UserRole)
break
self.update()
class QobuzRegionComboBox(QComboBox):
status_updated = pyqtSignal(str, bool)
def __init__(self, parent=None):
super().__init__(parent)
self.setIconSize(QSize(16, 16))
self.setItemDelegate(StatusIndicatorDelegate())
self.setup_items()
self.status_checkers = {}
self.check_status()
self.status_timer = QTimer(self)
self.status_timer.timeout.connect(self.check_status)
self.status_timer.start(10000)
def setup_items(self):
current_dir = os.path.dirname(os.path.abspath(__file__))
self.regions = [
{'id': 'eu', 'name': 'Europe', 'icon': 'eu.svg', 'online': False},
{'id': 'us', 'name': 'North America', 'icon': 'us.svg', 'online': False}
]
for region in self.regions:
icon_path = os.path.join(current_dir, region['icon'])
if not os.path.exists(icon_path):
self.create_placeholder_icon(icon_path)
icon = QIcon(icon_path)
self.addItem(icon, region['name'])
item_index = self.count() - 1
self.setItemData(item_index, region['id'], Qt.ItemDataRole.UserRole + 1)
self.setItemData(item_index, region, Qt.ItemDataRole.UserRole)
def create_placeholder_icon(self, path):
pixmap = QPixmap(16, 16)
pixmap.fill(Qt.GlobalColor.transparent)
pixmap.save(path)
def update_region_status(self, region_id, is_online):
for i in range(self.count()):
current_region_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if current_region_id == region_id:
region_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(region_data, dict):
region_data['online'] = is_online
self.setItemData(i, region_data, Qt.ItemDataRole.UserRole)
break
self.update()
def check_status(self):
for region in self.regions:
region_id = region['id']
checker = QobuzStatusChecker(region_id)
checker.status_updated.connect(lambda status, rid=region_id: self.handle_status_update(rid, status))
checker.start()
self.status_checkers[region_id] = checker
def handle_status_update(self, region_id, is_online):
self.update_region_status(region_id, is_online)
self.status_updated.emit(region_id, is_online)
def currentData(self, role=Qt.ItemDataRole.UserRole + 1): def currentData(self, role=Qt.ItemDataRole.UserRole + 1):
return super().currentData(role) return super().currentData(role)
@@ -300,7 +550,7 @@ class ServiceComboBox(QComboBox):
class SpotiFLACGUI(QWidget): class SpotiFLACGUI(QWidget):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.current_version = "2.2" self.current_version = "3.3"
self.tracks = [] self.tracks = []
self.reset_state() self.reset_state()
@@ -313,6 +563,8 @@ class SpotiFLACGUI(QWidget):
self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool) self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool)
self.use_fallback = self.settings.value('use_fallback', False, type=bool) self.use_fallback = self.settings.value('use_fallback', False, type=bool)
self.service = self.settings.value('service', 'amazon') self.service = self.settings.value('service', 'amazon')
self.qobuz_region = self.settings.value('qobuz_region', 'us')
self.timeout_value = self.settings.value('timeout_value', 30, type=int)
self.check_for_updates = self.settings.value('check_for_updates', True, type=bool) self.check_for_updates = self.settings.value('check_for_updates', True, type=bool)
self.elapsed_time = QTime(0, 0, 0) self.elapsed_time = QTime(0, 0, 0)
@@ -556,7 +808,7 @@ class SpotiFLACGUI(QWidget):
output_layout.setSpacing(5) output_layout.setSpacing(5)
output_label = QLabel('Output Directory') output_label = QLabel('Output Directory')
output_label.setStyleSheet("font-weight: bold; color: palette(text);") output_label.setStyleSheet("font-weight: bold;")
output_layout.addWidget(output_label) output_layout.addWidget(output_label)
output_dir_layout = QHBoxLayout() output_dir_layout = QHBoxLayout()
@@ -570,6 +822,7 @@ class SpotiFLACGUI(QWidget):
output_dir_layout.addWidget(self.output_dir) output_dir_layout.addWidget(self.output_dir)
output_dir_layout.addWidget(self.output_browse) output_dir_layout.addWidget(self.output_browse)
output_layout.addLayout(output_dir_layout) output_layout.addLayout(output_dir_layout)
settings_layout.addWidget(output_group) settings_layout.addWidget(output_group)
@@ -579,21 +832,18 @@ class SpotiFLACGUI(QWidget):
file_layout.setSpacing(5) file_layout.setSpacing(5)
file_label = QLabel('File Settings') file_label = QLabel('File Settings')
file_label.setStyleSheet("font-weight: bold; color: palette(text);") file_label.setStyleSheet("font-weight: bold;")
file_layout.addWidget(file_label) file_layout.addWidget(file_label)
format_layout = QHBoxLayout() format_layout = QHBoxLayout()
format_label = QLabel('Filename Format:') format_label = QLabel('Filename Format:')
format_label.setStyleSheet("color: palette(text);")
self.format_group = QButtonGroup(self) self.format_group = QButtonGroup(self)
self.title_artist_radio = QRadioButton('Title - Artist') self.title_artist_radio = QRadioButton('Title - Artist')
self.title_artist_radio.setStyleSheet("color: palette(text);")
self.title_artist_radio.setCursor(Qt.CursorShape.PointingHandCursor) self.title_artist_radio.setCursor(Qt.CursorShape.PointingHandCursor)
self.title_artist_radio.toggled.connect(self.save_filename_format) self.title_artist_radio.toggled.connect(self.save_filename_format)
self.artist_title_radio = QRadioButton('Artist - Title') self.artist_title_radio = QRadioButton('Artist - Title')
self.artist_title_radio.setStyleSheet("color: palette(text);")
self.artist_title_radio.setCursor(Qt.CursorShape.PointingHandCursor) self.artist_title_radio.setCursor(Qt.CursorShape.PointingHandCursor)
self.artist_title_radio.toggled.connect(self.save_filename_format) self.artist_title_radio.toggled.connect(self.save_filename_format)
@@ -614,14 +864,12 @@ class SpotiFLACGUI(QWidget):
checkbox_layout = QHBoxLayout() checkbox_layout = QHBoxLayout()
self.track_number_checkbox = QCheckBox('Add Track Numbers to Album Files') self.track_number_checkbox = QCheckBox('Add Track Numbers to Album Files')
self.track_number_checkbox.setStyleSheet("color: palette(text);")
self.track_number_checkbox.setCursor(Qt.CursorShape.PointingHandCursor) self.track_number_checkbox.setCursor(Qt.CursorShape.PointingHandCursor)
self.track_number_checkbox.setChecked(self.use_track_numbers) self.track_number_checkbox.setChecked(self.use_track_numbers)
self.track_number_checkbox.toggled.connect(self.save_track_numbering) self.track_number_checkbox.toggled.connect(self.save_track_numbering)
checkbox_layout.addWidget(self.track_number_checkbox) checkbox_layout.addWidget(self.track_number_checkbox)
self.album_subfolder_checkbox = QCheckBox('Create Album Subfolders for Playlist Downloads') self.album_subfolder_checkbox = QCheckBox('Create Album Subfolders for Playlist Downloads')
self.album_subfolder_checkbox.setStyleSheet("color: palette(text);")
self.album_subfolder_checkbox.setCursor(Qt.CursorShape.PointingHandCursor) self.album_subfolder_checkbox.setCursor(Qt.CursorShape.PointingHandCursor)
self.album_subfolder_checkbox.setChecked(self.use_album_subfolders) self.album_subfolder_checkbox.setChecked(self.use_album_subfolders)
self.album_subfolder_checkbox.toggled.connect(self.save_album_subfolder_setting) self.album_subfolder_checkbox.toggled.connect(self.save_album_subfolder_setting)
@@ -636,39 +884,67 @@ class SpotiFLACGUI(QWidget):
auth_layout = QVBoxLayout(auth_group) auth_layout = QVBoxLayout(auth_group)
auth_layout.setSpacing(5) auth_layout.setSpacing(5)
auth_label = QLabel('Lucida') auth_label = QLabel('Service Settings')
auth_label.setStyleSheet("font-weight: bold; color: palette(text);") auth_label.setStyleSheet("font-weight: bold;")
auth_layout.addWidget(auth_label) auth_layout.addWidget(auth_label)
service_fallback_layout = QHBoxLayout() service_fallback_layout = QHBoxLayout()
service_label = QLabel('Service:') service_label = QLabel('Service:')
service_label.setStyleSheet("color: palette(text);")
self.service_dropdown = ServiceComboBox() self.service_dropdown = ServiceComboBox()
self.service_dropdown.currentIndexChanged.connect(self.save_service_setting) self.service_dropdown.currentIndexChanged.connect(self.on_service_changed)
service_fallback_layout.addWidget(service_label) service_fallback_layout.addWidget(service_label)
service_fallback_layout.addWidget(self.service_dropdown) service_fallback_layout.addWidget(self.service_dropdown)
service_fallback_layout.addSpacing(20) service_fallback_layout.addSpacing(10)
self.fallback_checkbox = QCheckBox('Fallback') self.fallback_checkbox = QCheckBox('Fallback')
self.fallback_checkbox.setStyleSheet("color: palette(text);")
self.fallback_checkbox.setCursor(Qt.CursorShape.PointingHandCursor) self.fallback_checkbox.setCursor(Qt.CursorShape.PointingHandCursor)
self.fallback_checkbox.setChecked(self.use_fallback) self.fallback_checkbox.setChecked(self.use_fallback)
self.fallback_checkbox.toggled.connect(self.save_fallback_setting) self.fallback_checkbox.toggled.connect(self.save_fallback_setting)
service_fallback_layout.addWidget(self.fallback_checkbox) service_fallback_layout.addWidget(self.fallback_checkbox)
timeout_label = QLabel('Timeout:')
self.timeout_input = QLineEdit()
self.timeout_input.setText(str(self.timeout_value))
self.timeout_input.setFixedWidth(35)
self.timeout_input.textChanged.connect(self.save_timeout_setting)
service_fallback_layout.addWidget(timeout_label)
service_fallback_layout.addWidget(self.timeout_input)
region_label = QLabel('Region:')
self.qobuz_region_dropdown = QobuzRegionComboBox()
self.qobuz_region_dropdown.currentIndexChanged.connect(self.save_qobuz_region_setting)
service_fallback_layout.addWidget(region_label)
service_fallback_layout.addWidget(self.qobuz_region_dropdown)
region_label.hide()
self.qobuz_region_dropdown.hide()
service_fallback_layout.addStretch() service_fallback_layout.addStretch()
auth_layout.addLayout(service_fallback_layout) auth_layout.addLayout(service_fallback_layout)
settings_layout.addWidget(auth_group) settings_layout.addWidget(auth_group)
settings_layout.addStretch() settings_layout.addStretch()
settings_tab.setLayout(settings_layout) settings_tab.setLayout(settings_layout)
self.tab_widget.addTab(settings_tab, "Settings") self.tab_widget.addTab(settings_tab, "Settings")
for i in range(self.service_dropdown.count()):
if self.service_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == self.service:
self.service_dropdown.setCurrentIndex(i)
break
for i in range(self.qobuz_region_dropdown.count()):
if self.qobuz_region_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == self.qobuz_region:
self.qobuz_region_dropdown.setCurrentIndex(i)
break
self.qobuz_region_dropdown.status_updated.connect(
lambda region_id, is_online: self.service_dropdown.update_qobuz_status(region_id, is_online)
)
def setup_about_tab(self): def setup_about_tab(self):
about_tab = QWidget() about_tab = QWidget()
about_layout = QVBoxLayout() about_layout = QVBoxLayout()
@@ -678,7 +954,7 @@ class SpotiFLACGUI(QWidget):
sections = [ sections = [
("Check for Updates", "https://github.com/afkarxyz/SpotiFLAC/releases"), ("Check for Updates", "https://github.com/afkarxyz/SpotiFLAC/releases"),
("Report an Issue", "https://github.com/afkarxyz/SpotiFLAC/issues"), ("Report an Issue", "https://github.com/afkarxyz/SpotiFLAC/issues"),
("Lucida Site", "https://lucida.to/stats") ("Lucida Status", "https://status.lucida.to")
] ]
for title, url in sections: for title, url in sections:
@@ -719,13 +995,60 @@ class SpotiFLACGUI(QWidget):
spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed) spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
about_layout.addItem(spacer) about_layout.addItem(spacer)
footer_label = QLabel("v2.2 | March 2025") footer_label = QLabel("v3.3 | June 2025")
footer_label.setStyleSheet("font-size: 12px; color: palette(text); margin-top: 10px;") footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;")
about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter) about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter)
about_tab.setLayout(about_layout) about_tab.setLayout(about_layout)
self.tab_widget.addTab(about_tab, "About") self.tab_widget.addTab(about_tab, "About")
def on_service_changed(self, index):
service = self.service_dropdown.currentData()
self.service = service
self.settings.setValue('service', service)
self.settings.sync()
timeout_label = None
for widget in self.timeout_input.parentWidget().children():
if isinstance(widget, QLabel) and widget.text() == "Timeout:":
timeout_label = widget
break
region_label = None
for widget in self.qobuz_region_dropdown.parentWidget().children():
if isinstance(widget, QLabel) and widget.text() == "Region:":
region_label = widget
break
if service == "qobuz":
self.fallback_checkbox.hide()
self.timeout_input.hide()
if timeout_label:
timeout_label.hide()
if region_label:
region_label.show()
self.qobuz_region_dropdown.show()
elif service == "tidal_api":
self.fallback_checkbox.hide()
self.timeout_input.hide()
if timeout_label:
timeout_label.hide()
if region_label:
region_label.hide()
self.qobuz_region_dropdown.hide()
else:
self.fallback_checkbox.show()
self.timeout_input.show()
if timeout_label:
timeout_label.show()
if region_label:
region_label.hide()
self.qobuz_region_dropdown.hide()
self.log_output.append(f"Service changed to: {self.service_dropdown.currentText()}")
def save_url(self): def save_url(self):
self.settings.setValue('spotify_url', self.spotify_url.text().strip()) self.settings.setValue('spotify_url', self.spotify_url.text().strip())
self.settings.sync() self.settings.sync()
@@ -751,12 +1074,27 @@ class SpotiFLACGUI(QWidget):
self.settings.sync() self.settings.sync()
self.log_output.append("Fallback setting saved successfully!") self.log_output.append("Fallback setting saved successfully!")
def save_service_setting(self): def save_timeout_setting(self):
service = self.service_dropdown.currentData() try:
self.service = service timeout = int(self.timeout_input.text())
self.settings.setValue('service', service) if timeout > 0:
self.timeout_value = timeout
self.settings.setValue('timeout_value', self.timeout_value)
self.settings.sync()
self.log_output.append(f"Timeout setting saved: {self.timeout_value} seconds")
else:
self.timeout_input.setText(str(self.timeout_value))
self.log_output.append("Timeout must be a positive number")
except ValueError:
self.timeout_input.setText(str(self.timeout_value))
self.log_output.append("Timeout must be a valid number")
def save_qobuz_region_setting(self):
region = self.qobuz_region_dropdown.currentData()
self.qobuz_region = region
self.settings.setValue('qobuz_region', region)
self.settings.sync() self.settings.sync()
self.log_output.append(f"Service setting saved: {self.service_dropdown.currentText()}") self.log_output.append(f"Qobuz region setting saved: {self.qobuz_region_dropdown.currentText()}")
def save_settings(self): def save_settings(self):
self.settings.setValue('output_path', self.output_dir.text().strip()) self.settings.setValue('output_path', self.output_dir.text().strip())
@@ -778,11 +1116,20 @@ class SpotiFLACGUI(QWidget):
self.reset_state() self.reset_state()
self.reset_ui() self.reset_ui()
metadata = get_filtered_data(url) self.log_output.append('Just a moment. Fetching metadata...')
if "error" in metadata: self.tab_widget.setCurrentWidget(self.process_tab)
raise Exception(metadata["error"])
self.metadata_worker = MetadataFetchWorker(url)
url_info = parse_uri(url) self.metadata_worker.finished.connect(self.on_metadata_fetched)
self.metadata_worker.error.connect(self.on_metadata_error)
self.metadata_worker.start()
except Exception as e:
self.log_output.append(f'Error: Failed to start metadata fetch: {str(e)}')
def on_metadata_fetched(self, metadata):
try:
url_info = parse_uri(self.spotify_url.text().strip())
if url_info["type"] == "track": if url_info["type"] == "track":
self.handle_track_metadata(metadata["track"]) self.handle_track_metadata(metadata["track"])
@@ -793,11 +1140,11 @@ class SpotiFLACGUI(QWidget):
self.update_button_states() self.update_button_states()
self.tab_widget.setCurrentIndex(0) self.tab_widget.setCurrentIndex(0)
except SpotifyInvalidUrlException as e:
self.log_output.append(f'Error: {str(e)}')
except Exception as e: except Exception as e:
self.log_output.append(f'Error: Failed to fetch metadata: {str(e)}') self.log_output.append(f'Error: {str(e)}')
def on_metadata_error(self, error_message):
self.log_output.append(f'Error: {error_message}')
def handle_track_metadata(self, track_data): def handle_track_metadata(self, track_data):
track_id = track_data["external_urls"].split("/")[-1] track_id = track_data["external_urls"].split("/")[-1]
@@ -809,7 +1156,8 @@ class SpotiFLACGUI(QWidget):
album=track_data["album_name"], album=track_data["album_name"],
track_number=1, track_number=1,
duration_ms=track_data.get("duration_ms", 0), duration_ms=track_data.get("duration_ms", 0),
id=track_id id=track_id,
isrc=track_data.get("isrc", "")
)] )]
self.is_single_track = True self.is_single_track = True
self.is_album = self.is_playlist = False self.is_album = self.is_playlist = False
@@ -838,7 +1186,8 @@ class SpotiFLACGUI(QWidget):
album=self.album_or_playlist_name, album=self.album_or_playlist_name,
track_number=track["track_number"], track_number=track["track_number"],
duration_ms=track.get("duration_ms", 0), duration_ms=track.get("duration_ms", 0),
id=track_id id=track_id,
isrc=track.get("isrc", "")
)) ))
self.is_album = True self.is_album = True
@@ -867,7 +1216,8 @@ class SpotiFLACGUI(QWidget):
album=track["album_name"], album=track["album_name"],
track_number=len(self.tracks) + 1, track_number=len(self.tracks) + 1,
duration_ms=track.get("duration_ms", 0), duration_ms=track.get("duration_ms", 0),
id=track_id id=track_id,
isrc=track.get("isrc", "")
)) ))
self.is_playlist = True self.is_playlist = True
@@ -911,10 +1261,21 @@ class SpotiFLACGUI(QWidget):
self.followers_label.hide() self.followers_label.hide()
if metadata.get('releaseDate'): if metadata.get('releaseDate'):
release_date = datetime.strptime(metadata['releaseDate'], "%Y-%m-%d") try:
formatted_date = release_date.strftime("%d-%m-%Y") release_date = metadata['releaseDate']
self.release_date_label.setText(f"<b>Released</b> {formatted_date}") if len(release_date) == 4:
self.release_date_label.show() date_obj = datetime.strptime(release_date, "%Y")
elif len(release_date) == 7:
date_obj = datetime.strptime(release_date, "%Y-%m")
else:
date_obj = datetime.strptime(release_date, "%Y-%m-%d")
formatted_date = date_obj.strftime("%d-%m-%Y")
self.release_date_label.setText(f"<b>Released</b> {formatted_date}")
self.release_date_label.show()
except ValueError:
self.release_date_label.setText(f"<b>Released</b> {metadata['releaseDate']}")
self.release_date_label.show()
else: else:
self.release_date_label.hide() self.release_date_label.hide()
@@ -994,7 +1355,8 @@ class SpotiFLACGUI(QWidget):
def download_tracks(self, indices): def download_tracks(self, indices):
self.log_output.clear() self.log_output.clear()
outpath = self.output_dir.text() raw_outpath = self.output_dir.text().strip()
outpath = os.path.normpath(raw_outpath)
if not os.path.exists(outpath): if not os.path.exists(outpath):
self.log_output.append('Warning: Invalid output directory.') self.log_output.append('Warning: Invalid output directory.')
return return
@@ -1002,7 +1364,8 @@ class SpotiFLACGUI(QWidget):
tracks_to_download = self.tracks if self.is_single_track else [self.tracks[i] for i in indices] tracks_to_download = self.tracks if self.is_single_track else [self.tracks[i] for i in indices]
if self.is_album or self.is_playlist: if self.is_album or self.is_playlist:
folder_name = re.sub(r'[<>:"/\\|?*]', '_', self.album_or_playlist_name) name = self.album_or_playlist_name.strip()
folder_name = re.sub(r'[<>:"/\\|?*]', '_', name)
outpath = os.path.join(outpath, folder_name) outpath = os.path.join(outpath, folder_name)
os.makedirs(outpath, exist_ok=True) os.makedirs(outpath, exist_ok=True)
@@ -1013,6 +1376,7 @@ class SpotiFLACGUI(QWidget):
def start_download_worker(self, tracks_to_download, outpath): def start_download_worker(self, tracks_to_download, outpath):
service = self.service_dropdown.currentData() service = self.service_dropdown.currentData()
qobuz_region = self.qobuz_region_dropdown.currentData() if service == "qobuz" else "us"
self.worker = DownloadWorker( self.worker = DownloadWorker(
tracks_to_download, tracks_to_download,
@@ -1025,7 +1389,9 @@ class SpotiFLACGUI(QWidget):
self.use_track_numbers, self.use_track_numbers,
self.use_album_subfolders, self.use_album_subfolders,
self.use_fallback, self.use_fallback,
service service,
self.timeout_value,
qobuz_region
) )
self.worker.finished.connect(self.on_download_finished) self.worker.finished.connect(self.on_download_finished)
self.worker.progress.connect(self.update_progress) self.worker.progress.connect(self.update_progress)
@@ -1044,9 +1410,28 @@ class SpotiFLACGUI(QWidget):
self.tab_widget.setCurrentWidget(self.process_tab) self.tab_widget.setCurrentWidget(self.process_tab)
def update_progress(self, message, percentage): def update_progress(self, message, percentage):
self.log_output.append(message) if "Download progress:" in message or "Processing metadata..." in message:
self.log_output.moveCursor(QTextCursor.MoveOperation.End) current_text = self.log_output.toPlainText()
if percentage > 0:
if current_text:
lines = current_text.split('\n')
if "Download progress:" in lines[-1] or "Processing metadata..." in lines[-1]:
lines[-1] = message
new_text = '\n'.join(lines)
self.log_output.setPlainText(new_text)
self.log_output.moveCursor(QTextCursor.MoveOperation.End)
else:
self.log_output.append(message)
else:
self.log_output.append(message)
else:
self.log_output.append(message)
if percentage > 0 and not "Download progress:" in message:
self.progress_bar.setValue(percentage) self.progress_bar.setValue(percentage)
def stop_download(self): def stop_download(self):
@@ -1121,6 +1506,16 @@ class SpotiFLACGUI(QWidget):
self.time_label.hide() self.time_label.hide()
if __name__ == '__main__': if __name__ == '__main__':
try:
if sys.platform == "win32":
import os
os.system("chcp 65001 > nul")
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
except Exception as e:
print(f"Warning: Could not set UTF-8 encoding: {e}")
app = QApplication(sys.argv) app = QApplication(sys.argv)
ex = SpotiFLACGUI() ex = SpotiFLACGUI()
ex.show() ex.show()
+219 -60
View File
@@ -5,7 +5,7 @@ import json
import hmac import hmac
import time import time
import hashlib import hashlib
from typing import Tuple, Callable from typing import Tuple, Callable, Dict, Any, List
_TOTP_SECRET = bytearray([53,53,48,55,49,52,53,56,53,51,52,56,55,52,57,57,53,57,50,50,52,56,54,51,48,51,50,57,51,52,55]) _TOTP_SECRET = bytearray([53,53,48,55,49,52,53,56,53,51,52,56,55,52,57,57,53,57,50,50,52,56,54,51,48,51,50,57,51,52,55])
@@ -32,7 +32,7 @@ def generate_totp(
counter * 30_000, counter * 30_000,
) )
token_url = 'https://open.spotify.com/get_access_token' token_url = 'https://open.spotify.com/api/token'
playlist_base_url = 'https://api.spotify.com/v1/playlists/{}' playlist_base_url = 'https://api.spotify.com/v1/playlists/{}'
album_base_url = 'https://api.spotify.com/v1/albums/{}' album_base_url = 'https://api.spotify.com/v1/albums/{}'
track_base_url = 'https://api.spotify.com/v1/tracks/{}' track_base_url = 'https://api.spotify.com/v1/tracks/{}'
@@ -100,9 +100,7 @@ def get_json_from_api(api_url, access_token):
return req.json() return req.json()
def get_raw_spotify_data(spotify_url): def get_access_token():
url_info = parse_uri(spotify_url)
try: try:
totp, timestamp = generate_totp() totp, timestamp = generate_totp()
@@ -117,34 +115,116 @@ def get_raw_spotify_data(spotify_url):
req = requests.get(token_url, headers=headers, params=params, timeout=10) req = requests.get(token_url, headers=headers, params=params, timeout=10)
if req.status_code != 200: if req.status_code != 200:
return {"error": f"Failed to get access token. Status code: {req.status_code}"} return {"error": f"Failed to get access token. Status code: {req.status_code}"}
token = req.json() return req.json()
except Exception as e: except Exception as e:
return {"error": f"Failed to get access token: {str(e)}"} return {"error": f"Failed to get access token: {str(e)}"}
def fetch_tracks_in_batches(url: str, access_token: str, batch_size: int = 100, delay: float = 1.0) -> Tuple[List[Dict[str, Any]], int]:
all_tracks = []
current_batch = 0
while url:
print(f"Batch : {current_batch}")
url_parts = url.split("offset=")
if len(url_parts) > 1:
offset_part = url_parts[1].split("&")[0]
print(f"Offset : {offset_part}")
print("-------------")
track_data = get_json_from_api(url, access_token)
if not track_data:
break
items = track_data.get('items', [])
all_tracks.extend(items)
url = track_data.get('next')
if url and "&locale=" in url:
url = url.split("&locale=")[0]
if url and delay > 0:
sleep(delay)
current_batch += 1
return all_tracks, current_batch
def get_raw_spotify_data(spotify_url, batch: bool = False, delay: float = 1.0):
url_info = parse_uri(spotify_url)
token = get_access_token()
if "error" in token:
return token
access_token = token["accessToken"]
raw_data = {} raw_data = {}
if url_info['type'] == "playlist": if url_info['type'] == "playlist":
try: try:
playlist_data = get_json_from_api( playlist_data = get_json_from_api(
playlist_base_url.format(url_info["id"]), playlist_base_url.format(url_info["id"]),
token["accessToken"] access_token
) )
if not playlist_data: if not playlist_data:
return {"error": "Failed to get playlist data"} return {"error": "Failed to get playlist data"}
raw_data = playlist_data raw_data = playlist_data
total_tracks = playlist_data.get('tracks', {}).get('total', 0)
tracks = [] if batch:
tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100' tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100'
while tracks_url: tracks, num_batches = fetch_tracks_in_batches(tracks_url, access_token, 100, delay)
track_data = get_json_from_api(tracks_url, token["accessToken"]) raw_data['tracks']['items'] = tracks
if not track_data: raw_data['_batch_count'] = num_batches
break raw_data['_batch_enabled'] = True
tracks.extend(track_data['items']) if len(tracks) < total_tracks:
tracks_url = track_data.get('next') last_offset = len(tracks)
remaining_tracks = []
while last_offset < total_tracks:
print(f"Batch : {num_batches}")
print(f"Offset : {last_offset}")
print("-------------")
remainder_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?offset={last_offset}&limit=100'
track_data = get_json_from_api(remainder_url, access_token)
if not track_data or not track_data.get('items'):
break
items = track_data.get('items', [])
remaining_tracks.extend(items)
if len(items) < 100:
break
last_offset += len(items)
num_batches += 1
if delay > 0:
sleep(delay)
tracks.extend(remaining_tracks)
raw_data['tracks']['items'] = tracks
raw_data['_batch_count'] = num_batches
else:
tracks = []
tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100'
while tracks_url:
track_data = get_json_from_api(tracks_url, access_token)
if not track_data:
break
tracks.extend(track_data['items'])
tracks_url = track_data.get('next')
if tracks_url and "&locale=" in tracks_url:
tracks_url = tracks_url.split("&locale=")[0]
raw_data['tracks']['items'] = tracks
raw_data['_batch_enabled'] = False
raw_data['tracks']['items'] = tracks
except Exception as e: except Exception as e:
return {"error": f"Failed to get playlist data: {str(e)}"} return {"error": f"Failed to get playlist data: {str(e)}"}
@@ -152,25 +232,68 @@ def get_raw_spotify_data(spotify_url):
try: try:
album_data = get_json_from_api( album_data = get_json_from_api(
album_base_url.format(url_info["id"]), album_base_url.format(url_info["id"]),
token["accessToken"] access_token
) )
if not album_data: if not album_data:
return {"error": "Failed to get album data"} return {"error": "Failed to get album data"}
album_data['_token'] = token["accessToken"] album_data['_token'] = access_token
raw_data = album_data raw_data = album_data
total_tracks = album_data.get('total_tracks', 0)
tracks = [] if batch:
tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50' tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50'
while tracks_url: tracks, num_batches = fetch_tracks_in_batches(tracks_url, access_token, 50, delay)
track_data = get_json_from_api(tracks_url, token["accessToken"]) raw_data['tracks']['items'] = tracks
if not track_data: raw_data['_batch_count'] = num_batches
break raw_data['_batch_enabled'] = True
tracks.extend(track_data['items']) if len(tracks) < total_tracks:
tracks_url = track_data.get('next') last_offset = len(tracks)
remaining_tracks = []
while last_offset < total_tracks:
print(f"Batch : {num_batches}")
print(f"Offset : {last_offset}")
print("-------------")
remainder_url = f'{album_base_url.format(url_info["id"])}/tracks?offset={last_offset}&limit=50'
track_data = get_json_from_api(remainder_url, access_token)
if not track_data or not track_data.get('items'):
break
items = track_data.get('items', [])
remaining_tracks.extend(items)
if len(items) < 50:
break
last_offset += len(items)
num_batches += 1
if delay > 0:
sleep(delay)
tracks.extend(remaining_tracks)
raw_data['tracks']['items'] = tracks
raw_data['_batch_count'] = num_batches
else:
tracks = []
tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50'
while tracks_url:
track_data = get_json_from_api(tracks_url, access_token)
if not track_data:
break
tracks.extend(track_data['items'])
tracks_url = track_data.get('next')
if tracks_url and "&locale=" in tracks_url:
tracks_url = tracks_url.split("&locale=")[0]
raw_data['tracks']['items'] = tracks
raw_data['_batch_enabled'] = False
raw_data['tracks']['items'] = tracks
except Exception as e: except Exception as e:
return {"error": f"Failed to get album data: {str(e)}"} return {"error": f"Failed to get album data: {str(e)}"}
@@ -178,7 +301,7 @@ def get_raw_spotify_data(spotify_url):
try: try:
track_data = get_json_from_api( track_data = get_json_from_api(
track_base_url.format(url_info["id"]), track_base_url.format(url_info["id"]),
token["accessToken"] access_token
) )
if not track_data: if not track_data:
return {"error": "Failed to get track data"} return {"error": "Failed to get track data"}
@@ -191,10 +314,12 @@ def get_raw_spotify_data(spotify_url):
def format_track_data(track_data): def format_track_data(track_data):
artists = [] artists = []
for artist in track_data['artists']: for artist in track_data.get('artists', []):
artists.append(artist['name']) artists.append(artist['name'])
image_url = track_data.get('album', {}).get('images', [{}])[0].get('url', '') image_url = track_data.get('album', {}).get('images', [{}])[0].get('url', '') if track_data.get('album', {}).get('images') else ''
isrc = track_data.get('external_ids', {}).get('isrc', '')
return { return {
"track": { "track": {
@@ -205,22 +330,37 @@ def format_track_data(track_data):
"images": image_url, "images": image_url,
"release_date": track_data.get('album', {}).get('release_date', ''), "release_date": track_data.get('album', {}).get('release_date', ''),
"track_number": track_data.get('track_number', 0), "track_number": track_data.get('track_number', 0),
"external_urls": track_data.get('external_urls', {}).get('spotify', '') "external_urls": track_data.get('external_urls', {}).get('spotify', ''),
"isrc": isrc
} }
} }
def format_album_data(album_data): def format_album_data(album_data):
artists = [] artists = []
for artist in album_data['artists']: for artist in album_data.get('artists', []):
artists.append(artist['name']) artists.append(artist['name'])
image_url = album_data.get('images', [{}])[0].get('url', '') image_url = album_data.get('images', [{}])[0].get('url', '') if album_data.get('images') else ''
track_list = [] track_list = []
for track in album_data.get('tracks', {}).get('items', []): for track in album_data.get('tracks', {}).get('items', []):
track_artists = [] track_artists = []
for artist in track.get('artists', []): for artist in track.get('artists', []):
track_artists.append(artist['name']) track_artists.append(artist['name'])
track_id = track.get('id', '')
track_isrc = ''
if track_id and album_data.get('_token'):
try:
full_track_data = get_json_from_api(
track_base_url.format(track_id),
album_data.get('_token')
)
if full_track_data:
track_isrc = full_track_data.get('external_ids', {}).get('isrc', '')
except:
pass
track_list.append({ track_list.append({
"artists": ", ".join(track_artists), "artists": ", ".join(track_artists),
@@ -230,31 +370,44 @@ def format_album_data(album_data):
"images": image_url, "images": image_url,
"release_date": album_data.get('release_date', ''), "release_date": album_data.get('release_date', ''),
"track_number": track.get('track_number', 0), "track_number": track.get('track_number', 0),
"external_urls": track.get('external_urls', {}).get('spotify', '') "external_urls": track.get('external_urls', {}).get('spotify', ''),
"isrc": track_isrc
}) })
album_info = {
"total_tracks": album_data.get('total_tracks', 0),
"name": album_data.get('name', ''),
"release_date": album_data.get('release_date', ''),
"artists": ", ".join(artists),
"images": image_url
}
if album_data.get('_batch_enabled', False):
album_info["batch"] = f"{album_data.get('_batch_count', 1)}"
return { return {
"album_info": { "album_info": album_info,
"total_tracks": album_data.get('total_tracks', 0),
"name": album_data.get('name', ''),
"release_date": album_data.get('release_date', ''),
"artists": ", ".join(artists),
"images": image_url
},
"track_list": track_list "track_list": track_list
} }
def format_playlist_data(playlist_data): def format_playlist_data(playlist_data):
image_url = playlist_data.get('images', [{}])[0].get('url', '') image_url = playlist_data.get('images', [{}])[0].get('url', '') if playlist_data.get('images') else ''
track_list = [] track_list = []
for item in playlist_data.get('tracks', {}).get('items', []): for item in playlist_data.get('tracks', {}).get('items', []):
track = item.get('track', {}) track = item.get('track', {})
if not track:
continue
artists = [] artists = []
for artist in track.get('artists', []): for artist in track.get('artists', []):
artists.append(artist['name']) artists.append(artist['name'])
track_image = track.get('album', {}).get('images', [{}])[0].get('url', '') track_image = ''
if track.get('album', {}).get('images'):
track_image = track.get('album', {}).get('images', [{}])[0].get('url', '')
track_isrc = track.get('external_ids', {}).get('isrc', '')
track_list.append({ track_list.append({
"artists": ", ".join(artists), "artists": ", ".join(artists),
@@ -264,19 +417,25 @@ def format_playlist_data(playlist_data):
"images": track_image, "images": track_image,
"release_date": track.get('album', {}).get('release_date', ''), "release_date": track.get('album', {}).get('release_date', ''),
"track_number": track.get('track_number', 0), "track_number": track.get('track_number', 0),
"external_urls": track.get('external_urls', {}).get('spotify', '') "external_urls": track.get('external_urls', {}).get('spotify', ''),
"isrc": track_isrc
}) })
playlist_info = {
"tracks": {"total": playlist_data.get('tracks', {}).get('total', 0)},
"followers": {"total": playlist_data.get('followers', {}).get('total', 0)},
"owner": {
"display_name": playlist_data.get('owner', {}).get('display_name', ''),
"name": playlist_data.get('name', ''),
"images": image_url
}
}
if playlist_data.get('_batch_enabled', False):
playlist_info["batch"] = f"{playlist_data.get('_batch_count', 1)}"
return { return {
"playlist_info": { "playlist_info": playlist_info,
"tracks": {"total": playlist_data.get('tracks', {}).get('total', 0)},
"followers": {"total": playlist_data.get('followers', {}).get('total', 0)},
"owner": {
"display_name": playlist_data.get('owner', {}).get('display_name', ''),
"name": playlist_data.get('name', ''),
"images": image_url
}
},
"track_list": track_list "track_list": track_list
} }
@@ -296,8 +455,8 @@ def process_spotify_data(raw_data, data_type):
except Exception as e: except Exception as e:
return {"error": f"Error processing data: {str(e)}"} return {"error": f"Error processing data: {str(e)}"}
def get_filtered_data(spotify_url): def get_filtered_data(spotify_url, batch=False, delay=1.0):
raw_data = get_raw_spotify_data(spotify_url) raw_data = get_raw_spotify_data(spotify_url, batch=batch, delay=delay)
if raw_data and "error" not in raw_data: if raw_data and "error" not in raw_data:
url_info = parse_uri(spotify_url) url_info = parse_uri(spotify_url)
filtered_data = process_spotify_data(raw_data, url_info['type']) filtered_data = process_spotify_data(raw_data, url_info['type'])
@@ -306,10 +465,10 @@ def get_filtered_data(spotify_url):
if __name__ == '__main__': if __name__ == '__main__':
playlist = "https://open.spotify.com/playlist/37i9dQZEVXbNG2KDcFcKOF" playlist = "https://open.spotify.com/playlist/37i9dQZEVXbNG2KDcFcKOF"
album = "https://open.spotify.com/album/7kFyd5oyJdVX2pIi6P4iHE" album = "https://open.spotify.com/album/6J84szYCnMfzEcvIcfWMFL"
song = "https://open.spotify.com/track/4wJ5Qq0jBN4ajy7ouZIV1c" song = "https://open.spotify.com/track/7so0lgd0zP2Sbgs2d7a1SZ"
filtered_playlist = get_filtered_data(playlist) filtered_playlist = get_filtered_data(playlist, batch=True, delay=0.1)
print(json.dumps(filtered_playlist, indent=2)) print(json.dumps(filtered_playlist, indent=2))
filtered_album = get_filtered_data(album) filtered_album = get_filtered_data(album)
+829 -77
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -1,3 +1,3 @@
{ {
"version": "2.1" "version": "3.2"
} }