Compare commits

..

9 Commits

Author SHA1 Message Date
afkarxyz af4f1dd401 v3.7 2025-07-13 05:25:13 +07:00
afkarxyz 3414fadbd3 v3.6 2025-07-08 16:44:11 +07:00
afkarxyz 457f30da99 v3.6 2025-07-08 16:37:45 +07:00
afkarxyz d4e621b36c v3.5 2025-07-02 13:50:01 +07:00
afkarxyz 58a733b790 v3.5 2025-07-02 13:44:39 +07:00
afkarxyz c85ab4bc28 Update README.md 2025-07-01 18:37:22 +07:00
afkarxyz dac2e99b5a Update README.md 2025-07-01 18:37:09 +07:00
afkarxyz d0f494f582 v3.4 2025-06-23 11:59:09 +07:00
afkarxyz 0542d6e86b v3.4 2025-06-23 11:57:49 +07:00
8 changed files with 841 additions and 1363 deletions
-99
View File
@@ -1,99 +0,0 @@
import asyncio
import zendriver as zd
async def get_metadata(page, headless=True):
max_attempts = 40
attempts = 0
await asyncio.sleep(2)
await page.evaluate("""
window.downloadInfo = null;
const originalFetch = window.fetch;
window.fetch = async function(...args) {
const [url, config] = args;
if (url.includes('/api/load?url=%2Fapi%2Ffetch%2Fstream%2Fv2')) {
const payload = JSON.parse(config.body);
const title = document.querySelector('h1.svelte-6pt9ji').textContent.trim();
const artists = Array.from(document.querySelectorAll('h2.svelte-6pt9ji a.normal'))
.map(a => a.textContent.trim())
.join(', ');
const cover = document.querySelector('.svelte-6pt9ji .meta.svelte-6pt9ji a').href;
window.downloadInfo = {
url: payload.url,
cover: cover,
title: title,
artists: artists,
token: payload.token.primary,
expiry: payload.token.expiry
};
}
return originalFetch.apply(this, args);
};
""")
await page.evaluate("""
function waitForElement(selector) {
return new Promise(resolve => {
if (document.querySelector(selector)) {
return resolve(document.querySelector(selector));
}
const observer = new MutationObserver(mutations => {
if (document.querySelector(selector)) {
observer.disconnect();
resolve(document.querySelector(selector));
}
});
observer.observe(document.documentElement, {
childList: true,
subtree: true
});
});
}
(async () => {
if (!window.location.hostname.includes('lucida.')) return;
await Promise.race([
waitForElement('.d1-track button'),
waitForElement('button[class*="download-button"]')
]);
const clickDownloadButton = () => {
const button = document.querySelector('.d1-track button') ||
document.querySelector('button[class*="download-button"]');
if (button) button.click();
};
clickDownloadButton();
})();
""")
while attempts < max_attempts:
download_info = await page.evaluate("window.downloadInfo")
if download_info:
return download_info
await asyncio.sleep(0.5)
attempts += 1
raise TimeoutError("Timeout")
async def main(headless=True):
browser = await zd.start(headless=headless)
try:
track_id = "2plbrEY59IikOBgBGLjaoe"
url = f"https://lucida.to/?url=https%3A%2F%2Fopen.spotify.com%2Ftrack%2F{track_id}&country=auto&to=tidal"
page = await browser.get(url)
download_info = await get_metadata(page)
print(download_info)
return download_info
finally:
await browser.stop()
if __name__ == "__main__":
asyncio.run(main())
+2 -13
View File
@@ -3,15 +3,10 @@
![SpotiFLAC](https://github.com/user-attachments/assets/b4c4f403-edbd-4a71-b74b-c7d433d47d06)
<div align="center">
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music, and Deezer <code>(via Lucida)</code>, as well as Qobuz <code>(via SquidWTF)</code>.
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Qobuz & Tidal.
</div>
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v3.3/SpotiFLAC.exe)
#
> [!Note]
**Download speed** from Lucida is unpredictable—sometimes fast, sometimes slow. Join their [Discord](https://discord.com/invite/dXEGRWqEbS) for updates.
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v3.6/SpotiFLAC.exe)
## Screenshots
@@ -21,14 +16,8 @@
![image](https://github.com/user-attachments/assets/be9a79b5-260e-4948-9f72-10c735210ab7)
![image](https://github.com/user-attachments/assets/c4403934-9003-447e-a27b-fc74cab23454)
![image](https://github.com/user-attachments/assets/1feec621-f8bf-4b2a-ae73-afcb1fb1deba)
![image](https://github.com/user-attachments/assets/c64b9a08-c99a-4d3a-ae8b-5f834623915b)
> When **Fallback** is enabled, it will use the backup server `Lucida.su`
## Lossless Audio Check
![image](https://github.com/user-attachments/assets/d63b422d-0ea3-4307-850f-96c99d7eaa9a)
+52 -140
View File
@@ -17,7 +17,8 @@ from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush
from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException
from getTracks import LucidaDownloader, SquidWTFDownloader, TidalDownloader
from qobuzDL import QobuzDownloader
from tidalDL import TidalDownloader
@dataclass
class Track:
@@ -53,10 +54,9 @@ class MetadataFetchWorker(QThread):
class DownloadWorker(QThread):
finished = pyqtSignal(bool, str, list)
progress = pyqtSignal(str, int)
def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False,
album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True,
use_album_subfolders=False, service="amazon", timeout=30, qobuz_region="us"):
use_album_subfolders=False, service="tidal", qobuz_region="us"):
super().__init__()
self.tracks = tracks
self.outpath = outpath
@@ -68,7 +68,6 @@ class DownloadWorker(QThread):
self.use_track_numbers = use_track_numbers
self.use_album_subfolders = use_album_subfolders
self.service = service
self.timeout = timeout
self.qobuz_region = qobuz_region
self.is_paused = False
self.is_stopped = False
@@ -84,11 +83,11 @@ class DownloadWorker(QThread):
def run(self):
try:
if self.service == "qobuz":
downloader = SquidWTFDownloader(self.qobuz_region, self.timeout)
elif self.service == "tidal_api":
downloader = TidalDownloader(timeout=self.timeout)
downloader = QobuzDownloader(self.qobuz_region)
elif self.service == "tidal":
downloader = TidalDownloader()
else:
downloader = LucidaDownloader(timeout=self.timeout)
downloader = TidalDownloader()
def progress_update(current, total):
if total > 0:
@@ -154,16 +153,15 @@ class DownloadWorker(QThread):
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
)
elif self.service == "tidal_api":
elif self.service == "tidal":
if not track.isrc:
self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0)
self.failed_tracks.append((track.title, track.artists, "No ISRC available"))
continue
self.progress.emit(f"Searching and downloading from Tidal (API) for ISRC: {track.isrc} - {track.title} - {track.artists}", 0)
self.progress.emit(f"Searching and downloading from Tidal for ISRC: {track.isrc} - {track.title} - {track.artists}", 0)
import asyncio
is_paused_callback = lambda: self.is_paused
is_stopped_callback = lambda: self.is_stopped
@@ -181,7 +179,6 @@ class DownloadWorker(QThread):
isrc=track.isrc,
output_dir=track_outpath,
quality="LOSSLESS",
embed_metadata=True,
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
))
@@ -192,13 +189,13 @@ class DownloadWorker(QThread):
self.progress.emit(f"Download stopped by user for: {track.title}",0)
return
elif isinstance(download_result_details, dict) and download_result_details.get("success") == False:
raise Exception(download_result_details.get("error", "Tidal API download failed"))
raise Exception(download_result_details.get("error", "Tidal download failed"))
elif isinstance(download_result_details, dict) and (download_result_details.get("status") == "all_skipped" or download_result_details.get("status") == "skipped_exists"):
self.progress.emit(f"File already exists or skipped: {new_filename}",0)
downloaded_file = new_filepath
else:
downloaded_file = None
raise Exception(f"Tidal API download failed or returned unexpected result: {download_result_details}")
raise Exception(f"Tidal download failed or returned unexpected result: {download_result_details}")
else:
track_id = track.id
self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0)
@@ -306,26 +303,7 @@ class UpdateDialog(QDialog):
self.update_button.clicked.connect(self.accept)
self.cancel_button.clicked.connect(self.reject)
class ServiceStatusChecker(QThread):
status_updated = pyqtSignal(dict)
error = pyqtSignal(str)
def run(self):
try:
response = requests.get("https://lucida.to/api/stats", timeout=5)
services_status = {}
if response.status_code == 200:
data = response.json()
current_services = data.get('downloads', {}).get('current', {}).get('services', {})
services_status['amazon'] = current_services.get('amazon', 0) > 0
services_status['tidal'] = current_services.get('tidal', 0) > 0
services_status['deezer'] = current_services.get('deezer', 0) > 0
else:
self.error.emit(f"Lucida API error: {response.status_code}")
self.status_updated.emit(services_status)
except Exception as e:
self.error.emit(f"Error checking Lucida service status: {str(e)}")
class TidalStatusChecker(QThread):
status_updated = pyqtSignal(bool)
@@ -382,36 +360,23 @@ class ServiceComboBox(QComboBox):
self.services_status = {}
self.setItemDelegate(StatusIndicatorDelegate())
self.setup_items()
self.status_checker = ServiceStatusChecker()
self.status_checker.status_updated.connect(self.update_service_status)
self.status_checker.error.connect(lambda e: print(f"General status check error: {e}"))
self.status_checker.start()
self.tidal_status_checker = TidalStatusChecker()
self.tidal_status_checker.status_updated.connect(self.update_tidal_service_status)
self.tidal_status_checker.error.connect(lambda e: print(f"Tidal status check error: {e}"))
self.tidal_status_checker.start()
self.status_timer = QTimer(self)
self.status_timer.timeout.connect(self.refresh_status)
self.status_timer.start(5000)
self.tidal_api_status_checker = TidalStatusChecker()
self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status)
self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}"))
self.tidal_api_status_checker.start()
self.tidal_api_status_timer = QTimer(self)
self.tidal_api_status_timer.timeout.connect(self.refresh_tidal_api_status)
self.tidal_api_status_timer.start(6000)
self.tidal_status_timer = QTimer(self)
self.tidal_status_timer.timeout.connect(self.refresh_tidal_status)
self.tidal_status_timer.start(6000)
def setup_items(self):
current_dir = os.path.dirname(os.path.abspath(__file__))
self.services = [
{'id': 'tidal', 'name': 'Tidal (Lucida)', 'icon': 'tidal.png', 'online': False},
{'id': 'amazon', 'name': 'Amazon (Lucida)', 'icon': 'amazon.png', 'online': False},
{'id': 'deezer', 'name': 'Deezer (Lucida)', 'icon': 'deezer.png', 'online': False},
{'id': 'qobuz', 'name': 'Qobuz (SquidWTF)', 'icon': 'qobuz.png', 'online': False},
{'id': 'tidal_api', 'name': 'Tidal (API)', 'icon': 'tidal.png', 'online': False}
{'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False},
{'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False}
]
for service in self.services:
@@ -425,36 +390,15 @@ class ServiceComboBox(QComboBox):
item_index = self.count() - 1
self.setItemData(item_index, service['id'], Qt.ItemDataRole.UserRole + 1)
self.setItemData(item_index, service, Qt.ItemDataRole.UserRole)
def create_placeholder_icon(self, path):
pixmap = QPixmap(16, 16)
pixmap.fill(Qt.GlobalColor.transparent)
pixmap.save(path)
def update_service_status(self, status_dict):
self.services_status.update(status_dict)
def update_tidal_service_status(self, is_online):
for i in range(self.count()):
service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if service_id in self.services_status:
service_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(service_data, dict):
service_data['online'] = self.services_status[service_id]
self.setItemData(i, service_data, Qt.ItemDataRole.UserRole)
self.update()
def refresh_status(self):
self.status_checker = ServiceStatusChecker()
self.status_checker.status_updated.connect(self.update_service_status)
self.status_checker.error.connect(lambda e: print(f"General status check error: {e}"))
self.status_checker.start()
def update_tidal_api_service_status(self, is_online):
for i in range(self.count()):
service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1)
if service_id == 'tidal_api':
if service_id == 'tidal':
service_data = self.itemData(i, Qt.ItemDataRole.UserRole)
if isinstance(service_data, dict):
service_data['online'] = is_online
@@ -462,11 +406,11 @@ class ServiceComboBox(QComboBox):
break
self.update()
def refresh_tidal_api_status(self):
self.tidal_api_status_checker = TidalStatusChecker()
self.tidal_api_status_checker.status_updated.connect(self.update_tidal_api_service_status)
self.tidal_api_status_checker.error.connect(lambda e: print(f"Tidal (API) status check error: {e}"))
self.tidal_api_status_checker.start()
def refresh_tidal_status(self):
self.tidal_status_checker = TidalStatusChecker()
self.tidal_status_checker.status_updated.connect(self.update_tidal_service_status)
self.tidal_status_checker.error.connect(lambda e: print(f"Tidal status check error: {e}"))
self.tidal_status_checker.start()
def currentData(self, role=Qt.ItemDataRole.UserRole + 1):
return super().currentData(role)
@@ -559,7 +503,7 @@ class QobuzRegionComboBox(QComboBox):
class SpotiFLACGUI(QWidget):
def __init__(self):
super().__init__()
self.current_version = "3.4"
self.current_version = "3.7"
self.tracks = []
self.reset_state()
@@ -570,9 +514,8 @@ class SpotiFLACGUI(QWidget):
self.filename_format = self.settings.value('filename_format', 'title_artist')
self.use_track_numbers = self.settings.value('use_track_numbers', False, type=bool)
self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool)
self.service = self.settings.value('service', 'amazon')
self.service = self.settings.value('service', 'tidal')
self.qobuz_region = self.settings.value('qobuz_region', 'us')
self.timeout_value = self.settings.value('timeout_value', 30, type=int)
self.check_for_updates = self.settings.value('check_for_updates', True, type=bool)
self.elapsed_time = QTime(0, 0, 0)
@@ -902,20 +845,11 @@ class SpotiFLACGUI(QWidget):
self.service_dropdown = ServiceComboBox()
self.service_dropdown.currentIndexChanged.connect(self.on_service_changed)
service_fallback_layout.addWidget(service_label)
service_fallback_layout.addWidget(self.service_dropdown)
service_fallback_layout.addSpacing(10)
timeout_label = QLabel('Timeout:')
self.timeout_input = QLineEdit()
self.timeout_input.setText(str(self.timeout_value))
self.timeout_input.setFixedWidth(35)
self.timeout_input.textChanged.connect(self.save_timeout_setting)
service_fallback_layout.addWidget(timeout_label)
service_fallback_layout.addWidget(self.timeout_input)
region_label = QLabel('Region:')
self.qobuz_region_dropdown = QobuzRegionComboBox()
self.qobuz_region_dropdown.currentIndexChanged.connect(self.save_qobuz_region_setting)
@@ -932,7 +866,6 @@ class SpotiFLACGUI(QWidget):
settings_layout.addStretch()
settings_tab.setLayout(settings_layout)
self.tab_widget.addTab(settings_tab, "Settings")
for i in range(self.service_dropdown.count()):
if self.service_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == self.service:
self.service_dropdown.setCurrentIndex(i)
@@ -943,6 +876,8 @@ class SpotiFLACGUI(QWidget):
self.qobuz_region_dropdown.setCurrentIndex(i)
break
self.update_service_ui()
self.qobuz_region_dropdown.status_updated.connect(
lambda region_id, is_online: self.service_dropdown.update_qobuz_status(region_id, is_online)
)
@@ -955,8 +890,7 @@ class SpotiFLACGUI(QWidget):
sections = [
("Check for Updates", "https://github.com/afkarxyz/SpotiFLAC/releases"),
("Report an Issue", "https://github.com/afkarxyz/SpotiFLAC/issues"),
("Lucida Status", "https://status.lucida.to")
("Report an Issue", "https://github.com/afkarxyz/SpotiFLAC/issues")
]
for title, url in sections:
@@ -988,7 +922,7 @@ class SpotiFLACGUI(QWidget):
}
""")
button.setCursor(Qt.CursorShape.PointingHandCursor)
button.clicked.connect(lambda _, url=url: QDesktopServices.openUrl(QUrl(url)))
button.clicked.connect(lambda _, url=url: QDesktopServices.openUrl(QUrl(url if url.startswith(('http://', 'https://')) else f'https://{url}')))
section_layout.addWidget(button, alignment=Qt.AlignmentFlag.AlignCenter)
about_layout.addWidget(section_widget)
@@ -997,25 +931,18 @@ class SpotiFLACGUI(QWidget):
spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed)
about_layout.addItem(spacer)
footer_label = QLabel("v3.4 | June 2025")
footer_label = QLabel("v3.7 | July 2025")
footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;")
about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter)
about_tab.setLayout(about_layout)
self.tab_widget.addTab(about_tab, "About")
def on_service_changed(self, index):
service = self.service_dropdown.currentData()
self.service = service
self.settings.setValue('service', service)
self.settings.sync()
timeout_label = None
for widget in self.timeout_input.parentWidget().children():
if isinstance(widget, QLabel) and widget.text() == "Timeout:":
timeout_label = widget
break
region_label = None
for widget in self.qobuz_region_dropdown.parentWidget().children():
if isinstance(widget, QLabel) and widget.text() == "Region:":
@@ -1023,31 +950,33 @@ class SpotiFLACGUI(QWidget):
break
if service == "qobuz":
self.timeout_input.hide()
if timeout_label:
timeout_label.hide()
if region_label:
region_label.show()
self.qobuz_region_dropdown.show()
elif service == "tidal_api":
self.timeout_input.hide()
if timeout_label:
timeout_label.hide()
if region_label:
region_label.hide()
self.qobuz_region_dropdown.hide()
else:
self.timeout_input.show()
if timeout_label:
timeout_label.show()
if region_label:
region_label.hide()
self.qobuz_region_dropdown.hide()
self.log_output.append(f"Service changed to: {self.service_dropdown.currentText()}")
def update_service_ui(self):
service = self.service
region_label = None
for widget in self.qobuz_region_dropdown.parentWidget().children():
if isinstance(widget, QLabel) and widget.text() == "Region:":
region_label = widget
break
if service == "qobuz":
if region_label:
region_label.show()
self.qobuz_region_dropdown.show()
else:
if region_label:
region_label.hide()
self.qobuz_region_dropdown.hide()
def save_url(self):
self.settings.setValue('spotify_url', self.spotify_url.text().strip())
self.settings.sync()
@@ -1061,27 +990,11 @@ class SpotiFLACGUI(QWidget):
self.use_track_numbers = self.track_number_checkbox.isChecked()
self.settings.setValue('use_track_numbers', self.use_track_numbers)
self.settings.sync()
def save_album_subfolder_setting(self):
self.use_album_subfolders = self.album_subfolder_checkbox.isChecked()
self.settings.setValue('use_album_subfolders', self.use_album_subfolders)
self.settings.sync()
def save_timeout_setting(self):
try:
timeout = int(self.timeout_input.text())
if timeout > 0:
self.timeout_value = timeout
self.settings.setValue('timeout_value', self.timeout_value)
self.settings.sync()
self.log_output.append(f"Timeout setting saved: {self.timeout_value} seconds")
else:
self.timeout_input.setText(str(self.timeout_value))
self.log_output.append("Timeout must be a positive number")
except ValueError:
self.timeout_input.setText(str(self.timeout_value))
self.log_output.append("Timeout must be a valid number")
def save_qobuz_region_setting(self):
region = self.qobuz_region_dropdown.currentData()
self.qobuz_region = region
@@ -1382,7 +1295,6 @@ class SpotiFLACGUI(QWidget):
self.use_track_numbers,
self.use_album_subfolders,
service,
self.timeout_value,
qobuz_region
)
self.worker.finished.connect(self.on_download_finished)
+58 -31
View File
@@ -2,35 +2,56 @@ from time import sleep
from urllib.parse import urlparse, parse_qs
import requests
import json
import hmac
import time
import hashlib
from typing import Tuple, Callable, Dict, Any, List
import pyotp
import base64
from random import randrange
from typing import Dict, Any, List, Tuple
_TOTP_SECRET = bytearray([53,53,48,55,49,52,53,56,53,51,52,56,55,52,57,57,53,57,50,50,52,56,54,51,48,51,50,57,51,52,55])
# https://github.com/visagenull/Spotify-Free
def get_random_user_agent():
return f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_{randrange(11, 15)}_{randrange(4, 9)}) AppleWebKit/{randrange(530, 537)}.{randrange(30, 37)} (KHTML, like Gecko) Chrome/{randrange(80, 105)}.0.{randrange(3000, 4500)}.{randrange(60, 125)} Safari/{randrange(530, 537)}.{randrange(30, 36)}"
def generate_totp(
secret: bytes = _TOTP_SECRET,
algorithm: Callable[[], object] = hashlib.sha1,
digits: int = 6,
counter_factory: Callable[[], int] = lambda: int(time.time()) // 30,
) -> Tuple[str, int]:
counter = counter_factory()
hmac_result = hmac.new(
secret, counter.to_bytes(8, byteorder="big"), algorithm
).digest()
def generate_totp():
url = "https://raw.githubusercontent.com/Thereallo1026/spotify-secrets/refs/heads/main/secrets/secretBytes.json"
offset = hmac_result[-1] & 15
truncated_value = (
(hmac_result[offset] & 127) << 24
| (hmac_result[offset + 1] & 255) << 16
| (hmac_result[offset + 2] & 255) << 8
| (hmac_result[offset + 3] & 255)
)
return (
str(truncated_value % (10**digits)).zfill(digits),
counter * 30_000,
)
try:
resp = requests.get(url, timeout=10)
if resp.status_code != 200:
raise Exception(f"Failed to fetch TOTP secrets from GitHub. Status: {resp.status_code}")
secrets_list = resp.json()
latest_entry = max(secrets_list, key=lambda x: x["version"])
version = latest_entry["version"]
secret_cipher = latest_entry["secret"]
except Exception as e:
raise Exception(f"Failed to fetch secrets from GitHub: {str(e)}")
processed = [byte ^ ((i % 33) + 9) for i, byte in enumerate(secret_cipher)]
processed_str = "".join(map(str, processed))
utf8_bytes = processed_str.encode('utf-8')
hex_str = utf8_bytes.hex()
secret_bytes = bytes.fromhex(hex_str)
b32_secret = base64.b32encode(secret_bytes).decode('utf-8')
totp = pyotp.TOTP(b32_secret)
headers = {
"Host": "open.spotify.com",
"User-Agent": get_random_user_agent(),
"Accept": "*/*",
}
try:
resp = requests.get("https://open.spotify.com/api/server-time", headers=headers, timeout=10)
if resp.status_code != 200:
raise Exception(f"Failed to get server time. Status code: {resp.status_code}")
data = resp.json()
server_time = data.get("serverTime")
if server_time is None:
raise Exception("Failed to fetch server time from Spotify")
return totp, server_time, version
except Exception as e:
raise Exception(f"Error getting server time: {str(e)}")
token_url = 'https://open.spotify.com/api/token'
playlist_base_url = 'https://api.spotify.com/v1/playlists/{}'
@@ -102,14 +123,20 @@ def get_json_from_api(api_url, access_token):
def get_access_token():
try:
totp, timestamp = generate_totp()
totp, server_time, totp_version = generate_totp()
otp_code = totp.at(int(server_time))
timestamp_ms = int(time.time() * 1000)
params = {
"reason": "init",
"productType": "web-player",
"totp": totp,
"totpVer": 5,
"ts": timestamp,
'reason': 'init',
'productType': 'web-player',
'totp': otp_code,
'totpServerTime': server_time,
'totpVer': str(totp_version),
'sTime': server_time,
'cTime': timestamp_ms,
'buildVer': 'web-player_2025-07-02_1720000000000_12345678',
'buildDate': '2025-07-02'
}
req = requests.get(token_url, headers=headers, params=params, timeout=10)
-1076
View File
File diff suppressed because it is too large Load Diff
+271
View File
@@ -0,0 +1,271 @@
import requests
import time
import os
import re
from datetime import datetime
from mutagen.flac import FLAC, Picture
from mutagen.id3 import PictureType
class ProgressCallback:
def __call__(self, current, total):
if total > 0:
percent = (current / total) * 100
print(f"\r{percent:.2f}% ({current}/{total})", end="")
else:
print(f"\r{current / (1024 * 1024):.2f} MB", end="")
class QobuzDownloader:
def __init__(self, region="us", timeout=30):
if region not in ["eu", "us"]:
raise ValueError("Region must be either 'us' or 'eu'")
self.region = region
self.timeout = timeout
self.session = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.base_api_url = f"https://{region}.qobuz.squid.wtf/api"
self.download_chunk_size = 256 * 1024
self.progress_callback = ProgressCallback()
def set_progress_callback(self, callback):
self.progress_callback = callback
def sanitize_filename(self, filename):
if not filename:
return "Unknown Track"
sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename))
return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track"
def get_track_info(self, isrc):
print(f"Fetching: {isrc}")
search_url = f"{self.base_api_url}/get-music"
params = {'q': isrc, 'offset': 0, 'limit': 10}
try:
response = self.session.get(search_url, params=params, timeout=self.timeout)
response.raise_for_status()
data = response.json()
selected_track = None
if data and data.get("success"):
items = data.get("data", {}).get("tracks", {}).get("items", [])
priority = {24: 1, 16: 2}
for track in items:
if track.get("isrc") == isrc:
current_prio = priority.get(track.get("maximum_bit_depth"), 3)
if selected_track is None or current_prio < priority.get(selected_track.get("maximum_bit_depth"), 3):
selected_track = track
if current_prio == 1:
break
if not selected_track:
raise Exception(f"Track not found: {isrc}")
title = selected_track.get('title', 'Unknown')
bit_depth = selected_track.get('maximum_bit_depth', 'Unknown')
print(f"Found: {title} ({bit_depth}b)")
return selected_track
except requests.exceptions.RequestException as e:
raise Exception(f"Request error: {e}")
except Exception as e:
raise Exception(f"Error: {e}")
def get_download_url(self, track_id):
print("Fetching URL...")
download_api_url = f"{self.base_api_url}/download-music"
params = {'track_id': track_id, 'quality': 27}
try:
response = self.session.get(download_api_url, params=params, timeout=self.timeout)
response.raise_for_status()
data = response.json()
if data and data.get("success") and data.get("data", {}).get("url"):
download_url = data["data"]["url"]
print("URL found")
return download_url
else:
error_msg = data.get('error', {}).get('message', 'Unknown API error')
raise Exception(f"API error: {error_msg}")
except requests.exceptions.RequestException as e:
raise Exception(f"Request error: {e}")
except Exception as e:
raise Exception(f"Error: {e}")
def download(self, isrc, output_dir=".", is_paused_callback=None, is_stopped_callback=None):
if output_dir != ".":
try:
os.makedirs(output_dir, exist_ok=True)
except OSError as e:
raise Exception(f"Directory error: {e}")
track_info = self.get_track_info(isrc)
track_id = track_info.get("id")
if not track_id:
raise Exception("No track ID found")
artist_name = self.sanitize_filename(track_info.get('performer', {}).get('name'))
track_title = self.sanitize_filename(track_info.get('title'))
output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac")
if os.path.exists(output_filename):
file_size = os.path.getsize(output_filename)
if file_size > 0:
print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)")
return output_filename
download_url = self.get_download_url(track_id)
temp_filename = output_filename + ".part"
print(f"Downloading...")
try:
with self.session.get(download_url, stream=True, timeout=900) as response, \
open(temp_filename, 'wb') as f:
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
start_time = time.time()
last_update_time = start_time
for chunk in response.iter_content(chunk_size=self.download_chunk_size):
if is_stopped_callback and is_stopped_callback():
f.close()
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception("Download stopped")
while is_paused_callback and is_paused_callback():
time.sleep(0.1)
if is_stopped_callback and is_stopped_callback():
f.close()
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception("Download stopped")
f.write(chunk)
downloaded_size += len(chunk)
current_time = time.time()
if current_time - last_update_time >= 1:
if total_size > 0:
progress_percent = (downloaded_size / total_size) * 100
elapsed_time = current_time - start_time
speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0
print(f"{progress_percent:.2f}% - {speed:.2f} MB/s")
else:
print(f"{downloaded_size / (1024 * 1024):.2f} MB")
last_update_time = current_time
if self.progress_callback:
self.progress_callback(downloaded_size, total_size)
os.rename(temp_filename, output_filename)
print("Download complete")
except requests.exceptions.RequestException as e:
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception(f"Download failed: {e}")
except Exception as e:
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise Exception(f"File error: {e}")
print("Adding metadata...")
try:
self._embed_metadata(output_filename, track_info)
print("Metadata saved")
except Exception as e:
print(f"Tagging failed: {e}")
print(f"Done")
return output_filename
def _embed_metadata(self, filename, track_info):
try:
audio = FLAC(filename)
audio.delete()
audio.clear_pictures()
album_info = track_info.get('album', {})
artist = track_info.get('performer', {}).get('name')
if track_info.get('title'):
audio['TITLE'] = track_info['title']
if artist:
audio['ARTIST'] = artist
if album_info.get('title'):
audio['ALBUM'] = album_info['title']
if album_info.get('artist', {}).get('name', artist):
audio['ALBUMARTIST'] = album_info.get('artist', {}).get('name', artist)
if track_info.get('track_number'):
audio['TRACKNUMBER'] = str(track_info['track_number'])
if track_info.get('release_date_original'):
audio['DATE'] = track_info['release_date_original']
try:
audio['YEAR'] = str(datetime.strptime(track_info['release_date_original'], '%Y-%m-%d').year)
except ValueError:
pass
if album_info.get('genre', {}).get('name'):
audio['GENRE'] = album_info['genre']['name']
if track_info.get('copyright'):
audio['COPYRIGHT'] = track_info['copyright']
if track_info.get('isrc'):
audio['ISRC'] = track_info['isrc']
if album_info.get('label', {}).get('name'):
audio['ORGANIZATION'] = album_info['label']['name']
img_info = album_info.get('image', {})
cover_url = img_info.get('large') or img_info.get('small') or img_info.get('thumbnail')
if cover_url:
try:
img_response = self.session.get(cover_url, timeout=30)
img_response.raise_for_status()
mime_type = img_response.headers.get('Content-Type', 'image/jpeg').lower()
if mime_type in ['image/jpeg', 'image/png']:
picture = Picture()
picture.data = img_response.content
picture.type = PictureType.COVER_FRONT
picture.mime = mime_type
audio.add_picture(picture)
print("Cover added")
except Exception as e:
print(f"Cover error: {str(e)}")
audio.save()
except Exception as e:
raise Exception(f"Metadata error: {e}")
def main():
print("=== QobuzDL - Qobuz Downloader ===")
downloader = QobuzDownloader(region="us")
isrc = "USAT22409172"
output_dir = "."
try:
downloaded_file = downloader.download(isrc, output_dir)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
try:
import sys
if sys.platform == "win32":
import os
os.system("chcp 65001 > nul")
try:
sys.stdout.reconfigure(encoding='utf-8')
except:
pass
except:
pass
main()
+454
View File
@@ -0,0 +1,454 @@
import asyncio
import json
import os
import re
import tempfile
import time
import httpx
from mutagen.flac import FLAC, Picture
from mutagen.id3 import PictureType
class ProgressCallback:
def __call__(self, current, total):
if total > 0:
percent = (current / total) * 100
print(f"\r{percent:.2f}% ({current}/{total})", end="")
else:
print(f"\r{current / (1024 * 1024):.2f} MB", end="")
class TidalDownloader:
def __init__(self, timeout=30, max_retries=3):
self.timeout = timeout
self.max_retries = max_retries
self.download_chunk_size = 256 * 1024
self.progress_callback = ProgressCallback()
self.client_id = "zU4XHVVkc2tDPo4t"
self.client_secret = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4="
self.temp_dir = tempfile.gettempdir()
self.token_path = os.path.join(self.temp_dir, "tidal_token.json")
self.access_token = None
self._load_token()
def set_progress_callback(self, callback):
self.progress_callback = callback
def _load_token(self):
if os.path.exists(self.token_path):
try:
with open(self.token_path, "r") as tok:
token = json.loads(tok.read())
self.access_token = token.get("access_token")
except:
pass
def sanitize_filename(self, filename):
if not filename:
return "Unknown Track"
sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename))
return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track"
async def get_access_token(self):
if self.access_token:
return self.access_token
refresh_url = "https://auth.tidal.com/v1/oauth2/token"
payload = {
"client_id": self.client_id,
"grant_type": "client_credentials",
}
async with httpx.AsyncClient(http2=True) as client:
try:
response = await client.post(
url=refresh_url,
data=payload,
auth=(self.client_id, self.client_secret),
)
if response.status_code == 200:
token_data = response.json()
new_token = token_data.get("access_token")
try:
with open(self.token_path, "w") as f:
json.dump({
"access_token": new_token
}, f)
except:
pass
self.access_token = new_token
return new_token
else:
return None
except:
return None
async def search_tracks(self, query):
try:
tidal_token = await self.get_access_token()
if not tidal_token:
raise Exception("Failed to get access token")
search_url = f"https://api.tidal.com/v1/search/tracks?query={query}&limit=25&offset=0&countryCode=US"
header = {"authorization": f"Bearer {tidal_token}"}
async with httpx.AsyncClient(http2=True) as client:
search_data = await client.get(url=search_url, headers=header)
response_data = search_data.json()
filtered_items = [{
"id": item.get("id"),
"title": item.get("title"),
"url": item.get("url"),
"isrc": item.get("isrc"),
"audioQuality": item.get("audioQuality"),
"mediaMetadata": item.get("mediaMetadata"),
"album": item.get("album", {}),
"artists": item.get("artists", []),
"artist": item.get("artist", {}),
"trackNumber": item.get("trackNumber"),
"volumeNumber": item.get("volumeNumber"),
"duration": item.get("duration"),
"copyright": item.get("copyright"),
"explicit": item.get("explicit")
} for item in response_data.get("items", [])]
return {
"limit": response_data.get("limit"),
"offset": response_data.get("offset"),
"totalNumberOfItems": response_data.get("totalNumberOfItems"),
"items": filtered_items
}
except Exception as e:
raise Exception(f"Search error: {str(e)}")
async def get_track_info(self, query, isrc=None):
print(f"Fetching: {query}" + (f" (ISRC: {isrc})" if isrc else ""))
try:
result = await self.search_tracks(query)
if not result or not result.get("items"):
raise Exception(f"No tracks found for query: {query}")
selected_track = None
if isrc:
isrc_items = [item for item in result["items"] if item.get("isrc") == isrc]
if len(isrc_items) > 1:
hires_items = []
for item in isrc_items:
media_metadata = item.get("mediaMetadata", {})
tags = media_metadata.get("tags", []) if media_metadata else []
if "HIRES_LOSSLESS" in tags:
hires_items.append(item)
if hires_items:
selected_track = hires_items[0]
else:
selected_track = isrc_items[0]
elif len(isrc_items) == 1:
selected_track = isrc_items[0]
else:
selected_track = result["items"][0]
else:
selected_track = result["items"][0]
if not selected_track:
raise Exception(f"Track not found: {query}" + (f" (ISRC: {isrc})" if isrc else ""))
title = selected_track.get('title', 'Unknown')
quality = selected_track.get('audioQuality', 'Unknown')
print(f"Found: {title} ({quality})")
return selected_track
except Exception as e:
raise Exception(f"Error getting track info: {str(e)}")
async def get_download_url(self, track_id, quality="LOSSLESS"):
print("Fetching URL...")
download_api_url = f"https://hifi.401658.xyz/track/?id={track_id}&quality={quality}"
async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client:
try:
response = await client.get(download_api_url)
if response.status_code == 200:
data = response.json()
for item in data:
if "OriginalTrackUrl" in item:
print("URL found")
return {
"download_url": item["OriginalTrackUrl"],
"track_info": data[0] if data else {}
}
raise Exception("Download URL not found in response")
else:
raise Exception(f"API returned status code: {response.status_code}")
except Exception as e:
raise Exception(f"Error getting download URL: {str(e)}")
async def download_album_art(self, album_id, size="1280x1280"):
try:
art_url = f"https://resources.tidal.com/images/{album_id.replace('-', '/')}/{size}.jpg"
async with httpx.AsyncClient(http2=True, timeout=self.timeout) as client:
response = await client.get(art_url)
if response.status_code == 200:
return response.content
else:
print(f"Failed to download album art: HTTP {response.status_code}")
return None
except Exception as e:
print(f"Error downloading album art: {str(e)}")
return None
async def download_file(self, url, filepath, is_paused_callback=None, is_stopped_callback=None):
temp_filepath = filepath + ".part"
retry_count = 0
while retry_count <= self.max_retries:
try:
async with httpx.AsyncClient(http2=True, timeout=60.0) as client:
async with client.stream('GET', url) as response:
if response.status_code != 200:
raise Exception(f"HTTP {response.status_code}")
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
start_time = time.time()
last_update_time = start_time
with open(temp_filepath, 'wb') as f:
async for chunk in response.aiter_bytes(chunk_size=self.download_chunk_size):
if is_stopped_callback and is_stopped_callback():
f.close()
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
raise Exception("Download stopped")
while is_paused_callback and is_paused_callback():
await asyncio.sleep(0.1)
if is_stopped_callback and is_stopped_callback():
f.close()
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
raise Exception("Download stopped")
f.write(chunk)
downloaded_size += len(chunk)
current_time = time.time()
if current_time - last_update_time >= 1:
if total_size > 0:
progress_percent = (downloaded_size / total_size) * 100
elapsed_time = current_time - start_time
speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0
print(f"{progress_percent:.2f}% - {speed:.2f} MB/s")
else:
print(f"{downloaded_size / (1024 * 1024):.2f} MB")
last_update_time = current_time
if self.progress_callback:
self.progress_callback(downloaded_size, total_size)
os.rename(temp_filepath, filepath)
print("Download complete")
return {"success": True, "size": downloaded_size}
except Exception as e:
retry_count += 1
if retry_count > self.max_retries:
if os.path.exists(temp_filepath):
try:
os.remove(temp_filepath)
except:
pass
raise Exception(f"Download error after {self.max_retries} retries: {str(e)}")
print(f"Download error (attempt {retry_count}/{self.max_retries}): {str(e)}")
print(f"Retrying in {retry_count * 2} seconds...")
await asyncio.sleep(retry_count * 2)
async def embed_metadata(self, filepath, track_info, search_info=None):
try:
print("Embedding metadata...")
audio = FLAC(filepath)
audio.clear()
audio.clear_pictures()
if track_info.get("title"):
audio["TITLE"] = track_info["title"]
artists_list = []
if search_info and search_info.get("artists"):
for artist in search_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif search_info and search_info.get("artist") and search_info["artist"].get("name"):
artists_list.append(search_info["artist"]["name"])
elif track_info.get("artists"):
for artist in track_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif track_info.get("artist") and track_info["artist"].get("name"):
artists_list.append(track_info["artist"]["name"])
if artists_list:
audio["ARTIST"] = artists_list[0]
if len(artists_list) > 1:
audio["ALBUMARTIST"] = "; ".join(artists_list)
else:
audio["ALBUMARTIST"] = artists_list[0]
album_info = search_info.get("album", {}) if search_info else track_info.get("album", {})
if album_info.get("title"):
audio["ALBUM"] = album_info["title"]
if search_info and search_info.get("trackNumber"):
audio["TRACKNUMBER"] = str(search_info["trackNumber"])
elif track_info.get("trackNumber"):
audio["TRACKNUMBER"] = str(track_info["trackNumber"])
if search_info and search_info.get("volumeNumber"):
audio["DISCNUMBER"] = str(search_info["volumeNumber"])
elif track_info.get("volumeNumber"):
audio["DISCNUMBER"] = str(track_info["volumeNumber"])
duration = search_info.get("duration") if search_info else track_info.get("duration")
if duration:
audio["LENGTH"] = str(duration)
isrc = search_info.get("isrc") if search_info else track_info.get("isrc")
if isrc:
audio["ISRC"] = isrc
copyright_info = search_info.get("copyright") if search_info else track_info.get("copyright")
if copyright_info:
audio["COPYRIGHT"] = copyright_info
if album_info.get("releaseDate"):
audio["DATE"] = album_info["releaseDate"][:4]
try:
audio["YEAR"] = album_info["releaseDate"][:4]
except:
pass
if track_info.get("genre"):
audio["GENRE"] = track_info["genre"]
if track_info.get("audioQuality"):
audio["COMMENT"] = f"Tidal {track_info['audioQuality']}"
if album_info.get("cover"):
album_art = await self.download_album_art(album_info["cover"])
if album_art:
picture = Picture()
picture.data = album_art
picture.type = PictureType.COVER_FRONT
picture.mime = "image/jpeg"
picture.desc = "Cover"
audio.add_picture(picture)
print("Album art embedded")
audio.save()
print(f"Metadata embedded successfully for: {track_info.get('title', 'Unknown')}")
return True
except Exception as e:
print(f"Error embedding metadata: {str(e)}")
return False
async def download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", is_paused_callback=None, is_stopped_callback=None):
if output_dir != ".":
try:
os.makedirs(output_dir, exist_ok=True)
except OSError as e:
raise Exception(f"Directory error: {e}")
track_info = await self.get_track_info(query, isrc)
track_id = track_info.get("id")
if not track_id:
raise Exception("No track ID found")
artists_list = []
if track_info.get("artists"):
for artist in track_info["artists"]:
if artist.get("name"):
artists_list.append(artist["name"])
elif track_info.get("artist") and track_info["artist"].get("name"):
artists_list.append(track_info["artist"]["name"])
artist_name = ", ".join(artists_list) if artists_list else "Unknown Artist"
artist_name = self.sanitize_filename(artist_name)
track_title = self.sanitize_filename(track_info.get("title", f"track_{track_id}"))
output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac")
if os.path.exists(output_filename):
file_size = os.path.getsize(output_filename)
if file_size > 0:
print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)")
return output_filename
download_info = await self.get_download_url(track_id, quality)
download_url = download_info["download_url"]
download_track_info = download_info["track_info"]
print(f"Downloading to: {output_filename}")
await self.download_file(
download_url,
output_filename,
is_paused_callback=is_paused_callback,
is_stopped_callback=is_stopped_callback
)
print("Adding metadata...")
try:
await self.embed_metadata(output_filename, download_track_info, track_info)
print("Metadata saved")
except Exception as e:
print(f"Tagging failed: {e}")
print("Done")
return output_filename
async def main():
print("=== TidalDL - Tidal Downloader ===")
downloader = TidalDownloader(timeout=30, max_retries=3)
query = "APT."
isrc = "USAT22409172"
output_dir = "."
try:
downloaded_file = await downloader.download(query, isrc, output_dir)
print(f"Success: File saved as {downloaded_file}")
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
try:
import sys
if sys.platform == "win32":
import os
os.system("chcp 65001 > nul")
try:
sys.stdout.reconfigure(encoding='utf-8')
except:
pass
except:
pass
asyncio.run(main())
+1 -1
View File
@@ -1,3 +1,3 @@
{
"version": "3.3"
"version": "3.6"
}