From 222d681551b4c040d1810cd8159daa5aa924c35a Mon Sep 17 00:00:00 2001 From: afkarxyz <173781715+afkarxyz@users.noreply.github.com> Date: Tue, 8 Apr 2025 13:07:26 +0700 Subject: [PATCH] Update v2.4 --- SpotiFLAC.py | 51 +++++++--- getMetadata.py | 248 ++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 233 insertions(+), 66 deletions(-) diff --git a/SpotiFLAC.py b/SpotiFLAC.py index d755654..27b331e 100644 --- a/SpotiFLAC.py +++ b/SpotiFLAC.py @@ -29,6 +29,26 @@ class Track: duration_ms: int id: str +class MetadataFetchWorker(QThread): + finished = pyqtSignal(dict) + error = pyqtSignal(str) + + def __init__(self, url): + super().__init__() + self.url = url + + def run(self): + try: + metadata = get_filtered_data(self.url) + if "error" in metadata: + self.error.emit(metadata["error"]) + else: + self.finished.emit(metadata) + except SpotifyInvalidUrlException as e: + self.error.emit(str(e)) + except Exception as e: + self.error.emit(f'Failed to fetch metadata: {str(e)}') + class DownloadWorker(QThread): finished = pyqtSignal(bool, str, list) progress = pyqtSignal(str, int) @@ -310,7 +330,7 @@ class ServiceComboBox(QComboBox): class SpotiFLACGUI(QWidget): def __init__(self): super().__init__() - self.current_version = "2.3" + self.current_version = "2.4" self.tracks = [] self.reset_state() @@ -729,7 +749,7 @@ class SpotiFLACGUI(QWidget): spacer = QSpacerItem(20, 6, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed) about_layout.addItem(spacer) - footer_label = QLabel("v2.3 | March 2025") + footer_label = QLabel("v2.4 | April 2025") footer_label.setStyleSheet("font-size: 12px; color: palette(text); margin-top: 10px;") about_layout.addWidget(footer_label, alignment=Qt.AlignmentFlag.AlignCenter) @@ -788,11 +808,20 @@ class SpotiFLACGUI(QWidget): self.reset_state() self.reset_ui() - metadata = get_filtered_data(url) - if "error" in metadata: - raise Exception(metadata["error"]) - - url_info = parse_uri(url) + self.log_output.append('Just a moment. Fetching metadata...') + self.tab_widget.setCurrentWidget(self.process_tab) + + self.metadata_worker = MetadataFetchWorker(url) + self.metadata_worker.finished.connect(self.on_metadata_fetched) + self.metadata_worker.error.connect(self.on_metadata_error) + self.metadata_worker.start() + + except Exception as e: + self.log_output.append(f'Error: Failed to start metadata fetch: {str(e)}') + + def on_metadata_fetched(self, metadata): + try: + url_info = parse_uri(self.spotify_url.text().strip()) if url_info["type"] == "track": self.handle_track_metadata(metadata["track"]) @@ -803,11 +832,11 @@ class SpotiFLACGUI(QWidget): self.update_button_states() self.tab_widget.setCurrentIndex(0) - - except SpotifyInvalidUrlException as e: - self.log_output.append(f'Error: {str(e)}') except Exception as e: - self.log_output.append(f'Error: Failed to fetch metadata: {str(e)}') + self.log_output.append(f'Error: {str(e)}') + + def on_metadata_error(self, error_message): + self.log_output.append(f'Error: {error_message}') def handle_track_metadata(self, track_data): track_id = track_data["external_urls"].split("/")[-1] diff --git a/getMetadata.py b/getMetadata.py index 4d8ccf3..c4ab2c8 100644 --- a/getMetadata.py +++ b/getMetadata.py @@ -5,7 +5,7 @@ import json import hmac import time import hashlib -from typing import Tuple, Callable +from typing import Tuple, Callable, Dict, Any, List _TOTP_SECRET = bytearray([53,53,48,55,49,52,53,56,53,51,52,56,55,52,57,57,53,57,50,50,52,56,54,51,48,51,50,57,51,52,55]) @@ -100,9 +100,7 @@ def get_json_from_api(api_url, access_token): return req.json() -def get_raw_spotify_data(spotify_url): - url_info = parse_uri(spotify_url) - +def get_access_token(): try: totp, timestamp = generate_totp() @@ -117,34 +115,116 @@ def get_raw_spotify_data(spotify_url): req = requests.get(token_url, headers=headers, params=params, timeout=10) if req.status_code != 200: return {"error": f"Failed to get access token. Status code: {req.status_code}"} - token = req.json() + return req.json() except Exception as e: return {"error": f"Failed to get access token: {str(e)}"} + +def fetch_tracks_in_batches(url: str, access_token: str, batch_size: int = 100, delay: float = 1.0) -> Tuple[List[Dict[str, Any]], int]: + all_tracks = [] + current_batch = 0 + while url: + print(f"Batch : {current_batch}") + + url_parts = url.split("offset=") + if len(url_parts) > 1: + offset_part = url_parts[1].split("&")[0] + print(f"Offset : {offset_part}") + print("-------------") + + track_data = get_json_from_api(url, access_token) + if not track_data: + break + + items = track_data.get('items', []) + all_tracks.extend(items) + + url = track_data.get('next') + if url and "&locale=" in url: + url = url.split("&locale=")[0] + + if url and delay > 0: + sleep(delay) + + current_batch += 1 + + return all_tracks, current_batch + +def get_raw_spotify_data(spotify_url, batch: bool = False, delay: float = 1.0): + url_info = parse_uri(spotify_url) + token = get_access_token() + + if "error" in token: + return token + + access_token = token["accessToken"] raw_data = {} if url_info['type'] == "playlist": try: playlist_data = get_json_from_api( playlist_base_url.format(url_info["id"]), - token["accessToken"] + access_token ) if not playlist_data: return {"error": "Failed to get playlist data"} raw_data = playlist_data + total_tracks = playlist_data.get('tracks', {}).get('total', 0) - tracks = [] - tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100' - while tracks_url: - track_data = get_json_from_api(tracks_url, token["accessToken"]) - if not track_data: - break - - tracks.extend(track_data['items']) - tracks_url = track_data.get('next') + if batch: + tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100' + tracks, num_batches = fetch_tracks_in_batches(tracks_url, access_token, 100, delay) + raw_data['tracks']['items'] = tracks + raw_data['_batch_count'] = num_batches + raw_data['_batch_enabled'] = True + + if len(tracks) < total_tracks: + last_offset = len(tracks) + remaining_tracks = [] + + while last_offset < total_tracks: + print(f"Batch : {num_batches}") + print(f"Offset : {last_offset}") + print("-------------") + + remainder_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?offset={last_offset}&limit=100' + track_data = get_json_from_api(remainder_url, access_token) + + if not track_data or not track_data.get('items'): + break + + items = track_data.get('items', []) + remaining_tracks.extend(items) + + if len(items) < 100: + break + + last_offset += len(items) + num_batches += 1 + + if delay > 0: + sleep(delay) + + tracks.extend(remaining_tracks) + raw_data['tracks']['items'] = tracks + raw_data['_batch_count'] = num_batches + else: + tracks = [] + tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100' + while tracks_url: + track_data = get_json_from_api(tracks_url, access_token) + if not track_data: + break + + tracks.extend(track_data['items']) + tracks_url = track_data.get('next') + if tracks_url and "&locale=" in tracks_url: + tracks_url = tracks_url.split("&locale=")[0] + + raw_data['tracks']['items'] = tracks + raw_data['_batch_enabled'] = False - raw_data['tracks']['items'] = tracks except Exception as e: return {"error": f"Failed to get playlist data: {str(e)}"} @@ -152,25 +232,68 @@ def get_raw_spotify_data(spotify_url): try: album_data = get_json_from_api( album_base_url.format(url_info["id"]), - token["accessToken"] + access_token ) if not album_data: return {"error": "Failed to get album data"} - album_data['_token'] = token["accessToken"] + album_data['_token'] = access_token raw_data = album_data + total_tracks = album_data.get('total_tracks', 0) - tracks = [] - tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50' - while tracks_url: - track_data = get_json_from_api(tracks_url, token["accessToken"]) - if not track_data: - break - - tracks.extend(track_data['items']) - tracks_url = track_data.get('next') + if batch: + tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50' + tracks, num_batches = fetch_tracks_in_batches(tracks_url, access_token, 50, delay) + raw_data['tracks']['items'] = tracks + raw_data['_batch_count'] = num_batches + raw_data['_batch_enabled'] = True + + if len(tracks) < total_tracks: + last_offset = len(tracks) + remaining_tracks = [] + + while last_offset < total_tracks: + print(f"Batch : {num_batches}") + print(f"Offset : {last_offset}") + print("-------------") + + remainder_url = f'{album_base_url.format(url_info["id"])}/tracks?offset={last_offset}&limit=50' + track_data = get_json_from_api(remainder_url, access_token) + + if not track_data or not track_data.get('items'): + break + + items = track_data.get('items', []) + remaining_tracks.extend(items) + + if len(items) < 50: + break + + last_offset += len(items) + num_batches += 1 + + if delay > 0: + sleep(delay) + + tracks.extend(remaining_tracks) + raw_data['tracks']['items'] = tracks + raw_data['_batch_count'] = num_batches + else: + tracks = [] + tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50' + while tracks_url: + track_data = get_json_from_api(tracks_url, access_token) + if not track_data: + break + + tracks.extend(track_data['items']) + tracks_url = track_data.get('next') + if tracks_url and "&locale=" in tracks_url: + tracks_url = tracks_url.split("&locale=")[0] + + raw_data['tracks']['items'] = tracks + raw_data['_batch_enabled'] = False - raw_data['tracks']['items'] = tracks except Exception as e: return {"error": f"Failed to get album data: {str(e)}"} @@ -178,7 +301,7 @@ def get_raw_spotify_data(spotify_url): try: track_data = get_json_from_api( track_base_url.format(url_info["id"]), - token["accessToken"] + access_token ) if not track_data: return {"error": "Failed to get track data"} @@ -191,10 +314,10 @@ def get_raw_spotify_data(spotify_url): def format_track_data(track_data): artists = [] - for artist in track_data['artists']: + for artist in track_data.get('artists', []): artists.append(artist['name']) - image_url = track_data.get('album', {}).get('images', [{}])[0].get('url', '') + image_url = track_data.get('album', {}).get('images', [{}])[0].get('url', '') if track_data.get('album', {}).get('images') else '' return { "track": { @@ -211,10 +334,10 @@ def format_track_data(track_data): def format_album_data(album_data): artists = [] - for artist in album_data['artists']: + for artist in album_data.get('artists', []): artists.append(artist['name']) - image_url = album_data.get('images', [{}])[0].get('url', '') + image_url = album_data.get('images', [{}])[0].get('url', '') if album_data.get('images') else '' track_list = [] for track in album_data.get('tracks', {}).get('items', []): @@ -233,28 +356,38 @@ def format_album_data(album_data): "external_urls": track.get('external_urls', {}).get('spotify', '') }) + album_info = { + "total_tracks": album_data.get('total_tracks', 0), + "name": album_data.get('name', ''), + "release_date": album_data.get('release_date', ''), + "artists": ", ".join(artists), + "images": image_url + } + + if album_data.get('_batch_enabled', False): + album_info["batch"] = f"{album_data.get('_batch_count', 1)}" + return { - "album_info": { - "total_tracks": album_data.get('total_tracks', 0), - "name": album_data.get('name', ''), - "release_date": album_data.get('release_date', ''), - "artists": ", ".join(artists), - "images": image_url - }, + "album_info": album_info, "track_list": track_list } def format_playlist_data(playlist_data): - image_url = playlist_data.get('images', [{}])[0].get('url', '') + image_url = playlist_data.get('images', [{}])[0].get('url', '') if playlist_data.get('images') else '' track_list = [] for item in playlist_data.get('tracks', {}).get('items', []): track = item.get('track', {}) + if not track: + continue + artists = [] for artist in track.get('artists', []): artists.append(artist['name']) - track_image = track.get('album', {}).get('images', [{}])[0].get('url', '') + track_image = '' + if track.get('album', {}).get('images'): + track_image = track.get('album', {}).get('images', [{}])[0].get('url', '') track_list.append({ "artists": ", ".join(artists), @@ -267,16 +400,21 @@ def format_playlist_data(playlist_data): "external_urls": track.get('external_urls', {}).get('spotify', '') }) + playlist_info = { + "tracks": {"total": playlist_data.get('tracks', {}).get('total', 0)}, + "followers": {"total": playlist_data.get('followers', {}).get('total', 0)}, + "owner": { + "display_name": playlist_data.get('owner', {}).get('display_name', ''), + "name": playlist_data.get('name', ''), + "images": image_url + } + } + + if playlist_data.get('_batch_enabled', False): + playlist_info["batch"] = f"{playlist_data.get('_batch_count', 1)}" + return { - "playlist_info": { - "tracks": {"total": playlist_data.get('tracks', {}).get('total', 0)}, - "followers": {"total": playlist_data.get('followers', {}).get('total', 0)}, - "owner": { - "display_name": playlist_data.get('owner', {}).get('display_name', ''), - "name": playlist_data.get('name', ''), - "images": image_url - } - }, + "playlist_info": playlist_info, "track_list": track_list } @@ -296,8 +434,8 @@ def process_spotify_data(raw_data, data_type): except Exception as e: return {"error": f"Error processing data: {str(e)}"} -def get_filtered_data(spotify_url): - raw_data = get_raw_spotify_data(spotify_url) +def get_filtered_data(spotify_url, batch=False, delay=1.0): + raw_data = get_raw_spotify_data(spotify_url, batch=batch, delay=delay) if raw_data and "error" not in raw_data: url_info = parse_uri(spotify_url) filtered_data = process_spotify_data(raw_data, url_info['type']) @@ -305,11 +443,11 @@ def get_filtered_data(spotify_url): return {"error": "Failed to get raw data"} if __name__ == '__main__': - playlist = "https://open.spotify.com/playlist/37i9dQZEVXbNG2KDcFcKOF" + playlist = "https://open.spotify.com/playlist/5Qvz8wZIRYbEUUFoPueKI5" album = "https://open.spotify.com/album/7kFyd5oyJdVX2pIi6P4iHE" song = "https://open.spotify.com/track/4wJ5Qq0jBN4ajy7ouZIV1c" - filtered_playlist = get_filtered_data(playlist) + filtered_playlist = get_filtered_data(playlist, batch=True, delay=0.1) print(json.dumps(filtered_playlist, indent=2)) filtered_album = get_filtered_data(album)