diff --git a/SpotiFLAC.py b/SpotiFLAC.py index 4fc3337..6161cf6 100644 --- a/SpotiFLAC.py +++ b/SpotiFLAC.py @@ -17,7 +17,7 @@ from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush, Q from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException -from getTracks import TrackDownloader +from getTracks import LucidaDownloader, SquidWTFDownloader @dataclass class Track: @@ -28,6 +28,7 @@ class Track: track_number: int duration_ms: int id: str + isrc: str = "" class MetadataFetchWorker(QThread): finished = pyqtSignal(dict) @@ -55,7 +56,7 @@ class DownloadWorker(QThread): def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False, album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True, - use_album_subfolders=False, use_fallback=False, service="amazon", timeout=30): + use_album_subfolders=False, use_fallback=False, service="amazon", timeout=30, qobuz_region="us"): super().__init__() self.tracks = tracks self.outpath = outpath @@ -69,6 +70,7 @@ class DownloadWorker(QThread): self.use_fallback = use_fallback self.service = service self.timeout = timeout + self.qobuz_region = qobuz_region self.is_paused = False self.is_stopped = False self.failed_tracks = [] @@ -82,7 +84,10 @@ class DownloadWorker(QThread): def run(self): try: - downloader = TrackDownloader(self.use_fallback, self.timeout) + if self.service == "qobuz": + downloader = SquidWTFDownloader(self.qobuz_region, self.timeout) + else: + downloader = LucidaDownloader(self.use_fallback, self.timeout) def progress_update(current, total): if total > 0: @@ -110,9 +115,6 @@ class DownloadWorker(QThread): int((i) / total_tracks * 100)) try: - track_id = track.id - self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0) - if self.is_playlist and self.use_album_subfolders: album_folder = re.sub(r'[<>:"/\\|?*]', '_', track.album) track_outpath = os.path.join(self.outpath, album_folder) @@ -120,21 +122,6 @@ class DownloadWorker(QThread): else: track_outpath = self.outpath - import asyncio - metadata = asyncio.run(downloader.get_track_info(track_id, self.service)) - - self.progress.emit(f"Track info received, starting download process", 0) - - is_paused_callback = lambda: self.is_paused - is_stopped_callback = lambda: self.is_stopped - - downloaded_file = downloader.download( - metadata, - track_outpath, - is_paused_callback=is_paused_callback, - is_stopped_callback=is_stopped_callback - ) - if (self.is_album or (self.is_playlist and self.use_album_subfolders)) and self.use_track_numbers: new_filename = f"{track.track_number:02d} - {self.get_formatted_filename(track)}" else: @@ -143,6 +130,54 @@ class DownloadWorker(QThread): new_filename = re.sub(r'[<>:"/\\|?*]', '_', new_filename) new_filepath = os.path.join(track_outpath, new_filename) + if os.path.exists(new_filepath) and os.path.getsize(new_filepath) > 0: + self.progress.emit(f"File already exists: {new_filename}. Skipping download.", 0) + self.progress.emit(f"Skipped: {track.title} - {track.artists}", + int((i + 1) / total_tracks * 100)) + continue + + if self.service == "qobuz": + if not track.isrc: + self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0) + self.failed_tracks.append((track.title, track.artists, "No ISRC available")) + continue + + self.progress.emit(f"Getting track from Qobuz with ISRC: {track.isrc}", 0) + + is_paused_callback = lambda: self.is_paused + is_stopped_callback = lambda: self.is_stopped + + downloaded_file = downloader.download( + track.isrc, + track_outpath, + is_paused_callback=is_paused_callback, + is_stopped_callback=is_stopped_callback + ) + else: + track_id = track.id + self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0) + + import asyncio + metadata = asyncio.run(downloader.get_track_info(track_id, self.service)) + + self.progress.emit(f"Track info received, starting download process", 0) + + is_paused_callback = lambda: self.is_paused + is_stopped_callback = lambda: self.is_stopped + + downloaded_file = downloader.download( + metadata, + track_outpath, + is_paused_callback=is_paused_callback, + is_stopped_callback=is_stopped_callback + ) + + if downloaded_file == new_filepath: + self.progress.emit(f"File already exists: {new_filename}", 0) + self.progress.emit(f"Skipped: {track.title} - {track.artists}", + int((i + 1) / total_tracks * 100)) + continue + if os.path.exists(downloaded_file) and downloaded_file != new_filepath: if os.path.exists(new_filepath): os.remove(new_filepath) @@ -236,6 +271,22 @@ class ServiceStatusChecker(QThread): except Exception as e: self.error.emit(f"Error checking service status: {str(e)}") +class QobuzStatusChecker(QThread): + status_updated = pyqtSignal(bool) + error = pyqtSignal(str) + + def __init__(self, region="us"): + super().__init__() + self.region = region + + def run(self): + try: + response = requests.get(f"https://{self.region}.qobuz.squid.wtf", timeout=5) + self.status_updated.emit(response.status_code == 200) + except Exception as e: + self.error.emit(f"Error checking Qobuz status: {str(e)}") + self.status_updated.emit(False) + class StatusIndicatorDelegate(QStyledItemDelegate): def paint(self, painter, option, index): item_data = index.data(Qt.ItemDataRole.UserRole) @@ -285,7 +336,8 @@ class ServiceComboBox(QComboBox): self.services = [ {'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False}, {'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False}, - {'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False} + {'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False}, + {'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False} ] for service in self.services: @@ -327,11 +379,96 @@ class ServiceComboBox(QComboBox): def currentData(self, role=Qt.ItemDataRole.UserRole + 1): return super().currentData(role) + + def update_qobuz_status(self, region_id, is_online): + for i in range(self.count()): + service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1) + + if service_id == 'qobuz': + service_data = self.itemData(i, Qt.ItemDataRole.UserRole) + if isinstance(service_data, dict): + if is_online or service_data.get('online', False): + service_data['online'] = True + self.setItemData(i, service_data, Qt.ItemDataRole.UserRole) + break + + self.update() + +class QobuzRegionComboBox(QComboBox): + status_updated = pyqtSignal(str, bool) + + def __init__(self, parent=None): + super().__init__(parent) + self.setIconSize(QSize(16, 16)) + + self.setItemDelegate(StatusIndicatorDelegate()) + + self.setup_items() + + self.status_checkers = {} + self.check_status() + + self.status_timer = QTimer(self) + self.status_timer.timeout.connect(self.check_status) + self.status_timer.start(10000) + + def setup_items(self): + current_dir = os.path.dirname(os.path.abspath(__file__)) + + self.regions = [ + {'id': 'eu', 'name': 'Europe', 'icon': 'eu.svg', 'online': False}, + {'id': 'us', 'name': 'North America', 'icon': 'us.svg', 'online': False} + ] + + for region in self.regions: + icon_path = os.path.join(current_dir, region['icon']) + if not os.path.exists(icon_path): + self.create_placeholder_icon(icon_path) + + icon = QIcon(icon_path) + + self.addItem(icon, region['name']) + item_index = self.count() - 1 + self.setItemData(item_index, region['id'], Qt.ItemDataRole.UserRole + 1) + self.setItemData(item_index, region, Qt.ItemDataRole.UserRole) + + def create_placeholder_icon(self, path): + pixmap = QPixmap(16, 16) + pixmap.fill(Qt.GlobalColor.transparent) + pixmap.save(path) + + def update_region_status(self, region_id, is_online): + for i in range(self.count()): + current_region_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1) + + if current_region_id == region_id: + region_data = self.itemData(i, Qt.ItemDataRole.UserRole) + if isinstance(region_data, dict): + region_data['online'] = is_online + self.setItemData(i, region_data, Qt.ItemDataRole.UserRole) + break + + self.update() + + def check_status(self): + for region in self.regions: + region_id = region['id'] + checker = QobuzStatusChecker(region_id) + checker.status_updated.connect(lambda status, rid=region_id: self.handle_status_update(rid, status)) + checker.start() + self.status_checkers[region_id] = checker + + def handle_status_update(self, region_id, is_online): + self.update_region_status(region_id, is_online) + self.status_updated.emit(region_id, is_online) + + def currentData(self, role=Qt.ItemDataRole.UserRole + 1): + return super().currentData(role) class SpotiFLACGUI(QWidget): def __init__(self): super().__init__() - self.current_version = "2.6" + self.current_version = "2.7" self.tracks = [] self.reset_state() @@ -344,6 +481,7 @@ class SpotiFLACGUI(QWidget): self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool) self.use_fallback = self.settings.value('use_fallback', False, type=bool) self.service = self.settings.value('service', 'amazon') + self.qobuz_region = self.settings.value('qobuz_region', 'us') self.timeout_value = self.settings.value('timeout_value', 30, type=int) self.check_for_updates = self.settings.value('check_for_updates', True, type=bool) @@ -602,6 +740,7 @@ class SpotiFLACGUI(QWidget): output_dir_layout.addWidget(self.output_dir) output_dir_layout.addWidget(self.output_browse) + output_layout.addLayout(output_dir_layout) settings_layout.addWidget(output_group) @@ -663,7 +802,7 @@ class SpotiFLACGUI(QWidget): auth_layout = QVBoxLayout(auth_group) auth_layout.setSpacing(5) - auth_label = QLabel('Lucida Settings') + auth_label = QLabel('Service Settings') auth_label.setStyleSheet("font-weight: bold;") auth_layout.addWidget(auth_label) @@ -672,38 +811,58 @@ class SpotiFLACGUI(QWidget): service_label = QLabel('Service:') self.service_dropdown = ServiceComboBox() - self.service_dropdown.currentIndexChanged.connect(self.save_service_setting) + self.service_dropdown.currentIndexChanged.connect(self.on_service_changed) service_fallback_layout.addWidget(service_label) service_fallback_layout.addWidget(self.service_dropdown) - service_fallback_layout.addSpacing(20) - + service_fallback_layout.addSpacing(10) + self.fallback_checkbox = QCheckBox('Fallback') self.fallback_checkbox.setCursor(Qt.CursorShape.PointingHandCursor) self.fallback_checkbox.setChecked(self.use_fallback) self.fallback_checkbox.toggled.connect(self.save_fallback_setting) service_fallback_layout.addWidget(self.fallback_checkbox) - service_fallback_layout.addSpacing(20) - timeout_label = QLabel('Timeout:') self.timeout_input = QLineEdit() self.timeout_input.setText(str(self.timeout_value)) - self.timeout_input.setFixedWidth(60) + self.timeout_input.setFixedWidth(35) self.timeout_input.textChanged.connect(self.save_timeout_setting) service_fallback_layout.addWidget(timeout_label) service_fallback_layout.addWidget(self.timeout_input) + region_label = QLabel('Region:') + self.qobuz_region_dropdown = QobuzRegionComboBox() + self.qobuz_region_dropdown.currentIndexChanged.connect(self.save_qobuz_region_setting) + service_fallback_layout.addWidget(region_label) + service_fallback_layout.addWidget(self.qobuz_region_dropdown) + + region_label.hide() + self.qobuz_region_dropdown.hide() + service_fallback_layout.addStretch() auth_layout.addLayout(service_fallback_layout) settings_layout.addWidget(auth_group) - settings_layout.addStretch() settings_tab.setLayout(settings_layout) self.tab_widget.addTab(settings_tab, "Settings") + for i in range(self.service_dropdown.count()): + if self.service_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == self.service: + self.service_dropdown.setCurrentIndex(i) + break + + for i in range(self.qobuz_region_dropdown.count()): + if self.qobuz_region_dropdown.itemData(i, Qt.ItemDataRole.UserRole + 1) == self.qobuz_region: + self.qobuz_region_dropdown.setCurrentIndex(i) + break + + self.qobuz_region_dropdown.status_updated.connect( + lambda region_id, is_online: self.service_dropdown.update_qobuz_status(region_id, is_online) + ) + def setup_about_tab(self): about_tab = QWidget() about_layout = QVBoxLayout() @@ -754,13 +913,58 @@ class SpotiFLACGUI(QWidget): spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed) about_layout.addItem(spacer) - footer_label = QLabel("v2.6 | May 2025") + footer_label = QLabel("v2.7 | May 2025") footer_label.setStyleSheet("font-size: 12px; margin-top: 10px;") about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter) about_tab.setLayout(about_layout) self.tab_widget.addTab(about_tab, "About") + def on_service_changed(self, index): + service = self.service_dropdown.currentData() + self.service = service + self.settings.setValue('service', service) + self.settings.sync() + + timeout_label = None + for widget in self.timeout_input.parentWidget().children(): + if isinstance(widget, QLabel) and widget.text() == "Timeout:": + timeout_label = widget + break + + if service == "qobuz": + self.fallback_checkbox.hide() + self.timeout_input.hide() + if timeout_label: + timeout_label.hide() + + region_label = None + for widget in self.qobuz_region_dropdown.parentWidget().children(): + if isinstance(widget, QLabel) and widget.text() == "Region:": + region_label = widget + break + + if region_label: + region_label.show() + self.qobuz_region_dropdown.show() + else: + self.fallback_checkbox.show() + self.timeout_input.show() + if timeout_label: + timeout_label.show() + + region_label = None + for widget in self.qobuz_region_dropdown.parentWidget().children(): + if isinstance(widget, QLabel) and widget.text() == "Region:": + region_label = widget + break + + if region_label: + region_label.hide() + self.qobuz_region_dropdown.hide() + + self.log_output.append(f"Service changed to: {self.service_dropdown.currentText()}") + def save_url(self): self.settings.setValue('spotify_url', self.spotify_url.text().strip()) self.settings.sync() @@ -801,12 +1005,12 @@ class SpotiFLACGUI(QWidget): self.timeout_input.setText(str(self.timeout_value)) self.log_output.append("Timeout must be a valid number") - def save_service_setting(self): - service = self.service_dropdown.currentData() - self.service = service - self.settings.setValue('service', service) + def save_qobuz_region_setting(self): + region = self.qobuz_region_dropdown.currentData() + self.qobuz_region = region + self.settings.setValue('qobuz_region', region) self.settings.sync() - self.log_output.append(f"Service setting saved: {self.service_dropdown.currentText()}") + self.log_output.append(f"Qobuz region setting saved: {self.qobuz_region_dropdown.currentText()}") def save_settings(self): self.settings.setValue('output_path', self.output_dir.text().strip()) @@ -868,7 +1072,8 @@ class SpotiFLACGUI(QWidget): album=track_data["album_name"], track_number=1, duration_ms=track_data.get("duration_ms", 0), - id=track_id + id=track_id, + isrc=track_data.get("isrc", "") )] self.is_single_track = True self.is_album = self.is_playlist = False @@ -897,7 +1102,8 @@ class SpotiFLACGUI(QWidget): album=self.album_or_playlist_name, track_number=track["track_number"], duration_ms=track.get("duration_ms", 0), - id=track_id + id=track_id, + isrc=track.get("isrc", "") )) self.is_album = True @@ -926,7 +1132,8 @@ class SpotiFLACGUI(QWidget): album=track["album_name"], track_number=len(self.tracks) + 1, duration_ms=track.get("duration_ms", 0), - id=track_id + id=track_id, + isrc=track.get("isrc", "") )) self.is_playlist = True @@ -1083,6 +1290,7 @@ class SpotiFLACGUI(QWidget): def start_download_worker(self, tracks_to_download, outpath): service = self.service_dropdown.currentData() + qobuz_region = self.qobuz_region_dropdown.currentData() if service == "qobuz" else "us" self.worker = DownloadWorker( tracks_to_download, @@ -1096,7 +1304,8 @@ class SpotiFLACGUI(QWidget): self.use_album_subfolders, self.use_fallback, service, - self.timeout_value + self.timeout_value, + qobuz_region ) self.worker.finished.connect(self.on_download_finished) self.worker.progress.connect(self.update_progress) @@ -1136,7 +1345,7 @@ class SpotiFLACGUI(QWidget): else: self.log_output.append(message) - if percentage > 0: + if percentage > 0 and not "Download progress:" in message: self.progress_bar.setValue(percentage) def stop_download(self): @@ -1211,6 +1420,16 @@ class SpotiFLACGUI(QWidget): self.time_label.hide() if __name__ == '__main__': + try: + if sys.platform == "win32": + import os + os.system("chcp 65001 > nul") + import io + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') + sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') + except Exception as e: + print(f"Warning: Could not set UTF-8 encoding: {e}") + app = QApplication(sys.argv) ex = SpotiFLACGUI() ex.show() diff --git a/getMetadata.py b/getMetadata.py index c4ab2c8..a40469c 100644 --- a/getMetadata.py +++ b/getMetadata.py @@ -319,6 +319,8 @@ def format_track_data(track_data): image_url = track_data.get('album', {}).get('images', [{}])[0].get('url', '') if track_data.get('album', {}).get('images') else '' + isrc = track_data.get('external_ids', {}).get('isrc', '') + return { "track": { "artists": ", ".join(artists), @@ -328,7 +330,8 @@ def format_track_data(track_data): "images": image_url, "release_date": track_data.get('album', {}).get('release_date', ''), "track_number": track_data.get('track_number', 0), - "external_urls": track_data.get('external_urls', {}).get('spotify', '') + "external_urls": track_data.get('external_urls', {}).get('spotify', ''), + "isrc": isrc } } @@ -344,6 +347,20 @@ def format_album_data(album_data): track_artists = [] for artist in track.get('artists', []): track_artists.append(artist['name']) + + track_id = track.get('id', '') + track_isrc = '' + + if track_id and album_data.get('_token'): + try: + full_track_data = get_json_from_api( + track_base_url.format(track_id), + album_data.get('_token') + ) + if full_track_data: + track_isrc = full_track_data.get('external_ids', {}).get('isrc', '') + except: + pass track_list.append({ "artists": ", ".join(track_artists), @@ -353,7 +370,8 @@ def format_album_data(album_data): "images": image_url, "release_date": album_data.get('release_date', ''), "track_number": track.get('track_number', 0), - "external_urls": track.get('external_urls', {}).get('spotify', '') + "external_urls": track.get('external_urls', {}).get('spotify', ''), + "isrc": track_isrc }) album_info = { @@ -389,6 +407,8 @@ def format_playlist_data(playlist_data): if track.get('album', {}).get('images'): track_image = track.get('album', {}).get('images', [{}])[0].get('url', '') + track_isrc = track.get('external_ids', {}).get('isrc', '') + track_list.append({ "artists": ", ".join(artists), "name": track.get('name', ''), @@ -397,7 +417,8 @@ def format_playlist_data(playlist_data): "images": track_image, "release_date": track.get('album', {}).get('release_date', ''), "track_number": track.get('track_number', 0), - "external_urls": track.get('external_urls', {}).get('spotify', '') + "external_urls": track.get('external_urls', {}).get('spotify', ''), + "isrc": track_isrc }) playlist_info = { @@ -443,9 +464,9 @@ def get_filtered_data(spotify_url, batch=False, delay=1.0): return {"error": "Failed to get raw data"} if __name__ == '__main__': - playlist = "https://open.spotify.com/playlist/5Qvz8wZIRYbEUUFoPueKI5" - album = "https://open.spotify.com/album/7kFyd5oyJdVX2pIi6P4iHE" - song = "https://open.spotify.com/track/4wJ5Qq0jBN4ajy7ouZIV1c" + playlist = "https://open.spotify.com/playlist/37i9dQZEVXbNG2KDcFcKOF" + album = "https://open.spotify.com/album/6J84szYCnMfzEcvIcfWMFL" + song = "https://open.spotify.com/track/7so0lgd0zP2Sbgs2d7a1SZ" filtered_playlist = get_filtered_data(playlist, batch=True, delay=0.1) print(json.dumps(filtered_playlist, indent=2)) diff --git a/getTracks.py b/getTracks.py index dadd691..6e1d5ef 100644 --- a/getTracks.py +++ b/getTracks.py @@ -4,17 +4,31 @@ import os import asyncio import re import base64 +from datetime import datetime +from mutagen.flac import FLAC, Picture +from mutagen.id3 import PictureType -class TrackDownloader: - def __init__(self, use_fallback=False, timeout=30): +class ProgressCallback: + def __call__(self, current, total): + if total > 0: + percent = (current / total) * 100 + print(f"\r{percent:.2f}% ({current}/{total})", end="") + else: + print(f"\r{current / (1024 * 1024):.2f} MB", end="") + +class LucidaDownloader: + def __init__(self, domain="to", timeout=30): self.client = requests.Session() self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } - self.progress_callback = None - self.use_fallback = use_fallback + self.progress_callback = ProgressCallback() self.timeout = timeout - self.base_domain = "lucida.su" if use_fallback else "lucida.to" + + if domain not in ["to", "su"]: + raise ValueError("Domain must be either 'to' or 'su'") + + self.base_domain = f"lucida.{domain}" def set_progress_callback(self, callback): self.progress_callback = callback @@ -22,37 +36,34 @@ class TrackDownloader: def generate_filename(self, track_id, service): return f"{track_id}_{service}.flac" - async def get_track_info(self, track_id, service="amazon", use_fallback=None): - if use_fallback is None: - use_fallback = self.use_fallback + async def get_track_info(self, track_id, service="tidal"): + if service not in ["tidal", "amazon", "deezer"]: + raise ValueError("Service must be one of 'tidal', 'amazon', or 'deezer'") - domain_type = "su" if use_fallback else "to" - spotify_url = f"https://open.spotify.com/track/{track_id}" - result = self.convert_spotify_link(spotify_url, service, domain_type) + result = self._convert_spotify_link(spotify_url, service) if "error" in result: - raise Exception(f"Failed to get track info: {result['error']}") + raise Exception(f"Error: {result['error']}") result["track_id"] = track_id return result - def convert_spotify_link(self, spotify_url, target_service="amazon", domain_type="to"): + def _convert_spotify_link(self, spotify_url, target_service="tidal"): track_id_match = re.search(r'track/([a-zA-Z0-9]+)', spotify_url) if not track_id_match: return {"error": "Invalid Spotify URL"} - domain = "lucida.to" if domain_type == "to" else "lucida.su" - base_url = f"https://{domain}" + base_url = f"https://{self.base_domain}" headers = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Accept-Language": "id-ID,id;q=0.9", "Cache-Control": "no-cache", "Connection": "keep-alive", - "Host": domain, + "Host": self.base_domain, "Pragma": "no-cache", "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" @@ -142,17 +153,26 @@ class TrackDownloader: except Exception as error: return {"error": str(error)} - def download(self, metadata, output_dir, is_paused_callback=None, is_stopped_callback=None): + def download(self, metadata, output_dir=".", is_paused_callback=None, is_stopped_callback=None): track_url = metadata['url'] primary_token = metadata['token']['primary'] expiry = metadata['token']['expiry'] track_id = metadata['track_id'] service = metadata['service'] - print(f"Starting download for: {track_url}") + print(f"Starting download: track ID {track_id}") if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped by user") + raise Exception("Download stopped") + + file_name = self.generate_filename(track_id, service) + file_path = os.path.join(output_dir, file_name) + + if os.path.exists(file_path): + file_size = os.path.getsize(file_path) + if file_size > 0: + print(f"File already exists: {file_path} ({file_size / (1024 * 1024):.2f} MB)") + return file_path initial_request = { "account": {"id": "auto", "type": "country"}, @@ -180,7 +200,7 @@ class TrackDownloader: initial_response = response.json() if not initial_response.get("success", False): - raise Exception(f"Initial request failed: {initial_response.get('error', 'Unknown error')}") + raise Exception(f"Request failed: {initial_response.get('error', 'Unknown error')}") handoff = initial_response["handoff"] server = initial_response["server"] @@ -189,24 +209,24 @@ class TrackDownloader: completion_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}" - print("Waiting for track processing to complete") + print("Waiting for processing...") while True: if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped by user") + raise Exception("Download stopped") while is_paused_callback and is_paused_callback(): time.sleep(0.1) if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped by user") + raise Exception("Download stopped") completion_response = self.client.get(completion_url, headers=self.headers).json() status = completion_response["status"] if status == "completed": - print("Processing completed: 100%") + print("Processing: 100%") break elif status == "error": - raise Exception(f"API request failed: {completion_response.get('message', 'Unknown error')}") + raise Exception(f"API error: {completion_response.get('message', 'Unknown error')}") else: progress = completion_response.get("progress", {}) if progress: @@ -214,13 +234,13 @@ class TrackDownloader: total = progress.get("total", 100) percent = int((current / total) * 100) if total > 0 else 0 action = progress.get("action", "Processing") - print(f"Progress: {percent}% - {action} ({current}/{total})") + print(f"{percent}% - {action}") if action.lower() == "metadata": if self.progress_callback: self.progress_callback(0, 0) else: - print(f"Status: {status} - Waiting for progress information...") + print(f"Status: {status}") if status.lower() == "metadata": if self.progress_callback: self.progress_callback(0, 0) @@ -228,7 +248,7 @@ class TrackDownloader: time.sleep(1) download_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}/download" - print(f"Starting download of: {file_name}") + print(f"Downloading file...") response = self.client.get(download_url, stream=True, headers=self.headers) total_size = int(response.headers.get('content-length', 0)) @@ -246,7 +266,7 @@ class TrackDownloader: file.close() if os.path.exists(file_path): os.remove(file_path) - raise Exception("Download stopped by user") + raise Exception("Download stopped") while is_paused_callback and is_paused_callback(): time.sleep(0.1) @@ -254,7 +274,7 @@ class TrackDownloader: file.close() if os.path.exists(file_path): os.remove(file_path) - raise Exception("Download stopped by user") + raise Exception("Download stopped") if chunk: file.write(chunk) @@ -266,9 +286,9 @@ class TrackDownloader: progress_percent = (downloaded_size / total_size) * 100 elapsed_time = current_time - start_time speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0 - print(f"Download progress: {progress_percent:.2f}% ({downloaded_size}/{total_size}) - {speed:.2f} MB/s") + print(f"{progress_percent:.2f}% - {speed:.2f} MB/s") else: - print(f"Downloaded {downloaded_size / (1024 * 1024):.2f} MB") + print(f"{downloaded_size / (1024 * 1024):.2f} MB") last_update_time = current_time @@ -276,9 +296,9 @@ class TrackDownloader: self.progress_callback(downloaded_size, total_size) if downloaded_size == 0: - raise Exception("No data received from server") + raise Exception("No data received") - print(f"Download completed: {file_path}") + print(f"Complete. File saved: {file_path}") return file_path except Exception as e: @@ -289,30 +309,275 @@ class TrackDownloader: pass raise e +class SquidWTFDownloader: + def __init__(self, region="us", timeout=30): + if region not in ["eu", "us"]: + raise ValueError("Region must be either 'us' or 'eu'") + + self.region = region + self.timeout = timeout + self.session = requests.Session() + self.headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + self.base_api_url = f"https://{region}.qobuz.squid.wtf/api" + self.download_chunk_size = 256 * 1024 + self.progress_callback = ProgressCallback() + + def set_progress_callback(self, callback): + self.progress_callback = callback + + def sanitize_filename(self, filename): + if not filename: + return "Unknown Track" + sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename)) + return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track" + + def get_track_info(self, isrc): + print(f"Fetching: {isrc}") + search_url = f"{self.base_api_url}/get-music" + params = {'q': isrc, 'offset': 0, 'limit': 10} + + try: + response = self.session.get(search_url, params=params, timeout=self.timeout) + response.raise_for_status() + data = response.json() + + selected_track = None + if data and data.get("success"): + items = data.get("data", {}).get("tracks", {}).get("items", []) + priority = {24: 1, 16: 2} + for track in items: + if track.get("isrc") == isrc: + current_prio = priority.get(track.get("maximum_bit_depth"), 3) + if selected_track is None or current_prio < priority.get(selected_track.get("maximum_bit_depth"), 3): + selected_track = track + if current_prio == 1: + break + + if not selected_track: + raise Exception(f"Track not found: {isrc}") + + title = selected_track.get('title', 'Unknown') + bit_depth = selected_track.get('maximum_bit_depth', 'Unknown') + print(f"Found: {title} ({bit_depth}b)") + return selected_track + + except requests.exceptions.RequestException as e: + raise Exception(f"Request error: {e}") + except Exception as e: + raise Exception(f"Error: {e}") + + def get_download_url(self, track_id): + print("Fetching URL...") + download_api_url = f"{self.base_api_url}/download-music" + params = {'track_id': track_id, 'quality': 27} + + try: + response = self.session.get(download_api_url, params=params, timeout=self.timeout) + response.raise_for_status() + data = response.json() + + if data and data.get("success") and data.get("data", {}).get("url"): + download_url = data["data"]["url"] + print("URL found") + return download_url + else: + error_msg = data.get('error', {}).get('message', 'Unknown API error') + raise Exception(f"API error: {error_msg}") + + except requests.exceptions.RequestException as e: + raise Exception(f"Request error: {e}") + except Exception as e: + raise Exception(f"Error: {e}") + + def download(self, isrc, output_dir=".", is_paused_callback=None, is_stopped_callback=None): + if output_dir != ".": + try: + os.makedirs(output_dir, exist_ok=True) + except OSError as e: + raise Exception(f"Directory error: {e}") + + track_info = self.get_track_info(isrc) + track_id = track_info.get("id") + + if not track_id: + raise Exception("No track ID found") + + artist_name = self.sanitize_filename(track_info.get('performer', {}).get('name')) + track_title = self.sanitize_filename(track_info.get('title')) + output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac") + + if os.path.exists(output_filename): + file_size = os.path.getsize(output_filename) + if file_size > 0: + print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)") + return output_filename + + download_url = self.get_download_url(track_id) + temp_filename = output_filename + ".part" + + print(f"Downloading...") + try: + with self.session.get(download_url, stream=True, timeout=900) as response, \ + open(temp_filename, 'wb') as f: + response.raise_for_status() + total_size = int(response.headers.get('content-length', 0)) + downloaded_size = 0 + start_time = time.time() + last_update_time = start_time + + for chunk in response.iter_content(chunk_size=self.download_chunk_size): + if is_stopped_callback and is_stopped_callback(): + f.close() + if os.path.exists(temp_filename): + os.remove(temp_filename) + raise Exception("Download stopped") + + while is_paused_callback and is_paused_callback(): + time.sleep(0.1) + if is_stopped_callback and is_stopped_callback(): + f.close() + if os.path.exists(temp_filename): + os.remove(temp_filename) + raise Exception("Download stopped") + f.write(chunk) + downloaded_size += len(chunk) + + current_time = time.time() + if current_time - last_update_time >= 1: + if total_size > 0: + progress_percent = (downloaded_size / total_size) * 100 + elapsed_time = current_time - start_time + speed = downloaded_size / (1024 * 1024 * elapsed_time) if elapsed_time > 0 else 0 + print(f"{progress_percent:.2f}% - {speed:.2f} MB/s") + else: + print(f"{downloaded_size / (1024 * 1024):.2f} MB") + + last_update_time = current_time + + if self.progress_callback: + self.progress_callback(downloaded_size, total_size) + + os.rename(temp_filename, output_filename) + print("Download complete") + + except requests.exceptions.RequestException as e: + if os.path.exists(temp_filename): + os.remove(temp_filename) + raise Exception(f"Download failed: {e}") + except Exception as e: + if os.path.exists(temp_filename): + os.remove(temp_filename) + raise Exception(f"File error: {e}") + + print("Adding metadata...") + try: + self._embed_metadata(output_filename, track_info) + print("Metadata saved") + except Exception as e: + print(f"Tagging failed: {e}") + + print(f"Done") + return output_filename + + def _embed_metadata(self, filename, track_info): + try: + audio = FLAC(filename) + audio.delete() + audio.clear_pictures() + + album_info = track_info.get('album', {}) + artist = track_info.get('performer', {}).get('name') + + if track_info.get('title'): + audio['TITLE'] = track_info['title'] + if artist: + audio['ARTIST'] = artist + if album_info.get('title'): + audio['ALBUM'] = album_info['title'] + if album_info.get('artist', {}).get('name', artist): + audio['ALBUMARTIST'] = album_info.get('artist', {}).get('name', artist) + if track_info.get('track_number'): + audio['TRACKNUMBER'] = str(track_info['track_number']) + if track_info.get('release_date_original'): + audio['DATE'] = track_info['release_date_original'] + try: + audio['YEAR'] = str(datetime.strptime(track_info['release_date_original'], '%Y-%m-%d').year) + except ValueError: + pass + if album_info.get('genre', {}).get('name'): + audio['GENRE'] = album_info['genre']['name'] + if track_info.get('copyright'): + audio['COPYRIGHT'] = track_info['copyright'] + if track_info.get('isrc'): + audio['ISRC'] = track_info['isrc'] + if album_info.get('label', {}).get('name'): + audio['ORGANIZATION'] = album_info['label']['name'] + + img_info = album_info.get('image', {}) + cover_url = img_info.get('large') or img_info.get('small') or img_info.get('thumbnail') + if cover_url: + try: + img_response = self.session.get(cover_url, timeout=30) + img_response.raise_for_status() + mime_type = img_response.headers.get('Content-Type', 'image/jpeg').lower() + if mime_type in ['image/jpeg', 'image/png']: + picture = Picture() + picture.data = img_response.content + picture.type = PictureType.COVER_FRONT + picture.mime = mime_type + audio.add_picture(picture) + print("Cover added") + except Exception as e: + print(f"Cover error: {str(e)}") + + audio.save() + + except Exception as e: + raise Exception(f"Metadata error: {e}") + async def main(): - use_fallback = False - downloader = TrackDownloader(use_fallback) + print("=== LucidaDownloader ===") + lucida = LucidaDownloader(domain="to") - output_dir = "." track_id = "2plbrEY59IikOBgBGLjaoe" service = "tidal" - - def progress_update(current, total): - if total > 0: - percent = (current / total) * 100 - print(f"\rDownload progress: {percent:.2f}% ({current}/{total})", end="") - - downloader.set_progress_callback(progress_update) + output_dir = "." try: - print(f"Getting track info for ID: {track_id} from {service}") - metadata = await downloader.get_track_info(track_id, service) - print(f"Track info received, starting download process") + print(f"Getting track: {track_id} from {service}") + metadata = await lucida.get_track_info(track_id, service) + print("Starting download") - downloaded_file = downloader.download(metadata, output_dir) - print(f"\nFile downloaded successfully: {downloaded_file}") + downloaded_file = lucida.download(metadata, output_dir) + print(f"Success: File saved as {downloaded_file}") except Exception as e: - print(f"An error occurred: {str(e)}") + print(f"Error: {str(e)}") + + print("\n\n=== SquidWTFDownloader ===") + squid = SquidWTFDownloader(region="us") + + isrc = "TCAIT2495017" + output_dir = "." + + try: + downloaded_file = squid.download(isrc, output_dir) + print(f"Success: File saved as {downloaded_file}") + except Exception as e: + print(f"Error: {str(e)}") if __name__ == "__main__": + try: + import sys + if sys.platform == "win32": + import os + os.system("chcp 65001 > nul") + try: + sys.stdout.reconfigure(encoding='utf-8') + except: + pass + except: + pass + asyncio.run(main()) \ No newline at end of file