diff --git a/SpotiFLAC.py b/SpotiFLAC.py index d150a50..925bbb3 100644 --- a/SpotiFLAC.py +++ b/SpotiFLAC.py @@ -20,11 +20,8 @@ from PyQt6.QtGui import QIcon, QTextCursor, QDesktopServices, QPixmap, QBrush from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply from getMetadata import get_filtered_data, parse_uri, SpotifyInvalidUrlException -from qobuzAutoDL import QobuzDownloader as QobuzAutoDownloader -from qobuzRegionDL import QobuzDownloader as QobuzRegionDownloader from tidalDL import TidalDownloader from deezerDL import DeezerDownloader -from amazonDL import LucidaDownloader @dataclass class Track: @@ -64,7 +61,7 @@ class DownloadWorker(QThread): def __init__(self, tracks, outpath, is_single_track=False, is_album=False, is_playlist=False, album_or_playlist_name='', filename_format='title_artist', use_track_numbers=True, - use_artist_subfolders=False, use_album_subfolders=False, service="tidal", qobuz_region="us", qobuz_mode="auto"): + use_artist_subfolders=False, use_album_subfolders=False, service="tidal"): super().__init__() self.tracks = tracks self.outpath = outpath @@ -77,8 +74,6 @@ class DownloadWorker(QThread): self.use_artist_subfolders = use_artist_subfolders self.use_album_subfolders = use_album_subfolders self.service = service - self.qobuz_region = qobuz_region - self.qobuz_mode = qobuz_mode self.is_paused = False self.is_stopped = False self.failed_tracks = [] @@ -96,17 +91,10 @@ class DownloadWorker(QThread): def run(self): try: - if self.service == "qobuz": - if self.qobuz_mode == "auto": - downloader = QobuzAutoDownloader() - else: - downloader = QobuzRegionDownloader(self.qobuz_region) - elif self.service == "tidal": + if self.service == "tidal": downloader = TidalDownloader() elif self.service == "deezer": downloader = DeezerDownloader() - elif self.service == "amazon": - downloader = LucidaDownloader() else: downloader = TidalDownloader() @@ -161,24 +149,7 @@ class DownloadWorker(QThread): self.skipped_tracks.append(track) continue - if self.service == "qobuz": - if not track.isrc: - self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0) - self.failed_tracks.append((track.title, track.artists, "No ISRC available")) - continue - - self.progress.emit(f"Getting track from Qobuz with ISRC: {track.isrc}", 0) - - is_paused_callback = lambda: self.is_paused - is_stopped_callback = lambda: self.is_stopped - - downloaded_file = downloader.download( - track.isrc, - track_outpath, - is_paused_callback=is_paused_callback, - is_stopped_callback=is_stopped_callback - ) - elif self.service == "tidal": + if self.service == "tidal": if not track.isrc: self.progress.emit(f"No ISRC found for track: {track.title}. Skipping.", 0) self.failed_tracks.append((track.title, track.artists, "No ISRC available")) @@ -236,21 +207,6 @@ class DownloadWorker(QThread): raise Exception("Downloaded file not found") else: raise Exception("Deezer download failed") - elif self.service == "amazon": - self.progress.emit(f"Downloading from Amazon Music: {track.title} - {track.artists}", 0) - - is_paused_callback = lambda: self.is_paused - is_stopped_callback = lambda: self.is_stopped - - downloaded_file = downloader.download( - track.id, - track_outpath, - is_paused_callback=is_paused_callback, - is_stopped_callback=is_stopped_callback - ) - - if not downloaded_file or not os.path.exists(downloaded_file): - raise Exception("Amazon Music download failed") else: track_id = track.id self.progress.emit(f"Getting track info for ID: {track_id} from {self.service}", 0) @@ -373,26 +329,6 @@ class TidalStatusChecker(QThread): self.error.emit(f"Error checking Tidal (API) status: {str(e)}") self.status_updated.emit(False) -class QobuzStatusChecker(QThread): - status_updated = pyqtSignal(bool) - error = pyqtSignal(str) - - def __init__(self, region="us", mode="auto"): - super().__init__() - self.region = region - self.mode = mode - - def run(self): - try: - if self.mode == "auto": - response = requests.get("https://qobuz.squid.wtf", timeout=5) - else: - response = requests.get(f"https://{self.region}.qqdl.site", timeout=5) - self.status_updated.emit(response.status_code == 200) - except Exception as e: - self.error.emit(f"Error checking Qobuz status: {str(e)}") - self.status_updated.emit(False) - class DeezerStatusChecker(QThread): status_updated = pyqtSignal(bool) error = pyqtSignal(str) @@ -406,19 +342,6 @@ class DeezerStatusChecker(QThread): self.error.emit(f"Error checking Deezer status: {str(e)}") self.status_updated.emit(False) -class AmazonStatusChecker(QThread): - status_updated = pyqtSignal(bool) - error = pyqtSignal(str) - - def run(self): - try: - response = requests.get("https://lucida.to/api/load?url=%2Fapi%2Fcountries%3Fservice%3Damazon", timeout=5) - is_online = response.status_code == 200 - self.status_updated.emit(is_online) - except Exception as e: - self.error.emit(f"Error checking Amazon Music status: {str(e)}") - self.status_updated.emit(False) - class StatusIndicatorDelegate(QStyledItemDelegate): def paint(self, painter, option, index): item_data = index.data(Qt.ItemDataRole.UserRole) @@ -463,25 +386,14 @@ class ServiceComboBox(QComboBox): self.deezer_status_timer = QTimer(self) self.deezer_status_timer.timeout.connect(self.refresh_deezer_status) - self.deezer_status_timer.start(60000) - - self.amazon_status_checker = AmazonStatusChecker() - self.amazon_status_checker.status_updated.connect(self.update_amazon_service_status) - self.amazon_status_checker.error.connect(lambda e: print(f"Amazon Music status check error: {e}")) - self.amazon_status_checker.start() - - self.amazon_status_timer = QTimer(self) - self.amazon_status_timer.timeout.connect(self.refresh_amazon_status) - self.amazon_status_timer.start(60000) + self.deezer_status_timer.start(60000) def setup_items(self): current_dir = os.path.dirname(os.path.abspath(__file__)) self.services = [ - {'id': 'qobuz', 'name': 'Qobuz', 'icon': 'qobuz.png', 'online': False}, {'id': 'tidal', 'name': 'Tidal', 'icon': 'tidal.png', 'online': False}, - {'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False}, - {'id': 'amazon', 'name': 'Amazon Music', 'icon': 'amazon.png', 'online': False} + {'id': 'deezer', 'name': 'Deezer', 'icon': 'deezer.png', 'online': False} ] for service in self.services: @@ -537,120 +449,13 @@ class ServiceComboBox(QComboBox): self.deezer_status_checker.error.connect(lambda e: print(f"Deezer status check error: {e}")) self.deezer_status_checker.start() - def update_amazon_service_status(self, is_online): - self.update_service_status('amazon', is_online) - - def refresh_amazon_status(self): - if hasattr(self, 'amazon_status_checker') and self.amazon_status_checker.isRunning(): - self.amazon_status_checker.quit() - self.amazon_status_checker.wait() - - self.amazon_status_checker = AmazonStatusChecker() - self.amazon_status_checker.status_updated.connect(self.update_amazon_service_status) - self.amazon_status_checker.error.connect(lambda e: print(f"Amazon Music status check error: {e}")) - self.amazon_status_checker.start() - def currentData(self, role=Qt.ItemDataRole.UserRole + 1): return super().currentData(role) - def update_qobuz_status(self, region_id, is_online): - for i in range(self.count()): - service_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1) - - if service_id == 'qobuz': - service_data = self.itemData(i, Qt.ItemDataRole.UserRole) - if isinstance(service_data, dict): - if is_online or service_data.get('online', False): - service_data['online'] = True - self.setItemData(i, service_data, Qt.ItemDataRole.UserRole) - break - - self.update() - -class QobuzRegionComboBox(QComboBox): - status_updated = pyqtSignal(str, bool) - - def __init__(self, parent=None): - super().__init__(parent) - self.setIconSize(QSize(16, 16)) - - self.setItemDelegate(StatusIndicatorDelegate()) - - self.setup_items() - - self.status_checkers = {} - self.check_status() - - self.status_timer = QTimer(self) - self.status_timer.timeout.connect(self.check_status) - self.status_timer.start(60000) - - def setup_items(self): - current_dir = os.path.dirname(os.path.abspath(__file__)) - - self.regions = [ - {'id': 'us', 'name': 'USA', 'icon': 'us.svg', 'online': False}, - {'id': 'eu', 'name': 'Europe', 'icon': 'eu.svg', 'online': False}, - {'id': 'br', 'name': 'Brazil', 'icon': 'br.svg', 'online': False}, - {'id': 'jp', 'name': 'Japan', 'icon': 'jp.svg', 'online': False}, - {'id': 'au', 'name': 'Australia', 'icon': 'au.svg', 'online': False} - ] - - for region in self.regions: - icon_path = os.path.join(current_dir, region['icon']) - if not os.path.exists(icon_path): - self.create_placeholder_icon(icon_path) - - icon = QIcon(icon_path) - - self.addItem(icon, region['name']) - item_index = self.count() - 1 - self.setItemData(item_index, region['id'], Qt.ItemDataRole.UserRole + 1) - self.setItemData(item_index, region, Qt.ItemDataRole.UserRole) - - def create_placeholder_icon(self, path): - pixmap = QPixmap(16, 16) - pixmap.fill(Qt.GlobalColor.transparent) - pixmap.save(path) - - def update_region_status(self, region_id, is_online): - for i in range(self.count()): - current_region_id = self.itemData(i, Qt.ItemDataRole.UserRole + 1) - - if current_region_id == region_id: - region_data = self.itemData(i, Qt.ItemDataRole.UserRole) - if isinstance(region_data, dict): - region_data['online'] = is_online - self.setItemData(i, region_data, Qt.ItemDataRole.UserRole) - break - - self.update() - - def check_status(self): - for region_id, checker in self.status_checkers.items(): - if checker.isRunning(): - checker.quit() - checker.wait() - self.status_checkers.clear() - - for region in self.regions: - region_id = region['id'] - checker = QobuzStatusChecker(region_id, "region") - checker.status_updated.connect(lambda status, rid=region_id: self.handle_status_update(rid, status)) - checker.start() - self.status_checkers[region_id] = checker - - def handle_status_update(self, region_id, is_online): - self.update_region_status(region_id, is_online) - self.status_updated.emit(region_id, is_online) - - def currentData(self, role=Qt.ItemDataRole.UserRole + 1): - return super().currentData(role) - class SpotiFLACGUI(QWidget): def __init__(self): super().__init__() - self.current_version = "4.6" + self.current_version = "4.7" self.tracks = [] self.all_tracks = [] self.successful_downloads = [] @@ -665,8 +470,6 @@ class SpotiFLACGUI(QWidget): self.use_artist_subfolders = self.settings.value('use_artist_subfolders', False, type=bool) self.use_album_subfolders = self.settings.value('use_album_subfolders', False, type=bool) self.service = self.settings.value('service', 'tidal') - self.qobuz_region = self.settings.value('qobuz_region', 'us') - self.qobuz_mode = self.settings.value('qobuz_mode', 'auto') self.check_for_updates = self.settings.value('check_for_updates', True, type=bool) self.current_theme_color = self.settings.value('theme_color', '#2196F3') self.track_list_format = self.settings.value('track_list_format', 'track_artist_date_duration') @@ -1027,18 +830,19 @@ class SpotiFLACGUI(QWidget): control_layout = QHBoxLayout() self.stop_btn = QPushButton('Stop') self.pause_resume_btn = QPushButton('Pause') - self.remove_successful_btn = QPushButton('Remove Finished Songs') self.stop_btn.setFixedWidth(120) self.pause_resume_btn.setFixedWidth(120) - self.remove_successful_btn.setFixedWidth(200) self.stop_btn.setCursor(Qt.CursorShape.PointingHandCursor) self.pause_resume_btn.setCursor(Qt.CursorShape.PointingHandCursor) - self.remove_successful_btn.setCursor(Qt.CursorShape.PointingHandCursor) self.stop_btn.clicked.connect(self.stop_download) self.pause_resume_btn.clicked.connect(self.toggle_pause_resume) + + self.remove_successful_btn = QPushButton('Remove Finished Songs') + self.remove_successful_btn.setFixedWidth(200) + self.remove_successful_btn.setCursor(Qt.CursorShape.PointingHandCursor) self.remove_successful_btn.clicked.connect(self.remove_successful_downloads) control_layout.addStretch() @@ -1228,29 +1032,6 @@ class SpotiFLACGUI(QWidget): service_fallback_layout.addWidget(service_label) service_fallback_layout.addWidget(self.service_dropdown) - service_fallback_layout.addSpacing(10) - - self.qobuz_mode_label = QLabel('Mode:') - self.qobuz_mode_dropdown = QComboBox() - self.qobuz_mode_dropdown.addItem("Auto", "auto") - self.qobuz_mode_dropdown.addItem("Region", "region") - self.qobuz_mode_dropdown.currentIndexChanged.connect(self.on_qobuz_mode_changed) - service_fallback_layout.addWidget(self.qobuz_mode_label) - service_fallback_layout.addWidget(self.qobuz_mode_dropdown) - - service_fallback_layout.addSpacing(10) - - self.region_label = QLabel('Region:') - self.qobuz_region_dropdown = QobuzRegionComboBox() - self.qobuz_region_dropdown.currentIndexChanged.connect(self.save_qobuz_region_setting) - service_fallback_layout.addWidget(self.region_label) - service_fallback_layout.addWidget(self.qobuz_region_dropdown) - - self.qobuz_mode_label.hide() - self.qobuz_mode_dropdown.hide() - self.region_label.hide() - self.qobuz_region_dropdown.hide() - service_fallback_layout.addStretch() auth_layout.addLayout(service_fallback_layout) @@ -1259,17 +1040,9 @@ class SpotiFLACGUI(QWidget): settings_tab.setLayout(settings_layout) self.tab_widget.addTab(settings_tab, "Settings") self.set_combobox_value(self.service_dropdown, self.service) - self.set_combobox_value(self.qobuz_region_dropdown, self.qobuz_region) - self.set_combobox_value(self.qobuz_mode_dropdown, self.qobuz_mode) self.set_combobox_value(self.track_list_format_dropdown, self.track_list_format) self.set_combobox_value(self.date_format_dropdown, self.date_format) - self.update_service_ui() - - self.qobuz_region_dropdown.status_updated.connect( - lambda region_id, is_online: self.service_dropdown.update_qobuz_status(region_id, is_online) - ) - def setup_theme_tab(self): theme_tab = QWidget() theme_layout = QVBoxLayout() @@ -1471,7 +1244,7 @@ class SpotiFLACGUI(QWidget): about_layout.addWidget(section_widget) - footer_label = QLabel(f"v{self.current_version} | September 2025") + footer_label = QLabel(f"v{self.current_version} | October 2025") about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter) about_tab.setLayout(about_layout) @@ -1482,44 +1255,8 @@ class SpotiFLACGUI(QWidget): self.service = service self.settings.setValue('service', service) self.settings.sync() - - self.update_service_ui() self.log_output.append(f"Service changed to: {self.service_dropdown.currentText()}") - def on_qobuz_mode_changed(self, index): - mode = self.qobuz_mode_dropdown.currentData() - self.qobuz_mode = mode - self.settings.setValue('qobuz_mode', mode) - self.settings.sync() - - self.update_qobuz_mode_ui() - self.log_output.append(f"Qobuz mode changed to: {self.qobuz_mode_dropdown.currentText()}") - - def update_service_ui(self): - service = self.service - - if service == "qobuz": - self.qobuz_mode_label.show() - self.qobuz_mode_dropdown.show() - self.update_qobuz_mode_ui() - else: - self.qobuz_mode_label.hide() - self.qobuz_mode_dropdown.hide() - self.region_label.hide() - self.qobuz_region_dropdown.hide() - - def update_qobuz_mode_ui(self): - mode = self.qobuz_mode_dropdown.currentData() - if mode is None: - mode = self.qobuz_mode - - if mode == "region": - self.region_label.show() - self.qobuz_region_dropdown.show() - else: - self.region_label.hide() - self.qobuz_region_dropdown.hide() - def save_url(self): self.settings.setValue('spotify_url', self.spotify_url.text().strip()) self.settings.sync() @@ -1549,13 +1286,6 @@ class SpotiFLACGUI(QWidget): self.settings.setValue('use_album_subfolders', self.use_album_subfolders) self.settings.sync() - def save_qobuz_region_setting(self): - region = self.qobuz_region_dropdown.currentData() - self.qobuz_region = region - self.settings.setValue('qobuz_region', region) - self.settings.sync() - self.log_output.append(f"Qobuz region setting saved: {self.qobuz_region_dropdown.currentText()}") - def save_track_list_format(self): format_value = self.track_list_format_dropdown.currentData() self.track_list_format = format_value @@ -1952,8 +1682,6 @@ class SpotiFLACGUI(QWidget): def start_download_worker(self, tracks_to_download, outpath): service = self.service_dropdown.currentData() - qobuz_region = self.qobuz_region_dropdown.currentData() if service == "qobuz" else "us" - qobuz_mode = self.qobuz_mode_dropdown.currentData() if service == "qobuz" else "auto" self.worker = DownloadWorker( tracks_to_download, @@ -1966,9 +1694,7 @@ class SpotiFLACGUI(QWidget): self.use_track_numbers, self.use_artist_subfolders, self.use_album_subfolders, - service, - qobuz_region, - qobuz_mode + service ) self.worker.finished.connect(lambda success, message, failed_tracks, successful_tracks, skipped_tracks: self.on_download_finished(success, message, failed_tracks, successful_tracks, skipped_tracks)) self.worker.progress.connect(self.update_progress) @@ -2052,7 +1778,6 @@ class SpotiFLACGUI(QWidget): self.pause_resume_btn.setText('Resume') def remove_successful_downloads(self): - """Remove successfully downloaded and skipped tracks from the dashboard""" successful_tracks = getattr(self, 'successful_downloads', []) skipped_tracks = getattr(self, 'skipped_downloads', []) @@ -2062,7 +1787,6 @@ class SpotiFLACGUI(QWidget): tracks_to_remove = [] - # Check for successful downloads for track in self.tracks: for successful_track in successful_tracks: if (track.title == successful_track.title and @@ -2071,13 +1795,12 @@ class SpotiFLACGUI(QWidget): tracks_to_remove.append(track) break - # Check for skipped tracks (already exists) for track in self.tracks: for skipped_track in skipped_tracks: if (track.title == skipped_track.title and track.artists == skipped_track.artists and track.album == skipped_track.album): - if track not in tracks_to_remove: # Avoid duplicates + if track not in tracks_to_remove: tracks_to_remove.append(track) break @@ -2151,12 +1874,6 @@ class SpotiFLACGUI(QWidget): checker.quit() checker.wait() - if hasattr(self, 'qobuz_region_dropdown'): - for checker in self.qobuz_region_dropdown.status_checkers.values(): - if checker.isRunning(): - checker.quit() - checker.wait() - if hasattr(self, 'worker') and self.worker and self.worker.isRunning(): self.worker.stop() self.worker.quit() diff --git a/amazon.png b/amazon.png deleted file mode 100644 index c0c593b..0000000 Binary files a/amazon.png and /dev/null differ diff --git a/amazonDL.py b/amazonDL.py deleted file mode 100644 index 4ac7e4f..0000000 --- a/amazonDL.py +++ /dev/null @@ -1,128 +0,0 @@ -import requests -import time -import os -import re -import base64 -import urllib3 -from urllib.parse import unquote -from random import randrange - -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - -def get_random_user_agent(): - return f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_{randrange(11, 15)}_{randrange(4, 9)}) AppleWebKit/{randrange(530, 537)}.{randrange(30, 37)} (KHTML, like Gecko) Chrome/{randrange(80, 105)}.0.{randrange(3000, 4500)}.{randrange(60, 125)} Safari/{randrange(530, 537)}.{randrange(30, 36)}" - -def extract_data(html, patterns): - for pattern in patterns: - if match := re.search(pattern, html): - return match.group(1) - return None - -def download_track(track_id, service="amazon", output_dir="."): - client = requests.Session() - client.verify = False - headers = {'User-Agent': get_random_user_agent()} - - try: - spotify_url = f"https://open.spotify.com/track/{track_id}" - params = {"url": spotify_url, "country": "auto", "to": service} - - response = client.get("https://lucida.to", params=params, headers=headers, timeout=30) - html = response.text - - token = extract_data(html, [r'token:"([^"]+)"', r'"token"\s*:\s*"([^"]+)"']) - url = extract_data(html, [r'"url":"([^"]+)"', r'url:"([^"]+)"']) - expiry = extract_data(html, [r'tokenExpiry:(\d+)', r'"tokenExpiry"\s*:\s*(\d+)']) - - if not (token and url): - raise Exception("Could not extract required data") - - try: - decoded_token = base64.b64decode(base64.b64decode(token).decode('latin1')).decode('latin1') - except: - decoded_token = token - - clean_url = url.replace('\\/', '/') - print(f"Fetching: {clean_url}") - - request_data = { - "account": {"id": "auto", "type": "country"}, - "compat": "false", "downscale": "original", "handoff": True, - "metadata": True, "private": True, - "token": {"primary": decoded_token, "expiry": int(expiry) if expiry else None}, - "upload": {"enabled": False, "service": "pixeldrain"}, - "url": clean_url - } - - response = client.post("https://lucida.to/api/load?url=/api/fetch/stream/v2", - json=request_data, headers=headers) - - if csrf_token := response.cookies.get('csrf_token'): - headers['X-CSRF-Token'] = csrf_token - - data = response.json() - if not data.get("success"): - raise Exception(f"Request failed: {data.get('error', 'Unknown error')}") - - completion_url = f"https://{data['server']}.lucida.to/api/fetch/request/{data['handoff']}" - print("Fetching URL...") - - while True: - resp = client.get(completion_url, headers=headers).json() - if resp["status"] == "completed": - print("URL found") - break - elif resp["status"] == "error": - raise Exception(f"Processing failed: {resp.get('message', 'Unknown error')}") - elif progress := resp.get("progress"): - percent = int((progress.get("current", 0) / progress.get("total", 100)) * 100) - print(f"\r{percent}%", end="") - time.sleep(1) - - download_url = f"https://{data['server']}.lucida.to/api/fetch/request/{data['handoff']}/download" - response = client.get(download_url, stream=True, headers=headers) - - file_name = "track.flac" - if content_disp := response.headers.get('content-disposition'): - if match := re.search(r'filename[*]?=([^;]+)', content_disp): - raw_name = match.group(1).strip('"\'') - file_name = unquote(raw_name[7:] if raw_name.startswith("UTF-8''") else raw_name) - for char in '<>:"/\\|?*': - file_name = file_name.replace(char, '') - file_name = file_name.strip() - - file_path = os.path.join(output_dir, file_name) - print(f"Downloading...") - - with open(file_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - if chunk: - f.write(chunk) - - print("Download complete") - print("Done") - return file_path - - except Exception as e: - print(f"Error: {str(e)}") - return None - -class LucidaDownloader: - def __init__(self): - self.progress_callback = None - - def set_progress_callback(self, callback): - self.progress_callback = callback - - def download(self, track_id, output_dir, is_paused_callback=None, is_stopped_callback=None): - try: - return download_track(track_id, service="amazon", output_dir=output_dir) - except Exception as e: - raise Exception(f"Amazon Music download failed: {str(e)}") - -if __name__ == "__main__": - print("=== AmazonDL - Amazon Music Downloader ===") - track_id = "2plbrEY59IikOBgBGLjaoe" - service = "amazon" - - download_track(track_id, service) \ No newline at end of file diff --git a/au.svg b/au.svg deleted file mode 100644 index 96e8076..0000000 --- a/au.svg +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - diff --git a/br.svg b/br.svg deleted file mode 100644 index 719a763..0000000 --- a/br.svg +++ /dev/null @@ -1,45 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/eu.svg b/eu.svg deleted file mode 100644 index b0874c1..0000000 --- a/eu.svg +++ /dev/null @@ -1,28 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/jp.svg b/jp.svg deleted file mode 100644 index cc1c181..0000000 --- a/jp.svg +++ /dev/null @@ -1,11 +0,0 @@ - - - - - - - - - - - diff --git a/qobuz.png b/qobuz.png deleted file mode 100644 index a66fe4b..0000000 Binary files a/qobuz.png and /dev/null differ diff --git a/qobuzAutoDL.py b/qobuzAutoDL.py deleted file mode 100644 index 315c184..0000000 --- a/qobuzAutoDL.py +++ /dev/null @@ -1,251 +0,0 @@ -import requests -import time -import os -import re -from datetime import datetime -from mutagen.flac import FLAC, Picture -from mutagen.id3 import PictureType -from random import randrange - -def get_random_user_agent(): - return f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_{randrange(11, 15)}_{randrange(4, 9)}) AppleWebKit/{randrange(530, 537)}.{randrange(30, 37)} (KHTML, like Gecko) Chrome/{randrange(80, 105)}.0.{randrange(3000, 4500)}.{randrange(60, 125)} Safari/{randrange(530, 537)}.{randrange(30, 36)}" - -class ProgressCallback: - def __call__(self, current, total): - if total > 0: - percent = (current / total) * 100 - print(f"\r{percent:.2f}% ({current}/{total})", end="") - else: - print(f"\r{current / (1024 * 1024):.2f} MB", end="") - -class QobuzDownloader: - def __init__(self, timeout=30): - self.timeout = timeout - self.session = requests.Session() - self.headers = { - 'User-Agent': get_random_user_agent() - } - self.base_api_url = "https://qobuz.squid.wtf/api" - self.download_chunk_size = 256 * 1024 - self.progress_callback = ProgressCallback() - - def set_progress_callback(self, callback): - self.progress_callback = callback - - def sanitize_filename(self, filename): - if not filename: - return "Unknown Track" - sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename)) - return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track" - - def get_track_info(self, isrc): - print(f"Fetching: {isrc}") - search_url = f"{self.base_api_url}/get-music" - params = {'q': isrc, 'offset': 0, 'limit': 10, 'region': 'auto'} - - try: - response = self.session.get(search_url, params=params, timeout=self.timeout) - response.raise_for_status() - data = response.json() - - selected_track = None - if data and data.get("success"): - items = data.get("data", {}).get("tracks", {}).get("items", []) - priority = {24: 1, 16: 2} - for track in items: - if track.get("isrc") == isrc: - current_prio = priority.get(track.get("maximum_bit_depth"), 3) - if selected_track is None or current_prio < priority.get(selected_track.get("maximum_bit_depth"), 3): - selected_track = track - if current_prio == 1: - break - - if not selected_track: - raise Exception(f"Track not found: {isrc}") - - title = selected_track.get('title', 'Unknown') - bit_depth = selected_track.get('maximum_bit_depth', 'Unknown') - print(f"Found: {title} ({bit_depth}b)") - return selected_track - - except requests.exceptions.RequestException as e: - raise Exception(f"Request error: {e}") - except Exception as e: - raise Exception(f"Error: {e}") - - def get_download_url(self, track_id): - print("Fetching URL...") - download_api_url = f"{self.base_api_url}/download-music" - params = {'track_id': track_id, 'quality': 27, 'region': 'auto'} - - try: - response = self.session.get(download_api_url, params=params, timeout=self.timeout) - response.raise_for_status() - data = response.json() - - if data and data.get("success") and data.get("data", {}).get("url"): - download_url = data["data"]["url"] - print("URL found") - return download_url - else: - error_msg = data.get('error', {}).get('message', 'Unknown API error') - raise Exception(f"API error: {error_msg}") - - except requests.exceptions.RequestException as e: - raise Exception(f"Request error: {e}") - except Exception as e: - raise Exception(f"Error: {e}") - - def download(self, isrc, output_dir=".", is_paused_callback=None, is_stopped_callback=None): - if output_dir != ".": - try: - os.makedirs(output_dir, exist_ok=True) - except OSError as e: - raise Exception(f"Directory error: {e}") - - track_info = self.get_track_info(isrc) - track_id = track_info.get("id") - - if not track_id: - raise Exception("No track ID found") - - artist_name = self.sanitize_filename(track_info.get('performer', {}).get('name')) - track_title = self.sanitize_filename(track_info.get('title')) - output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac") - - if os.path.exists(output_filename): - file_size = os.path.getsize(output_filename) - if file_size > 0: - print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)") - return output_filename - - download_url = self.get_download_url(track_id) - temp_filename = output_filename + ".part" - - print(f"Downloading...") - try: - response = self.session.get(download_url, timeout=900) - response.raise_for_status() - - if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped") - - while is_paused_callback and is_paused_callback(): - time.sleep(0.1) - if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped") - - with open(temp_filename, 'wb') as f: - f.write(response.content) - - downloaded_size = len(response.content) - total_size = downloaded_size - - if self.progress_callback: - self.progress_callback(downloaded_size, total_size) - - os.rename(temp_filename, output_filename) - print("Download complete") - - except requests.exceptions.RequestException as e: - if os.path.exists(temp_filename): - os.remove(temp_filename) - raise Exception(f"Download failed: {e}") - except Exception as e: - if os.path.exists(temp_filename): - os.remove(temp_filename) - raise Exception(f"File error: {e}") - - print("Adding metadata...") - try: - self._embed_metadata(output_filename, track_info) - print("Metadata saved") - except Exception as e: - print(f"Tagging failed: {e}") - - print(f"Done") - return output_filename - - def _embed_metadata(self, filename, track_info): - try: - audio = FLAC(filename) - audio.delete() - audio.clear_pictures() - - album_info = track_info.get('album', {}) - artist = track_info.get('performer', {}).get('name') - - if track_info.get('title'): - audio['TITLE'] = track_info['title'] - if artist: - audio['ARTIST'] = artist - if album_info.get('title'): - audio['ALBUM'] = album_info['title'] - if album_info.get('artist', {}).get('name', artist): - audio['ALBUMARTIST'] = album_info.get('artist', {}).get('name', artist) - if track_info.get('track_number'): - audio['TRACKNUMBER'] = str(track_info['track_number']) - if track_info.get('release_date_original'): - audio['DATE'] = track_info['release_date_original'] - try: - audio['YEAR'] = str(datetime.strptime(track_info['release_date_original'], '%Y-%m-%d').year) - except ValueError: - pass - if album_info.get('genre', {}).get('name'): - audio['GENRE'] = album_info['genre']['name'] - if track_info.get('copyright'): - audio['COPYRIGHT'] = track_info['copyright'] - if track_info.get('isrc'): - audio['ISRC'] = track_info['isrc'] - if album_info.get('label', {}).get('name'): - audio['ORGANIZATION'] = album_info['label']['name'] - - img_info = album_info.get('image', {}) - cover_url = img_info.get('large') or img_info.get('small') or img_info.get('thumbnail') - if cover_url: - try: - img_response = self.session.get(cover_url, timeout=30) - img_response.raise_for_status() - mime_type = img_response.headers.get('Content-Type', 'image/jpeg').lower() - if mime_type in ['image/jpeg', 'image/png']: - picture = Picture() - picture.data = img_response.content - picture.type = PictureType.COVER_FRONT - picture.mime = mime_type - audio.add_picture(picture) - print("Cover added") - except Exception as e: - print(f"Cover error: {str(e)}") - - audio.save() - - except Exception as e: - raise Exception(f"Metadata error: {e}") - -def main(): - print("=== QobuzDL - Qobuz Downloader (Auto) ===") - downloader = QobuzDownloader() - - isrc = "USAT22409172" - output_dir = "." - - try: - downloaded_file = downloader.download(isrc, output_dir) - print(f"Success: File saved as {downloaded_file}") - except Exception as e: - print(f"Error: {str(e)}") - -if __name__ == "__main__": - try: - import sys - if sys.platform == "win32": - import os - os.system("chcp 65001 > nul") - try: - sys.stdout.reconfigure(encoding='utf-8') - except: - pass - except: - pass - - main() \ No newline at end of file diff --git a/qobuzRegionDL.py b/qobuzRegionDL.py deleted file mode 100644 index bafc02d..0000000 --- a/qobuzRegionDL.py +++ /dev/null @@ -1,255 +0,0 @@ -import requests -import time -import os -import re -from datetime import datetime -from mutagen.flac import FLAC, Picture -from mutagen.id3 import PictureType -from random import randrange - -def get_random_user_agent(): - return f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_{randrange(11, 15)}_{randrange(4, 9)}) AppleWebKit/{randrange(530, 537)}.{randrange(30, 37)} (KHTML, like Gecko) Chrome/{randrange(80, 105)}.0.{randrange(3000, 4500)}.{randrange(60, 125)} Safari/{randrange(530, 537)}.{randrange(30, 36)}" - -class ProgressCallback: - def __call__(self, current, total): - if total > 0: - percent = (current / total) * 100 - print(f"\r{percent:.2f}% ({current}/{total})", end="") - else: - print(f"\r{current / (1024 * 1024):.2f} MB", end="") - -class QobuzDownloader: - def __init__(self, region="us", timeout=30): - if region not in ["us", "eu", "br", "jp", "au"]: - raise ValueError("Region must be one of: 'us', 'eu', 'br', 'jp', 'au'") - - self.region = region - self.timeout = timeout - self.session = requests.Session() - self.headers = { - 'User-Agent': get_random_user_agent() - } - self.base_api_url = f"https://{region}.qqdl.site/api" - self.download_chunk_size = 256 * 1024 - self.progress_callback = ProgressCallback() - - def set_progress_callback(self, callback): - self.progress_callback = callback - - def sanitize_filename(self, filename): - if not filename: - return "Unknown Track" - sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename)) - return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track" - - def get_track_info(self, isrc): - print(f"Fetching: {isrc}") - search_url = f"{self.base_api_url}/get-music" - params = {'q': isrc, 'offset': 0, 'limit': 10} - - try: - response = self.session.get(search_url, params=params, timeout=self.timeout) - response.raise_for_status() - data = response.json() - - selected_track = None - if data and data.get("success"): - items = data.get("data", {}).get("tracks", {}).get("items", []) - priority = {24: 1, 16: 2} - for track in items: - if track.get("isrc") == isrc: - current_prio = priority.get(track.get("maximum_bit_depth"), 3) - if selected_track is None or current_prio < priority.get(selected_track.get("maximum_bit_depth"), 3): - selected_track = track - if current_prio == 1: - break - - if not selected_track: - raise Exception(f"Track not found: {isrc}") - - title = selected_track.get('title', 'Unknown') - bit_depth = selected_track.get('maximum_bit_depth', 'Unknown') - print(f"Found: {title} ({bit_depth}b)") - return selected_track - - except requests.exceptions.RequestException as e: - raise Exception(f"Request error: {e}") - except Exception as e: - raise Exception(f"Error: {e}") - - def get_download_url(self, track_id): - print("Fetching URL...") - download_api_url = f"{self.base_api_url}/download-music" - params = {'track_id': track_id, 'quality': 27} - - try: - response = self.session.get(download_api_url, params=params, timeout=self.timeout) - response.raise_for_status() - data = response.json() - - if data and data.get("success") and data.get("data", {}).get("url"): - download_url = data["data"]["url"] - print("URL found") - return download_url - else: - error_msg = data.get('error', {}).get('message', 'Unknown API error') - raise Exception(f"API error: {error_msg}") - - except requests.exceptions.RequestException as e: - raise Exception(f"Request error: {e}") - except Exception as e: - raise Exception(f"Error: {e}") - - def download(self, isrc, output_dir=".", is_paused_callback=None, is_stopped_callback=None): - if output_dir != ".": - try: - os.makedirs(output_dir, exist_ok=True) - except OSError as e: - raise Exception(f"Directory error: {e}") - - track_info = self.get_track_info(isrc) - track_id = track_info.get("id") - - if not track_id: - raise Exception("No track ID found") - - artist_name = self.sanitize_filename(track_info.get('performer', {}).get('name')) - track_title = self.sanitize_filename(track_info.get('title')) - output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac") - - if os.path.exists(output_filename): - file_size = os.path.getsize(output_filename) - if file_size > 0: - print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)") - return output_filename - - download_url = self.get_download_url(track_id) - temp_filename = output_filename + ".part" - - print(f"Downloading...") - try: - response = self.session.get(download_url, timeout=900) - response.raise_for_status() - - if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped") - - while is_paused_callback and is_paused_callback(): - time.sleep(0.1) - if is_stopped_callback and is_stopped_callback(): - raise Exception("Download stopped") - - with open(temp_filename, 'wb') as f: - f.write(response.content) - - downloaded_size = len(response.content) - total_size = downloaded_size - - if self.progress_callback: - self.progress_callback(downloaded_size, total_size) - - os.rename(temp_filename, output_filename) - print("Download complete") - - except requests.exceptions.RequestException as e: - if os.path.exists(temp_filename): - os.remove(temp_filename) - raise Exception(f"Download failed: {e}") - except Exception as e: - if os.path.exists(temp_filename): - os.remove(temp_filename) - raise Exception(f"File error: {e}") - - print("Adding metadata...") - try: - self._embed_metadata(output_filename, track_info) - print("Metadata saved") - except Exception as e: - print(f"Tagging failed: {e}") - - print(f"Done") - return output_filename - - def _embed_metadata(self, filename, track_info): - try: - audio = FLAC(filename) - audio.delete() - audio.clear_pictures() - - album_info = track_info.get('album', {}) - artist = track_info.get('performer', {}).get('name') - - if track_info.get('title'): - audio['TITLE'] = track_info['title'] - if artist: - audio['ARTIST'] = artist - if album_info.get('title'): - audio['ALBUM'] = album_info['title'] - if album_info.get('artist', {}).get('name', artist): - audio['ALBUMARTIST'] = album_info.get('artist', {}).get('name', artist) - if track_info.get('track_number'): - audio['TRACKNUMBER'] = str(track_info['track_number']) - if track_info.get('release_date_original'): - audio['DATE'] = track_info['release_date_original'] - try: - audio['YEAR'] = str(datetime.strptime(track_info['release_date_original'], '%Y-%m-%d').year) - except ValueError: - pass - if album_info.get('genre', {}).get('name'): - audio['GENRE'] = album_info['genre']['name'] - if track_info.get('copyright'): - audio['COPYRIGHT'] = track_info['copyright'] - if track_info.get('isrc'): - audio['ISRC'] = track_info['isrc'] - if album_info.get('label', {}).get('name'): - audio['ORGANIZATION'] = album_info['label']['name'] - - img_info = album_info.get('image', {}) - cover_url = img_info.get('large') or img_info.get('small') or img_info.get('thumbnail') - if cover_url: - try: - img_response = self.session.get(cover_url, timeout=30) - img_response.raise_for_status() - mime_type = img_response.headers.get('Content-Type', 'image/jpeg').lower() - if mime_type in ['image/jpeg', 'image/png']: - picture = Picture() - picture.data = img_response.content - picture.type = PictureType.COVER_FRONT - picture.mime = mime_type - audio.add_picture(picture) - print("Cover added") - except Exception as e: - print(f"Cover error: {str(e)}") - - audio.save() - - except Exception as e: - raise Exception(f"Metadata error: {e}") - -def main(): - print("=== QobuzDL - Qobuz Downloader (Region) ===") - downloader = QobuzDownloader(region="us") - - isrc = "USAT22409172" - output_dir = "." - - try: - downloaded_file = downloader.download(isrc, output_dir) - print(f"Success: File saved as {downloaded_file}") - except Exception as e: - print(f"Error: {str(e)}") - -if __name__ == "__main__": - try: - import sys - if sys.platform == "win32": - import os - os.system("chcp 65001 > nul") - try: - sys.stdout.reconfigure(encoding='utf-8') - except: - pass - except: - pass - - main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2ae380a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +PyQt6 +pyqt6-tools +pyqtdarktheme +requests +mutagen +pyotp +packaging +pyinstaller \ No newline at end of file diff --git a/us.svg b/us.svg deleted file mode 100644 index 9cfd0c9..0000000 --- a/us.svg +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - - - -