Compare commits

...

6 Commits

Author SHA1 Message Date
afkarxyz 1f0922f358 v4.0 2025-07-22 07:54:03 +07:00
afkarxyz 3f267a3fa1 v3.9.5 2025-07-22 07:42:53 +07:00
afkarxyz 22da74a027 v3.9 2025-07-21 17:33:39 +07:00
afkarxyz 783350fe88 v3.9 2025-07-21 17:28:44 +07:00
afkarxyz 0057d43f46 v3.8 2025-07-14 13:20:14 +07:00
afkarxyz 9928968ffb v3.7 2025-07-13 05:30:59 +07:00
6 changed files with 509 additions and 76 deletions
+1 -1
View File
@@ -6,7 +6,7 @@
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Qobuz & Tidal. <b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Qobuz & Tidal.
</div> </div>
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v3.6/SpotiFLAC.exe) ### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v3.9/SpotiFLAC.exe)
## Screenshots ## Screenshots
+157 -43
View File
@@ -4,6 +4,7 @@ from dataclasses import dataclass
from datetime import datetime from datetime import datetime
import requests import requests
import re import re
import asyncio
from packaging import version from packaging import version
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
@@ -19,6 +20,7 @@ from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkRepl
from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException
from qobuzDL import QobuzDownloader from qobuzDL import QobuzDownloader
from tidalDL import TidalDownloader from tidalDL import TidalDownloader
from deezerDL import DeezerDownloader
@dataclass @dataclass
class Track: class Track:
@@ -56,7 +58,7 @@ class DownloadWorker(QThread):
progress = pyqtSignal(str, int) progress = pyqtSignal(str, int)
def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False, def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False,
album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True, album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True,
use_album_subfolders=False, service="tidal", qobuz_region="us"): use_album_subfolders=False, service="tidal", qobuz_region="us", deezer_speed=5):
super().__init__() super().__init__()
self.tracks = tracks self.tracks = tracks
self.outpath = outpath self.outpath = outpath
@@ -69,6 +71,7 @@ class DownloadWorker(QThread):
self.use_album_subfolders = use_album_subfolders self.use_album_subfolders = use_album_subfolders
self.service = service self.service = service
self.qobuz_region = qobuz_region self.qobuz_region = qobuz_region
self.deezer_speed = deezer_speed
self.is_paused = False self.is_paused = False
self.is_stopped = False self.is_stopped = False
self.failed_tracks = [] self.failed_tracks = []
@@ -76,6 +79,8 @@ class DownloadWorker(QThread):
def get_formatted_filename(self, track): def get_formatted_filename(self, track):
if self.filename_format == "artist_title": if self.filename_format == "artist_title":
filename = f"{track.artists} - {track.title}.flac" filename = f"{track.artists} - {track.title}.flac"
elif self.filename_format == "title_only":
filename = f"{track.title}.flac"
else: else:
filename = f"{track.title} - {track.artists}.flac" filename = f"{track.title} - {track.artists}.flac"
return re.sub(r'[<>:"/\\|?*]', '_', filename) return re.sub(r'[<>:"/\\|?*]', '_', filename)
@@ -86,6 +91,8 @@ class DownloadWorker(QThread):
downloader = QobuzDownloader(self.qobuz_region) downloader = QobuzDownloader(self.qobuz_region)
elif self.service == "tidal": elif self.service == "tidal":
downloader = TidalDownloader() downloader = TidalDownloader()
elif self.service == "deezer":
downloader = DeezerDownloader()
else: else:
downloader = TidalDownloader() downloader = TidalDownloader()
@@ -160,8 +167,6 @@ class DownloadWorker(QThread):
continue continue
self.progress.emit(f"Searching and downloading from Tidal for ISRC: {track.isrc} - {track.title} - {track.artists}", 0) self.progress.emit(f"Searching and downloading from Tidal for ISRC: {track.isrc} - {track.title} - {track.artists}", 0)
import asyncio
is_paused_callback = lambda: self.is_paused is_paused_callback = lambda: self.is_paused
is_stopped_callback = lambda: self.is_stopped is_stopped_callback = lambda: self.is_stopped
@@ -196,12 +201,44 @@ class DownloadWorker(QThread):
else: else:
downloaded_file = None downloaded_file = None
raise Exception(f"Tidal download failed or returned unexpected result: {download_result_details}") raise Exception(f"Tidal download failed or returned unexpected result: {download_result_details}")
elif self.service == "deezer":
if not track.isrc:
self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0)
self.failed_tracks.append((track.title, track.artists, "No ISRC available"))
continue
self.progress.emit(f"Downloading from Deezer with ISRC: {track.isrc}", 0)
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
success = loop.run_until_complete(downloader.download_by_isrc(track.isrc, track_outpath, self.deezer_speed))
if success:
safe_title = "".join(c for c in track.title if c.isalnum() or c in (' ', '-', '_')).rstrip()
safe_artist = "".join(c for c in track.artists if c.isalnum() or c in (' ', '-', '_')).rstrip()
expected_filename = f"{safe_artist} - {safe_title}.flac"
downloaded_file = os.path.join(track_outpath, expected_filename)
if not os.path.exists(downloaded_file):
import glob
flac_files = glob.glob(os.path.join(track_outpath, "*.flac"))
if flac_files:
downloaded_file = max(flac_files, key=os.path.getctime)
else:
raise Exception("Downloaded file not found")
else:
raise Exception("Deezer download failed")
else: else:
track_id = track.id track_id = track.id
self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0) self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0)
import asyncio
try: try:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
if loop.is_closed(): if loop.is_closed():
@@ -223,21 +260,25 @@ class DownloadWorker(QThread):
is_paused_callback=is_paused_callback, is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback is_stopped_callback=is_stopped_callback
) )
if self.is_stopped: if self.is_stopped:
return return
if downloaded_file == new_filepath: if downloaded_file and os.path.exists(downloaded_file):
self.progress.emit(f"File already exists: {new_filename}", 0) if downloaded_file == new_filepath:
self.progress.emit(f"Skipped: {track.title} - {track.artists}", self.progress.emit(f"File already exists: {new_filename}", 0)
int((i + 1) / total_tracks * 100)) self.progress.emit(f"Skipped: {track.title} - {track.artists}",
continue int((i + 1) / total_tracks * 100))
continue
if os.path.exists(downloaded_file) and downloaded_file != new_filepath: if downloaded_file != new_filepath:
if os.path.exists(new_filepath): try:
os.remove(new_filepath) os.rename(downloaded_file, new_filepath)
os.rename(downloaded_file, new_filepath) self.progress.emit(f"File renamed to: {new_filename}", 0)
self.progress.emit(f"File renamed to: {new_filename}", 0) except OSError as e:
self.progress.emit(f"Warning: Could not rename file {downloaded_file} to {new_filepath}: {str(e)}", 0)
pass
else:
raise Exception(f"Download failed or file not found: {downloaded_file}")
self.progress.emit(f"Successfully downloaded: {track.title} - {track.artists}", self.progress.emit(f"Successfully downloaded: {track.title} - {track.artists}",
int((i + 1) / total_tracks * 100)) int((i + 1) / total_tracks * 100))
@@ -334,6 +375,19 @@ class QobuzStatusChecker(QThread):
self.error.emit(f"Error checking Qobuz status: {str(e)}") self.error.emit(f"Error checking Qobuz status: {str(e)}")
self.status_updated.emit(False) self.status_updated.emit(False)
class DeezerStatusChecker(QThread):
status_updated = pyqtSignal(bool)
error = pyqtSignal(str)
def run(self):
try:
response = requests.get("https://deezmate.com/", timeout=5)
is_online = response.status_code == 200
self.status_updated.emit(is_online)
except Exception as e:
self.error.emit(f"Error checking Deezer status: {str(e)}")
self.status_updated.emit(False)
class StatusIndicatorDelegate(QStyledItemDelegate): class StatusIndicatorDelegate(QStyledItemDelegate):
def paint(self, painter, option, index): def paint(self, painter, option, index):
item_data = index.data(Qt.ItemDataRole.UserRole) item_data = index.data(Qt.ItemDataRole.UserRole)
@@ -371,12 +425,22 @@ class ServiceComboBox(QComboBox):
self.tidal_status_timer.timeout.connect(self.refresh_tidal_status) self.tidal_status_timer.timeout.connect(self.refresh_tidal_status)
self.tidal_status_timer.start(6000) self.tidal_status_timer.start(6000)
self.deezer_status_checker = DeezerStatusChecker()
self.deezer_status_checker.status_updated.connect(self.update_deezer_service_status)
self.deezer_status_checker.error.connect(lambda e: print(f"Deezer status check error: {e}"))
self.deezer_status_checker.start()
self.deezer_status_timer = QTimer(self)
self.deezer_status_timer.timeout.connect(self.refresh_deezer_status)
self.deezer_status_timer.start(6000)
def setup_items(self): def setup_items(self):
current_dir = os.path.dirname(os.path.abspath(__file__)) current_dir = os.path.dirname(os.path.abspath(__file__))
self.services = [ self.services = [
{'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False}, {'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False},
{'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False} {'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False},
{'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False}
] ]
for service in self.services: for service in self.services:
@@ -412,6 +476,23 @@ class ServiceComboBox(QComboBox):
self.tidal_status_checker.error.connect(lambda e: print(f"Tidal status check error: {e}")) self.tidal_status_checker.error.connect(lambda e: print(f"Tidal status check error: {e}"))
self.tidal_status_checker.start() self.tidal_status_checker.start()
def update_deezer_service_status(self, is_online):
for i in range(self.count()):
service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if service_id == 'deezer':
service_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(service_data, dict):
service_data['online'] = is_online
self.setItemData(i, service_data, Qt.ItemDataRole.UserRole)
break
self.update()
def refresh_deezer_status(self):
self.deezer_status_checker = DeezerStatusChecker()
self.deezer_status_checker.status_updated.connect(self.update_deezer_service_status)
self.deezer_status_checker.error.connect(lambda e: print(f"Deezer status check error: {e}"))
self.deezer_status_checker.start()
def currentData(self, role=Qt.ItemDataRole.UserRole + 1): def currentData(self, role=Qt.ItemDataRole.UserRole + 1):
return super().currentData(role) return super().currentData(role)
@@ -503,7 +584,7 @@ class QobuzRegionComboBox(QComboBox):
class SpotiFLACGUI(QWidget): class SpotiFLACGUI(QWidget):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.current_version = "3.7" self.current_version = "4.0"
self.tracks = [] self.tracks = []
self.reset_state() self.reset_state()
@@ -516,6 +597,7 @@ class SpotiFLACGUI(QWidget):
self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool) self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool)
self.service = self.settings.value('service', 'tidal') self.service = self.settings.value('service', 'tidal')
self.qobuz_region = self.settings.value('qobuz_region', 'us') self.qobuz_region = self.settings.value('qobuz_region', 'us')
self.deezer_speed = self.settings.value('deezer_speed', 7.5, type=float)
self.check_for_updates = self.settings.value('check_for_updates', True, type=bool) self.check_for_updates = self.settings.value('check_for_updates', True, type=bool)
self.elapsed_time = QTime(0, 0, 0) self.elapsed_time = QTime(0, 0, 0)
@@ -544,12 +626,11 @@ class SpotiFLACGUI(QWidget):
if dialog.disable_check.isChecked(): if dialog.disable_check.isChecked():
self.settings.setValue('check_for_updates', False) self.settings.setValue('check_for_updates', False)
self.check_for_updates = False self.check_for_updates = False
if result == QDialog.DialogCode.Accepted: if result == QDialog.DialogCode.Accepted:
QDesktopServices.openUrl(QUrl("https://github.com/afkarxyz/SpotiFLAC/releases")) QDesktopServices.openUrl(QUrl("https://github.com/afkarxyz/SpotiFLAC/releases"))
except Exception as e: except Exception as e:
print(f"Error checking for updates: {e}") pass
@staticmethod @staticmethod
def format_duration(ms): def format_duration(ms):
@@ -788,7 +869,6 @@ class SpotiFLACGUI(QWidget):
format_layout = QHBoxLayout() format_layout = QHBoxLayout()
format_label = QLabel('Filename Format:') format_label = QLabel('Filename Format:')
self.format_group = QButtonGroup(self) self.format_group = QButtonGroup(self)
self.title_artist_radio = QRadioButton('Title - Artist') self.title_artist_radio = QRadioButton('Title - Artist')
self.title_artist_radio.setCursor(Qt.CursorShape.PointingHandCursor) self.title_artist_radio.setCursor(Qt.CursorShape.PointingHandCursor)
@@ -798,17 +878,25 @@ class SpotiFLACGUI(QWidget):
self.artist_title_radio.setCursor(Qt.CursorShape.PointingHandCursor) self.artist_title_radio.setCursor(Qt.CursorShape.PointingHandCursor)
self.artist_title_radio.toggled.connect(self.save_filename_format) self.artist_title_radio.toggled.connect(self.save_filename_format)
self.title_only_radio = QRadioButton('Title')
self.title_only_radio.setCursor(Qt.CursorShape.PointingHandCursor)
self.title_only_radio.toggled.connect(self.save_filename_format)
if hasattr(self, 'filename_format') and self.filename_format == "artist_title": if hasattr(self, 'filename_format') and self.filename_format == "artist_title":
self.artist_title_radio.setChecked(True) self.artist_title_radio.setChecked(True)
elif hasattr(self, 'filename_format') and self.filename_format == "title_only":
self.title_only_radio.setChecked(True)
else: else:
self.title_artist_radio.setChecked(True) self.title_artist_radio.setChecked(True)
self.format_group.addButton(self.title_artist_radio) self.format_group.addButton(self.title_artist_radio)
self.format_group.addButton(self.artist_title_radio) self.format_group.addButton(self.artist_title_radio)
self.format_group.addButton(self.title_only_radio)
format_layout.addWidget(format_label) format_layout.addWidget(format_label)
format_layout.addWidget(self.title_artist_radio) format_layout.addWidget(self.title_artist_radio)
format_layout.addWidget(self.artist_title_radio) format_layout.addWidget(self.artist_title_radio)
format_layout.addWidget(self.title_only_radio)
format_layout.addStretch() format_layout.addStretch()
file_layout.addLayout(format_layout) file_layout.addLayout(format_layout)
@@ -859,6 +947,18 @@ class SpotiFLACGUI(QWidget):
region_label.hide() region_label.hide()
self.qobuz_region_dropdown.hide() self.qobuz_region_dropdown.hide()
self.deezer_speed_label = QLabel('Speed:')
self.deezer_speed_dropdown = QComboBox()
self.deezer_speed_dropdown.addItem('Fast (5s)', 5)
self.deezer_speed_dropdown.addItem('Normal (7.5s)', 7.5)
self.deezer_speed_dropdown.addItem('Slow (10s)', 10)
self.deezer_speed_dropdown.currentIndexChanged.connect(self.save_deezer_speed_setting)
service_fallback_layout.addWidget(self.deezer_speed_label)
service_fallback_layout.addWidget(self.deezer_speed_dropdown)
self.deezer_speed_label.hide()
self.deezer_speed_dropdown.hide()
service_fallback_layout.addStretch() service_fallback_layout.addStretch()
auth_layout.addLayout(service_fallback_layout) auth_layout.addLayout(service_fallback_layout)
@@ -876,6 +976,11 @@ class SpotiFLACGUI(QWidget):
self.qobuz_region_dropdown.setCurrentIndex(i) self.qobuz_region_dropdown.setCurrentIndex(i)
break break
for i in range(self.deezer_speed_dropdown.count()):
if self.deezer_speed_dropdown.itemData(i) == self.deezer_speed:
self.deezer_speed_dropdown.setCurrentIndex(i)
break
self.update_service_ui() self.update_service_ui()
self.qobuz_region_dropdown.status_updated.connect( self.qobuz_region_dropdown.status_updated.connect(
@@ -931,7 +1036,7 @@ class SpotiFLACGUI(QWidget):
spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed) spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
about_layout.addItem(spacer) about_layout.addItem(spacer)
footer_label = QLabel("v3.7 | July 2025") footer_label = QLabel("v4.0 | July 2025")
footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;") footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;")
about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter) about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter)
@@ -943,21 +1048,8 @@ class SpotiFLACGUI(QWidget):
self.settings.setValue('service', service) self.settings.setValue('service', service)
self.settings.sync() self.settings.sync()
region_label = None self.update_service_ui()
for widget in self.qobuz_region_dropdown.parentWidget().children(): self.log_output.append(f"Service changed to: {self.service_dropdown.currentText()}")
if isinstance(widget, QLabel) and widget.text() == "Region:":
region_label = widget
break
if service == "qobuz":
if region_label:
region_label.show()
self.qobuz_region_dropdown.show()
else:
if region_label:
region_label.hide()
self.qobuz_region_dropdown.hide()
self.log_output.append(f"Service changed to: {self.service_dropdown.currentText()}")
def update_service_ui(self): def update_service_ui(self):
service = self.service service = self.service
@@ -972,17 +1064,32 @@ class SpotiFLACGUI(QWidget):
if region_label: if region_label:
region_label.show() region_label.show()
self.qobuz_region_dropdown.show() self.qobuz_region_dropdown.show()
self.deezer_speed_label.hide()
self.deezer_speed_dropdown.hide()
elif service == "deezer":
if region_label:
region_label.hide()
self.qobuz_region_dropdown.hide()
self.deezer_speed_label.show()
self.deezer_speed_dropdown.show()
else: else:
if region_label: if region_label:
region_label.hide() region_label.hide()
self.qobuz_region_dropdown.hide() self.qobuz_region_dropdown.hide()
self.deezer_speed_label.hide()
self.deezer_speed_dropdown.hide()
def save_url(self): def save_url(self):
self.settings.setValue('spotify_url', self.spotify_url.text().strip()) self.settings.setValue('spotify_url', self.spotify_url.text().strip())
self.settings.sync() self.settings.sync()
def save_filename_format(self): def save_filename_format(self):
self.filename_format = "artist_title" if self.artist_title_radio.isChecked() else "title_artist" if self.artist_title_radio.isChecked():
self.filename_format = "artist_title"
elif self.title_only_radio.isChecked():
self.filename_format = "title_only"
else:
self.filename_format = "title_artist"
self.settings.setValue('filename_format', self.filename_format) self.settings.setValue('filename_format', self.filename_format)
self.settings.sync() self.settings.sync()
@@ -1002,6 +1109,13 @@ class SpotiFLACGUI(QWidget):
self.settings.sync() self.settings.sync()
self.log_output.append(f"Qobuz region setting saved: {self.qobuz_region_dropdown.currentText()}") self.log_output.append(f"Qobuz region setting saved: {self.qobuz_region_dropdown.currentText()}")
def save_deezer_speed_setting(self):
speed = self.deezer_speed_dropdown.currentData()
self.deezer_speed = speed
self.settings.setValue('deezer_speed', speed)
self.settings.sync()
self.log_output.append(f"Deezer speed setting saved: {self.deezer_speed_dropdown.currentText()}")
def save_settings(self): def save_settings(self):
self.settings.setValue('output_path', self.output_dir.text().strip()) self.settings.setValue('output_path', self.output_dir.text().strip())
self.settings.sync() self.settings.sync()
@@ -1284,6 +1398,8 @@ class SpotiFLACGUI(QWidget):
service = self.service_dropdown.currentData() service = self.service_dropdown.currentData()
qobuz_region = self.qobuz_region_dropdown.currentData() if service == "qobuz" else "us" qobuz_region = self.qobuz_region_dropdown.currentData() if service == "qobuz" else "us"
deezer_speed = self.deezer_speed_dropdown.currentData() if service == "deezer" else 7.5
self.worker = DownloadWorker( self.worker = DownloadWorker(
tracks_to_download, tracks_to_download,
outpath, outpath,
@@ -1295,7 +1411,8 @@ class SpotiFLACGUI(QWidget):
self.use_track_numbers, self.use_track_numbers,
self.use_album_subfolders, self.use_album_subfolders,
service, service,
qobuz_region qobuz_region,
deezer_speed
) )
self.worker.finished.connect(self.on_download_finished) self.worker.finished.connect(self.on_download_finished)
self.worker.progress.connect(self.update_progress) self.worker.progress.connect(self.update_progress)
@@ -1387,7 +1504,6 @@ class SpotiFLACGUI(QWidget):
for i, track in enumerate(self.tracks, 1): for i, track in enumerate(self.tracks, 1):
if self.is_playlist: if self.is_playlist:
track.track_number = i track.track_number = i
duration = self.format_duration(track.duration_ms) duration = self.format_duration(track.duration_ms)
display_text = f"{i}. {track.title} - {track.artists}{duration}" display_text = f"{i}. {track.title} - {track.artists}{duration}"
list_item = self.track_list.item(i - 1) list_item = self.track_list.item(i - 1)
@@ -1412,13 +1528,11 @@ class SpotiFLACGUI(QWidget):
if __name__ == '__main__': if __name__ == '__main__':
try: try:
if sys.platform == "win32": if sys.platform == "win32":
import os
os.system("chcp 65001 > nul")
import io import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
except Exception as e: except Exception as e:
print(f"Warning: Could not set UTF-8 encoding: {e}") pass
app = QApplication(sys.argv) app = QApplication(sys.argv)
ex = SpotiFLACGUI() ex = SpotiFLACGUI()
+215
View File
@@ -0,0 +1,215 @@
import requests
import asyncio
import os
import sys
from urllib.parse import urlparse
from mutagen.flac import FLAC
from mutagen.id3 import ID3NoHeaderError
import deezmate
class DeezerDownloader:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
self.progress_callback = None
def set_progress_callback(self, callback):
self.progress_callback = callback
def get_track_by_isrc(self, isrc):
try:
url = f"https://api.deezer.com/2.0/track/isrc:{isrc}"
response = self.session.get(url)
response.raise_for_status()
data = response.json()
if 'error' in data:
print(f"Error from Deezer API: {data['error']['message']}")
return None
return data
except requests.exceptions.RequestException as e:
print(f"Error fetching track data: {e}")
return None
def extract_metadata(self, track_data):
metadata = {}
metadata['title'] = track_data.get('title', '')
metadata['title_short'] = track_data.get('title_short', '')
metadata['duration'] = track_data.get('duration', 0)
metadata['track_position'] = track_data.get('track_position', 1)
metadata['disk_number'] = track_data.get('disk_number', 1)
metadata['isrc'] = track_data.get('isrc', '')
metadata['release_date'] = track_data.get('release_date', '')
metadata['explicit_lyrics'] = track_data.get('explicit_lyrics', False)
if 'artist' in track_data:
metadata['artist'] = track_data['artist'].get('name', '')
metadata['artist_id'] = track_data['artist'].get('id', '')
if 'contributors' in track_data:
artists = []
for contributor in track_data['contributors']:
if contributor.get('role') == 'Main':
artists.append(contributor.get('name', ''))
metadata['artists'] = ', '.join(artists) if artists else metadata.get('artist', '')
if 'album' in track_data:
album = track_data['album']
metadata['album'] = album.get('title', '')
metadata['album_id'] = album.get('id', '')
metadata['cover_url'] = album.get('cover_xl', album.get('cover_big', ''))
metadata['cover_md5'] = album.get('md5_image', '')
metadata['deezer_link'] = track_data.get('link', '')
metadata['preview_url'] = track_data.get('preview', '')
return metadata
def download_cover_art(self, cover_url, filename):
if not cover_url:
return None
try:
response = self.session.get(cover_url)
response.raise_for_status()
cover_path = f"{filename}_cover.jpg"
with open(cover_path, 'wb') as f:
f.write(response.content)
return cover_path
except Exception as e:
print(f"Error downloading cover art: {e}")
return None
def embed_metadata(self, file_path, metadata, cover_path=None):
try:
audio = FLAC(file_path)
audio.clear()
if metadata.get('title'):
audio['TITLE'] = metadata['title']
if metadata.get('artists'):
audio['ARTIST'] = metadata['artists']
elif metadata.get('artist'):
audio['ARTIST'] = metadata['artist']
if metadata.get('album'):
audio['ALBUM'] = metadata['album']
if metadata.get('release_date'):
audio['DATE'] = metadata['release_date']
if metadata.get('track_position'):
audio['TRACKNUMBER'] = str(metadata['track_position'])
if metadata.get('disk_number'):
audio['DISCNUMBER'] = str(metadata['disk_number'])
if metadata.get('isrc'):
audio['ISRC'] = metadata['isrc']
if cover_path and os.path.exists(cover_path):
with open(cover_path, 'rb') as f:
cover_data = f.read()
from mutagen.flac import Picture
picture = Picture()
picture.type = 3
picture.mime = 'image/jpeg'
picture.desc = 'Cover'
picture.data = cover_data
audio.add_picture(picture)
audio.save()
print(f"Metadata embedded successfully in {file_path}")
except Exception as e:
print(f"Error embedding metadata: {e}")
async def download_by_isrc(self, isrc, output_dir=".", initial_delay=7.5):
print(f"Fetching track info for ISRC: {isrc}")
track_data = self.get_track_by_isrc(isrc)
if not track_data:
print("Failed to get track data from Deezer API")
return False
metadata = self.extract_metadata(track_data)
print(f"Found track: {metadata.get('artists', 'Unknown')} - {metadata.get('title', 'Unknown')}")
deezer_link = metadata.get('deezer_link')
if not deezer_link:
print("No Deezer link found in track data")
return False
print(f"Using Deezer link: {deezer_link}")
flac_url = await deezmate.main(deezer_link, initial_delay)
if not flac_url:
print("Failed to get download URL from deezmate")
return False
print("Downloading FLAC file...")
try:
response = self.session.get(flac_url, stream=True)
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
print(f"File size: {total_size} bytes ({total_size / (1024*1024):.2f} MB)")
safe_title = "".join(c for c in metadata.get('title', 'Unknown') if c.isalnum() or c in (' ', '-', '_')).rstrip()
safe_artist = "".join(c for c in metadata.get('artists', 'Unknown') if c.isalnum() or c in (' ', '-', '_')).rstrip()
filename = f"{safe_artist} - {safe_title}.flac"
file_path = os.path.join(output_dir, filename)
downloaded = 0
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
if self.progress_callback and total_size > 0:
current_mb = downloaded / (1024 * 1024)
total_mb = total_size / (1024 * 1024)
percent = (downloaded / total_size) * 100
self.progress_callback(downloaded, total_size)
print(f"Downloaded: {file_path}")
cover_path = None
if metadata.get('cover_url'):
print("Downloading cover art...")
cover_path = self.download_cover_art(metadata['cover_url'],
os.path.join(output_dir, f"{safe_artist} - {safe_title}"))
print("Embedding metadata...")
self.embed_metadata(file_path, metadata, cover_path)
if cover_path and os.path.exists(cover_path):
os.remove(cover_path)
print(f"Successfully downloaded and tagged: {filename}")
return True
except Exception as e:
print(f"Error downloading file: {e}")
return False
async def main():
if len(sys.argv) != 2:
print("Usage: python deezerDL.py <ISRC>")
print("Example: python deezerDL.py USUM72409273")
return
isrc = sys.argv[1]
downloader = DeezerDownloader()
success = await downloader.download_by_isrc(isrc)
if success:
print("Download completed successfully!")
else:
print("Download failed!")
if __name__ == "__main__":
asyncio.run(main())
+130
View File
@@ -0,0 +1,130 @@
import nodriver as uc
import asyncio
async def download_deezer_track(deezer_link=None, initial_delay=7.5):
if deezer_link is None:
deezer_link = "https://www.deezer.com/us/track/2947516331"
browser = None
try:
browser = await uc.start(headless=False)
page = await browser.get("https://deezmate.com/en")
print("Loading...")
await asyncio.sleep(initial_delay)
input_selector = 'input[placeholder="Paste your Deezer link here..."]'
await page.wait_for(input_selector, timeout=15)
input_element = await page.select(input_selector)
await input_element.clear_input()
await input_element.send_keys(deezer_link)
print("Link entered")
await page.evaluate("""
window.apiResponse = null;
window.originalFetch = window.fetch;
window.fetch = function(...args) {
return window.originalFetch(...args).then(async response => {
if (response.url.includes('api.deezmate.com/dl/')) {
try {
const data = await response.clone().json();
window.apiResponse = data;
console.log('Captured API response:', data);
} catch (e) {
console.log('Error parsing API response:', e);
}
}
return response;
});
};
""")
max_retries = 3
download_button_clicked = False
for attempt in range(max_retries):
try:
download_button_selector = 'button.bg-purple.hover\\:bg-purple-dark.cursor-pointer.transition.text-white.rounded-xl.p-2.mt-2.w-full.mb-5'
await page.wait_for(download_button_selector, timeout=15)
download_button = await page.select(download_button_selector)
await download_button.click()
print("Processing...")
download_button_clicked = True
break
except Exception as e:
if attempt < max_retries - 1:
print(f"Turnstile verification failed, retrying... ({attempt + 1}/{max_retries})")
await asyncio.sleep(0.5)
await page.evaluate("window.apiResponse = null;")
else:
print("Failed to pass Turnstile verification after all retries")
raise e
if not download_button_clicked:
return None
try:
track_download_selector = 'button.bg-purple.text-white.flex.items-center.gap-2.px-3.py-1.rounded-full.hover\\:bg-purple-dark.transition'
await page.wait_for(track_download_selector, timeout=15)
track_download_button = await page.select(track_download_selector)
await track_download_button.click()
except Exception as e:
print(f"Failed to click track download button: {e}")
return None
print("Getting FLAC URL from API response...")
api_response = None
for i in range(30):
api_response = await page.evaluate("window.apiResponse")
if api_response:
break
await asyncio.sleep(0.2)
if not api_response:
return None
def parse_nodriver_response(data):
if isinstance(data, list):
result = {}
for item in data:
if isinstance(item, list) and len(item) == 2:
key = item[0]
value_obj = item[1]
if isinstance(value_obj, dict) and 'value' in value_obj:
if value_obj.get('type') == 'object':
result[key] = parse_nodriver_response(value_obj['value'])
else:
result[key] = value_obj['value']
return result
return data
parsed_response = parse_nodriver_response(api_response)
if parsed_response.get('success') and parsed_response.get('links'):
flac_url = parsed_response['links'].get('flac')
if flac_url:
print(f"Successfully obtained FLAC download URL: {flac_url}")
return flac_url
return None
except Exception as e:
print(f"Error: {e}")
return None
finally:
if browser:
try:
await browser.stop()
except:
pass
async def main(deezer_link=None, initial_delay=7.5):
flac_url = await download_deezer_track(deezer_link, initial_delay)
if not flac_url:
print("Failed to download track")
return flac_url
if __name__ == "__main__":
uc.loop().run_until_complete(main())
+2 -28
View File
@@ -2,7 +2,6 @@ import asyncio
import json import json
import os import os
import re import re
import tempfile
import time import time
import httpx import httpx
from mutagen.flac import FLAC, Picture from mutagen.flac import FLAC, Picture
@@ -24,22 +23,11 @@ class TidalDownloader:
self.progress_callback = ProgressCallback() self.progress_callback = ProgressCallback()
self.client_id = "zU4XHVVkc2tDPo4t" self.client_id = "zU4XHVVkc2tDPo4t"
self.client_secret = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4=" self.client_secret = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4="
self.temp_dir = tempfile.gettempdir()
self.token_path = os.path.join(self.temp_dir, "tidal_token.json")
self.access_token = None
self._load_token()
def set_progress_callback(self, callback): def set_progress_callback(self, callback):
self.progress_callback = callback self.progress_callback = callback
def _load_token(self):
if os.path.exists(self.token_path):
try:
with open(self.token_path, "r") as tok:
token = json.loads(tok.read())
self.access_token = token.get("access_token")
except:
pass
def sanitize_filename(self, filename): def sanitize_filename(self, filename):
if not filename: if not filename:
@@ -48,9 +36,6 @@ class TidalDownloader:
return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track" return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track"
async def get_access_token(self): async def get_access_token(self):
if self.access_token:
return self.access_token
refresh_url = "https://auth.tidal.com/v1/oauth2/token" refresh_url = "https://auth.tidal.com/v1/oauth2/token"
payload = { payload = {
@@ -68,18 +53,7 @@ class TidalDownloader:
if response.status_code == 200: if response.status_code == 200:
token_data = response.json() token_data = response.json()
new_token = token_data.get("access_token") return token_data.get("access_token")
try:
with open(self.token_path, "w") as f:
json.dump({
"access_token": new_token
}, f)
except:
pass
self.access_token = new_token
return new_token
else: else:
return None return None
+1 -1
View File
@@ -1,3 +1,3 @@
{ {
"version": "3.6" "version": "3.9"
} }