Compare commits
77 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bdc7717ef3 | |||
| 9a7c539418 | |||
| 888ce2b61c | |||
| e2e1ab1cfa | |||
| 9bddeab0d1 | |||
| 03a30ee09a | |||
| 2d908e2f75 | |||
| e8f7bf7313 | |||
| 1f0922f358 | |||
| 3f267a3fa1 | |||
| 22da74a027 | |||
| 783350fe88 | |||
| 0057d43f46 | |||
| 9928968ffb | |||
| af4f1dd401 | |||
| 3414fadbd3 | |||
| 457f30da99 | |||
| d4e621b36c | |||
| 58a733b790 | |||
| c85ab4bc28 | |||
| dac2e99b5a | |||
| d0f494f582 | |||
| 0542d6e86b | |||
| de798e4807 | |||
| 0e7ba6d029 | |||
| 2306b1f8d2 | |||
| 1b0d67702d | |||
| 00e369677f | |||
| c3e1607ca6 | |||
| 59428e7679 | |||
| 33c4698286 | |||
| 3ac4c34d73 | |||
| 88e303cbe4 | |||
| c13855fadd | |||
| 2b12684960 | |||
| 4bc164cc56 | |||
| 46cb65665e | |||
| 276b3b4951 | |||
| e15aadbd61 | |||
| d7639bae8f | |||
| 1af7ab65c9 | |||
| c5240596cb | |||
| c4a9042adc | |||
| 45ac08ecbd | |||
| 0add305d9c | |||
| 9b6b43c0a4 | |||
| 60d20cbebe | |||
| 626d58667e | |||
| 4dd1a7ea12 | |||
| 67964e4acb | |||
| 1486fb13df | |||
| 966536f127 | |||
| 21946321f5 | |||
| 3e3cb0610d | |||
| 160eba0987 | |||
| 71a60ded47 | |||
| e0a0514df9 | |||
| 1e7a48d263 | |||
| 0a83a0dd6e | |||
| da429d9410 | |||
| 63211c726b | |||
| 055cb6991a | |||
| 222d681551 | |||
| 479c6ede2b | |||
| ceb727adb9 | |||
| bbea8ca493 | |||
| f567dd19bf | |||
| 3651833e2a | |||
| 8403b96306 | |||
| d977829e36 | |||
| 2aaf123c98 | |||
| 13567802a0 | |||
| d704782519 | |||
| 884c02278f | |||
| d6abe2bae3 | |||
| 7b858dd0ce | |||
| cfeb9a2ef2 |
@@ -1,99 +0,0 @@
|
||||
import asyncio
|
||||
import zendriver as zd
|
||||
|
||||
async def get_metadata(page, headless=True):
|
||||
max_attempts = 40
|
||||
attempts = 0
|
||||
|
||||
await asyncio.sleep(2)
|
||||
|
||||
await page.evaluate("""
|
||||
window.downloadInfo = null;
|
||||
const originalFetch = window.fetch;
|
||||
window.fetch = async function(...args) {
|
||||
const [url, config] = args;
|
||||
if (url.includes('/api/load?url=%2Fapi%2Ffetch%2Fstream%2Fv2')) {
|
||||
const payload = JSON.parse(config.body);
|
||||
const title = document.querySelector('h1.svelte-6pt9ji').textContent.trim();
|
||||
const artists = Array.from(document.querySelectorAll('h2.svelte-6pt9ji a.normal'))
|
||||
.map(a => a.textContent.trim())
|
||||
.join(', ');
|
||||
const cover = document.querySelector('.svelte-6pt9ji .meta.svelte-6pt9ji a').href;
|
||||
|
||||
window.downloadInfo = {
|
||||
url: payload.url,
|
||||
cover: cover,
|
||||
title: title,
|
||||
artists: artists,
|
||||
token: payload.token.primary,
|
||||
expiry: payload.token.expiry
|
||||
};
|
||||
}
|
||||
return originalFetch.apply(this, args);
|
||||
};
|
||||
""")
|
||||
|
||||
await page.evaluate("""
|
||||
function waitForElement(selector) {
|
||||
return new Promise(resolve => {
|
||||
if (document.querySelector(selector)) {
|
||||
return resolve(document.querySelector(selector));
|
||||
}
|
||||
|
||||
const observer = new MutationObserver(mutations => {
|
||||
if (document.querySelector(selector)) {
|
||||
observer.disconnect();
|
||||
resolve(document.querySelector(selector));
|
||||
}
|
||||
});
|
||||
|
||||
observer.observe(document.documentElement, {
|
||||
childList: true,
|
||||
subtree: true
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
(async () => {
|
||||
if (!window.location.hostname.includes('lucida.')) return;
|
||||
|
||||
await Promise.race([
|
||||
waitForElement('.d1-track button'),
|
||||
waitForElement('button[class*="download-button"]')
|
||||
]);
|
||||
|
||||
const clickDownloadButton = () => {
|
||||
const button = document.querySelector('.d1-track button') ||
|
||||
document.querySelector('button[class*="download-button"]');
|
||||
if (button) button.click();
|
||||
};
|
||||
|
||||
clickDownloadButton();
|
||||
})();
|
||||
""")
|
||||
|
||||
while attempts < max_attempts:
|
||||
download_info = await page.evaluate("window.downloadInfo")
|
||||
if download_info:
|
||||
return download_info
|
||||
|
||||
await asyncio.sleep(0.5)
|
||||
attempts += 1
|
||||
|
||||
raise TimeoutError("Timeout")
|
||||
|
||||
async def main(headless=True):
|
||||
browser = await zd.start(headless=headless)
|
||||
try:
|
||||
track_id = "2plbrEY59IikOBgBGLjaoe"
|
||||
url = f"https://lucida.to/?url=https%3A%2F%2Fopen.spotify.com%2Ftrack%2F{track_id}&country=auto&to=tidal"
|
||||
|
||||
page = await browser.get(url)
|
||||
download_info = await get_metadata(page)
|
||||
print(download_info)
|
||||
return download_info
|
||||
finally:
|
||||
await browser.stop()
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -1,27 +1,20 @@
|
||||
[](https://github.com/afkarxyz/SpotiFLAC/releases)
|
||||
|
||||

|
||||

|
||||
|
||||
<div align="center">
|
||||
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Tidal, Amazon Music and Deezer with the help of Lucida.
|
||||
<b>SpotiFLAC</b> allows you to download Spotify tracks in true FLAC format through services like Qobuz, Tidal & Deezer.
|
||||
</div>
|
||||
|
||||
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v1.9/SpotiFLAC.exe)
|
||||
|
||||
#
|
||||
|
||||
> [!WARNING]
|
||||
Sometimes, the **download speed** from Lucida can be fast or slow; it varies unpredictably.
|
||||
### [Download](https://github.com/afkarxyz/SpotiFLAC/releases/download/v4.1/SpotiFLAC.exe)
|
||||
|
||||
## Screenshots
|
||||
|
||||
> When **Fallback** is enabled, it will use the backup server `Lucida.su`
|
||||

|
||||
|
||||

|
||||

|
||||
|
||||

|
||||
|
||||

|
||||

|
||||
|
||||
## Lossless Audio Check
|
||||
|
||||
|
||||
+1627
-608
File diff suppressed because it is too large
Load Diff
BIN
Binary file not shown.
|
After Width: | Height: | Size: 6.4 KiB |
+237
@@ -0,0 +1,237 @@
|
||||
import requests
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from mutagen.flac import FLAC
|
||||
|
||||
class DeezerDownloader:
|
||||
def __init__(self):
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
|
||||
})
|
||||
self.progress_callback = None
|
||||
|
||||
def set_progress_callback(self, callback):
|
||||
self.progress_callback = callback
|
||||
|
||||
def get_track_by_isrc(self, isrc):
|
||||
try:
|
||||
url = f"https://api.deezer.com/2.0/track/isrc:{isrc}"
|
||||
response = self.session.get(url)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
|
||||
if 'error' in data:
|
||||
print(f"Error from Deezer API: {data['error']['message']}")
|
||||
return None
|
||||
|
||||
return data
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Error fetching track data: {e}")
|
||||
return None
|
||||
|
||||
def extract_metadata(self, track_data):
|
||||
metadata = {}
|
||||
|
||||
metadata['title'] = track_data.get('title', '')
|
||||
metadata['title_short'] = track_data.get('title_short', '')
|
||||
metadata['duration'] = track_data.get('duration', 0)
|
||||
metadata['track_position'] = track_data.get('track_position', 1)
|
||||
metadata['disk_number'] = track_data.get('disk_number', 1)
|
||||
metadata['isrc'] = track_data.get('isrc', '')
|
||||
metadata['release_date'] = track_data.get('release_date', '')
|
||||
metadata['explicit_lyrics'] = track_data.get('explicit_lyrics', False)
|
||||
|
||||
if 'artist' in track_data:
|
||||
metadata['artist'] = track_data['artist'].get('name', '')
|
||||
metadata['artist_id'] = track_data['artist'].get('id', '')
|
||||
|
||||
if 'contributors' in track_data:
|
||||
artists = []
|
||||
for contributor in track_data['contributors']:
|
||||
if contributor.get('role') == 'Main':
|
||||
artists.append(contributor.get('name', ''))
|
||||
metadata['artists'] = ', '.join(artists) if artists else metadata.get('artist', '')
|
||||
|
||||
if 'album' in track_data:
|
||||
album = track_data['album']
|
||||
metadata['album'] = album.get('title', '')
|
||||
metadata['album_id'] = album.get('id', '')
|
||||
metadata['cover_url'] = album.get('cover_xl', album.get('cover_big', ''))
|
||||
metadata['cover_md5'] = album.get('md5_image', '')
|
||||
|
||||
metadata['deezer_link'] = track_data.get('link', '')
|
||||
metadata['preview_url'] = track_data.get('preview', '')
|
||||
|
||||
return metadata
|
||||
|
||||
def download_cover_art(self, cover_url, filename):
|
||||
if not cover_url:
|
||||
return None
|
||||
|
||||
try:
|
||||
response = self.session.get(cover_url)
|
||||
response.raise_for_status()
|
||||
|
||||
cover_path = f"{filename}_cover.jpg"
|
||||
with open(cover_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
|
||||
return cover_path
|
||||
except Exception as e:
|
||||
print(f"Error downloading cover art: {e}")
|
||||
return None
|
||||
|
||||
def embed_metadata(self, file_path, metadata, cover_path=None):
|
||||
try:
|
||||
audio = FLAC(file_path)
|
||||
|
||||
audio.clear()
|
||||
|
||||
if metadata.get('title'):
|
||||
audio['TITLE'] = metadata['title']
|
||||
if metadata.get('artists'):
|
||||
audio['ARTIST'] = metadata['artists']
|
||||
elif metadata.get('artist'):
|
||||
audio['ARTIST'] = metadata['artist']
|
||||
if metadata.get('album'):
|
||||
audio['ALBUM'] = metadata['album']
|
||||
if metadata.get('release_date'):
|
||||
audio['DATE'] = metadata['release_date']
|
||||
if metadata.get('track_position'):
|
||||
audio['TRACKNUMBER'] = str(metadata['track_position'])
|
||||
if metadata.get('disk_number'):
|
||||
audio['DISCNUMBER'] = str(metadata['disk_number'])
|
||||
if metadata.get('isrc'):
|
||||
audio['ISRC'] = metadata['isrc']
|
||||
|
||||
if cover_path and os.path.exists(cover_path):
|
||||
with open(cover_path, 'rb') as f:
|
||||
cover_data = f.read()
|
||||
|
||||
from mutagen.flac import Picture
|
||||
picture = Picture()
|
||||
picture.type = 3
|
||||
picture.mime = 'image/jpeg'
|
||||
picture.desc = 'Cover'
|
||||
picture.data = cover_data
|
||||
audio.add_picture(picture)
|
||||
|
||||
audio.save()
|
||||
print(f"Metadata embedded successfully in {file_path}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error embedding metadata: {e}")
|
||||
|
||||
async def download_by_isrc(self, isrc, output_dir="."):
|
||||
print(f"Fetching track info for ISRC: {isrc}")
|
||||
|
||||
track_data = self.get_track_by_isrc(isrc)
|
||||
if not track_data:
|
||||
print("Failed to get track data from Deezer API")
|
||||
return False
|
||||
|
||||
metadata = self.extract_metadata(track_data)
|
||||
print(f"Found track: {metadata.get('artists', 'Unknown')} - {metadata.get('title', 'Unknown')}")
|
||||
|
||||
track_id = track_data.get('id')
|
||||
if not track_id:
|
||||
print("No track ID found in Deezer API response")
|
||||
return False
|
||||
|
||||
print(f"Using track ID: {track_id}")
|
||||
|
||||
api_url = f"https://api.deezmate.com/dl/{track_id}"
|
||||
print(f"Requesting download links from: {api_url}")
|
||||
|
||||
try:
|
||||
response = self.session.get(api_url)
|
||||
response.raise_for_status()
|
||||
api_data = response.json()
|
||||
|
||||
if not api_data.get('success'):
|
||||
print("API request failed")
|
||||
return False
|
||||
|
||||
links = api_data.get('links', {})
|
||||
flac_url = links.get('flac')
|
||||
|
||||
if not flac_url:
|
||||
print("No FLAC download link found in API response")
|
||||
return False
|
||||
|
||||
print(f"Successfully obtained FLAC download URL")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error getting download URL from API: {e}")
|
||||
return False
|
||||
|
||||
print("Downloading FLAC file...")
|
||||
try:
|
||||
response = self.session.get(flac_url)
|
||||
response.raise_for_status()
|
||||
|
||||
safe_title = "".join(c for c in metadata.get('title', 'Unknown') if c.isalnum() or c in (' ', '-', '_')).rstrip()
|
||||
safe_artist = "".join(c for c in metadata.get('artists', 'Unknown') if c.isalnum() or c in (' ', '-', '_')).rstrip()
|
||||
filename = f"{safe_artist} - {safe_title}.flac"
|
||||
file_path = os.path.join(output_dir, filename)
|
||||
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
|
||||
downloaded = len(response.content)
|
||||
print(f"File size: {downloaded} bytes ({downloaded / (1024*1024):.2f} MB)")
|
||||
|
||||
if self.progress_callback:
|
||||
self.progress_callback(downloaded, downloaded)
|
||||
|
||||
print(f"Downloaded: {file_path}")
|
||||
|
||||
cover_path = None
|
||||
if metadata.get('cover_url'):
|
||||
print("Downloading cover art...")
|
||||
cover_path = self.download_cover_art(metadata['cover_url'],
|
||||
os.path.join(output_dir, f"{safe_artist} - {safe_title}"))
|
||||
|
||||
print("Embedding metadata...")
|
||||
self.embed_metadata(file_path, metadata, cover_path)
|
||||
|
||||
if cover_path and os.path.exists(cover_path):
|
||||
os.remove(cover_path)
|
||||
|
||||
print(f"Successfully downloaded and tagged: {filename}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error downloading file: {e}")
|
||||
return False
|
||||
|
||||
async def main():
|
||||
print("=== DeezerDL - Deezer Downloader ===")
|
||||
downloader = DeezerDownloader()
|
||||
|
||||
isrc = "USAT22409172"
|
||||
output_dir = "."
|
||||
|
||||
success = await downloader.download_by_isrc(isrc, output_dir)
|
||||
if success:
|
||||
print("Download completed successfully!")
|
||||
else:
|
||||
print("Download failed!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
import sys
|
||||
if sys.platform == "win32":
|
||||
import os
|
||||
os.system("chcp 65001 > nul")
|
||||
try:
|
||||
sys.stdout.reconfigure(encoding='utf-8')
|
||||
except:
|
||||
pass
|
||||
except:
|
||||
pass
|
||||
|
||||
asyncio.run(main())
|
||||
@@ -0,0 +1,28 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" id="flag-icons-eu" viewBox="0 0 640 480">
|
||||
<defs>
|
||||
<g id="eu-d">
|
||||
<g id="eu-b">
|
||||
<path id="eu-a" d="m0-1-.3 1 .5.1z"/>
|
||||
<use xlink:href="#eu-a" transform="scale(-1 1)"/>
|
||||
</g>
|
||||
<g id="eu-c">
|
||||
<use xlink:href="#eu-b" transform="rotate(72)"/>
|
||||
<use xlink:href="#eu-b" transform="rotate(144)"/>
|
||||
</g>
|
||||
<use xlink:href="#eu-c" transform="scale(-1 1)"/>
|
||||
</g>
|
||||
</defs>
|
||||
<path fill="#039" d="M0 0h640v480H0z"/>
|
||||
<g fill="#fc0" transform="translate(320 242.3)scale(23.7037)">
|
||||
<use xlink:href="#eu-d" width="100%" height="100%" y="-6"/>
|
||||
<use xlink:href="#eu-d" width="100%" height="100%" y="6"/>
|
||||
<g id="eu-e">
|
||||
<use xlink:href="#eu-d" width="100%" height="100%" x="-6"/>
|
||||
<use xlink:href="#eu-d" width="100%" height="100%" transform="rotate(-144 -2.3 -2.1)"/>
|
||||
<use xlink:href="#eu-d" width="100%" height="100%" transform="rotate(144 -2.1 -2.3)"/>
|
||||
<use xlink:href="#eu-d" width="100%" height="100%" transform="rotate(72 -4.7 -2)"/>
|
||||
<use xlink:href="#eu-d" width="100%" height="100%" transform="rotate(72 -5 .5)"/>
|
||||
</g>
|
||||
<use xlink:href="#eu-e" width="100%" height="100%" transform="scale(-1 1)"/>
|
||||
</g>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 1.3 KiB |
+505
@@ -0,0 +1,505 @@
|
||||
from time import sleep
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import requests
|
||||
import json
|
||||
import time
|
||||
import pyotp
|
||||
import base64
|
||||
from random import randrange
|
||||
from typing import Dict, Any, List, Tuple
|
||||
|
||||
# https://github.com/visagenull/Spotify-Free
|
||||
def get_random_user_agent():
|
||||
return f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_{randrange(11, 15)}_{randrange(4, 9)}) AppleWebKit/{randrange(530, 537)}.{randrange(30, 37)} (KHTML, like Gecko) Chrome/{randrange(80, 105)}.0.{randrange(3000, 4500)}.{randrange(60, 125)} Safari/{randrange(530, 537)}.{randrange(30, 36)}"
|
||||
|
||||
def generate_totp():
|
||||
url = "https://raw.githubusercontent.com/Thereallo1026/spotify-secrets/refs/heads/main/secrets/secretBytes.json"
|
||||
|
||||
try:
|
||||
resp = requests.get(url, timeout=10)
|
||||
if resp.status_code != 200:
|
||||
raise Exception(f"Failed to fetch TOTP secrets from GitHub. Status: {resp.status_code}")
|
||||
secrets_list = resp.json()
|
||||
|
||||
latest_entry = max(secrets_list, key=lambda x: x["version"])
|
||||
version = latest_entry["version"]
|
||||
secret_cipher = latest_entry["secret"]
|
||||
except Exception as e:
|
||||
raise Exception(f"Failed to fetch secrets from GitHub: {str(e)}")
|
||||
|
||||
processed = [byte ^ ((i % 33) + 9) for i, byte in enumerate(secret_cipher)]
|
||||
processed_str = "".join(map(str, processed))
|
||||
utf8_bytes = processed_str.encode('utf-8')
|
||||
hex_str = utf8_bytes.hex()
|
||||
secret_bytes = bytes.fromhex(hex_str)
|
||||
b32_secret = base64.b32encode(secret_bytes).decode('utf-8')
|
||||
totp = pyotp.TOTP(b32_secret)
|
||||
|
||||
headers = {
|
||||
"Host": "open.spotify.com",
|
||||
"User-Agent": get_random_user_agent(),
|
||||
"Accept": "*/*",
|
||||
}
|
||||
|
||||
try:
|
||||
resp = requests.get("https://open.spotify.com/api/server-time", headers=headers, timeout=10)
|
||||
if resp.status_code != 200:
|
||||
raise Exception(f"Failed to get server time. Status code: {resp.status_code}")
|
||||
data = resp.json()
|
||||
server_time = data.get("serverTime")
|
||||
if server_time is None:
|
||||
raise Exception("Failed to fetch server time from Spotify")
|
||||
return totp, server_time, version
|
||||
except Exception as e:
|
||||
raise Exception(f"Error getting server time: {str(e)}")
|
||||
|
||||
token_url = 'https://open.spotify.com/api/token'
|
||||
playlist_base_url = 'https://api.spotify.com/v1/playlists/{}'
|
||||
album_base_url = 'https://api.spotify.com/v1/albums/{}'
|
||||
track_base_url = 'https://api.spotify.com/v1/tracks/{}'
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
|
||||
'Accept': 'application/json',
|
||||
'Accept-Language': 'en-US,en;q=0.9',
|
||||
'Accept-Encoding': 'gzip, deflate, br',
|
||||
'sec-ch-ua-platform': '"Windows"',
|
||||
'sec-fetch-dest': 'empty',
|
||||
'sec-fetch-mode': 'cors',
|
||||
'sec-fetch-site': 'same-origin',
|
||||
'Referer': 'https://open.spotify.com/',
|
||||
'Origin': 'https://open.spotify.com'
|
||||
}
|
||||
|
||||
class SpotifyInvalidUrlException(Exception):
|
||||
pass
|
||||
|
||||
class SpotifyWebsiteParserException(Exception):
|
||||
pass
|
||||
|
||||
def parse_uri(uri):
|
||||
u = urlparse(uri)
|
||||
if u.netloc == "embed.spotify.com":
|
||||
if not u.query:
|
||||
raise SpotifyInvalidUrlException("ERROR: url {} is not supported".format(uri))
|
||||
qs = parse_qs(u.query)
|
||||
return parse_uri(qs['uri'][0])
|
||||
|
||||
if not u.scheme and not u.netloc:
|
||||
return {"type": "playlist", "id": u.path}
|
||||
|
||||
if u.scheme == "spotify":
|
||||
parts = uri.split(":")
|
||||
else:
|
||||
if u.netloc != "open.spotify.com" and u.netloc != "play.spotify.com":
|
||||
raise SpotifyInvalidUrlException("ERROR: url {} is not supported".format(uri))
|
||||
parts = u.path.split("/")
|
||||
|
||||
if parts[1] == "embed":
|
||||
parts = parts[1:]
|
||||
|
||||
l = len(parts)
|
||||
if l == 3 and parts[1] in ["album", "track", "playlist"]:
|
||||
return {"type": parts[1], "id": parts[2]}
|
||||
if l == 5 and parts[3] == "playlist":
|
||||
return {"type": parts[3], "id": parts[4]}
|
||||
|
||||
raise SpotifyInvalidUrlException("ERROR: unable to determine Spotify URL type or type is unsupported.")
|
||||
|
||||
def get_json_from_api(api_url, access_token):
|
||||
headers.update({'Authorization': 'Bearer {}'.format(access_token)})
|
||||
|
||||
req = requests.get(api_url, headers=headers, timeout=10)
|
||||
|
||||
if req.status_code == 429:
|
||||
seconds = int(req.headers.get("Retry-After", "5")) + 1
|
||||
print(f"INFO: rate limited! Sleeping for {seconds} seconds")
|
||||
sleep(seconds)
|
||||
return None
|
||||
|
||||
if req.status_code != 200:
|
||||
raise SpotifyWebsiteParserException(f"ERROR: {api_url} gave us not a 200. Instead: {req.status_code}")
|
||||
|
||||
return req.json()
|
||||
|
||||
def get_access_token():
|
||||
try:
|
||||
totp, server_time, totp_version = generate_totp()
|
||||
otp_code = totp.at(int(server_time))
|
||||
timestamp_ms = int(time.time() * 1000)
|
||||
|
||||
params = {
|
||||
'reason': 'init',
|
||||
'productType': 'web-player',
|
||||
'totp': otp_code,
|
||||
'totpServerTime': server_time,
|
||||
'totpVer': str(totp_version),
|
||||
'sTime': server_time,
|
||||
'cTime': timestamp_ms,
|
||||
'buildVer': 'web-player_2025-07-02_1720000000000_12345678',
|
||||
'buildDate': '2025-07-02'
|
||||
}
|
||||
|
||||
req = requests.get(token_url, headers=headers, params=params, timeout=10)
|
||||
if req.status_code != 200:
|
||||
return {"error": f"Failed to get access token. Status code: {req.status_code}"}
|
||||
return req.json()
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to get access token: {str(e)}"}
|
||||
|
||||
def fetch_tracks_in_batches(url: str, access_token: str, batch_size: int = 100, delay: float = 1.0) -> Tuple[List[Dict[str, Any]], int]:
|
||||
all_tracks = []
|
||||
current_batch = 0
|
||||
|
||||
while url:
|
||||
print(f"Batch : {current_batch}")
|
||||
|
||||
url_parts = url.split("offset=")
|
||||
if len(url_parts) > 1:
|
||||
offset_part = url_parts[1].split("&")[0]
|
||||
print(f"Offset : {offset_part}")
|
||||
print("-------------")
|
||||
|
||||
track_data = get_json_from_api(url, access_token)
|
||||
if not track_data:
|
||||
break
|
||||
|
||||
items = track_data.get('items', [])
|
||||
all_tracks.extend(items)
|
||||
|
||||
url = track_data.get('next')
|
||||
if url and "&locale=" in url:
|
||||
url = url.split("&locale=")[0]
|
||||
|
||||
if url and delay > 0:
|
||||
sleep(delay)
|
||||
|
||||
current_batch += 1
|
||||
|
||||
return all_tracks, current_batch
|
||||
|
||||
def get_raw_spotify_data(spotify_url, batch: bool = False, delay: float = 1.0):
|
||||
url_info = parse_uri(spotify_url)
|
||||
token = get_access_token()
|
||||
|
||||
if "error" in token:
|
||||
return token
|
||||
|
||||
access_token = token["accessToken"]
|
||||
raw_data = {}
|
||||
|
||||
if url_info['type'] == "playlist":
|
||||
try:
|
||||
playlist_data = get_json_from_api(
|
||||
playlist_base_url.format(url_info["id"]),
|
||||
access_token
|
||||
)
|
||||
if not playlist_data:
|
||||
return {"error": "Failed to get playlist data"}
|
||||
|
||||
raw_data = playlist_data
|
||||
total_tracks = playlist_data.get('tracks', {}).get('total', 0)
|
||||
|
||||
if batch:
|
||||
tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100'
|
||||
tracks, num_batches = fetch_tracks_in_batches(tracks_url, access_token, 100, delay)
|
||||
raw_data['tracks']['items'] = tracks
|
||||
raw_data['_batch_count'] = num_batches
|
||||
raw_data['_batch_enabled'] = True
|
||||
|
||||
if len(tracks) < total_tracks:
|
||||
last_offset = len(tracks)
|
||||
remaining_tracks = []
|
||||
|
||||
while last_offset < total_tracks:
|
||||
print(f"Batch : {num_batches}")
|
||||
print(f"Offset : {last_offset}")
|
||||
print("-------------")
|
||||
|
||||
remainder_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?offset={last_offset}&limit=100'
|
||||
track_data = get_json_from_api(remainder_url, access_token)
|
||||
|
||||
if not track_data or not track_data.get('items'):
|
||||
break
|
||||
|
||||
items = track_data.get('items', [])
|
||||
remaining_tracks.extend(items)
|
||||
|
||||
if len(items) < 100:
|
||||
break
|
||||
|
||||
last_offset += len(items)
|
||||
num_batches += 1
|
||||
|
||||
if delay > 0:
|
||||
sleep(delay)
|
||||
|
||||
tracks.extend(remaining_tracks)
|
||||
raw_data['tracks']['items'] = tracks
|
||||
raw_data['_batch_count'] = num_batches
|
||||
else:
|
||||
tracks = []
|
||||
tracks_url = f'https://api.spotify.com/v1/playlists/{url_info["id"]}/tracks?limit=100'
|
||||
while tracks_url:
|
||||
track_data = get_json_from_api(tracks_url, access_token)
|
||||
if not track_data:
|
||||
break
|
||||
|
||||
tracks.extend(track_data['items'])
|
||||
tracks_url = track_data.get('next')
|
||||
if tracks_url and "&locale=" in tracks_url:
|
||||
tracks_url = tracks_url.split("&locale=")[0]
|
||||
|
||||
raw_data['tracks']['items'] = tracks
|
||||
raw_data['_batch_enabled'] = False
|
||||
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to get playlist data: {str(e)}"}
|
||||
|
||||
elif url_info["type"] == "album":
|
||||
try:
|
||||
album_data = get_json_from_api(
|
||||
album_base_url.format(url_info["id"]),
|
||||
access_token
|
||||
)
|
||||
if not album_data:
|
||||
return {"error": "Failed to get album data"}
|
||||
|
||||
album_data['_token'] = access_token
|
||||
raw_data = album_data
|
||||
total_tracks = album_data.get('total_tracks', 0)
|
||||
|
||||
if batch:
|
||||
tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50'
|
||||
tracks, num_batches = fetch_tracks_in_batches(tracks_url, access_token, 50, delay)
|
||||
raw_data['tracks']['items'] = tracks
|
||||
raw_data['_batch_count'] = num_batches
|
||||
raw_data['_batch_enabled'] = True
|
||||
|
||||
if len(tracks) < total_tracks:
|
||||
last_offset = len(tracks)
|
||||
remaining_tracks = []
|
||||
|
||||
while last_offset < total_tracks:
|
||||
print(f"Batch : {num_batches}")
|
||||
print(f"Offset : {last_offset}")
|
||||
print("-------------")
|
||||
|
||||
remainder_url = f'{album_base_url.format(url_info["id"])}/tracks?offset={last_offset}&limit=50'
|
||||
track_data = get_json_from_api(remainder_url, access_token)
|
||||
|
||||
if not track_data or not track_data.get('items'):
|
||||
break
|
||||
|
||||
items = track_data.get('items', [])
|
||||
remaining_tracks.extend(items)
|
||||
|
||||
if len(items) < 50:
|
||||
break
|
||||
|
||||
last_offset += len(items)
|
||||
num_batches += 1
|
||||
|
||||
if delay > 0:
|
||||
sleep(delay)
|
||||
|
||||
tracks.extend(remaining_tracks)
|
||||
raw_data['tracks']['items'] = tracks
|
||||
raw_data['_batch_count'] = num_batches
|
||||
else:
|
||||
tracks = []
|
||||
tracks_url = f'{album_base_url.format(url_info["id"])}/tracks?limit=50'
|
||||
while tracks_url:
|
||||
track_data = get_json_from_api(tracks_url, access_token)
|
||||
if not track_data:
|
||||
break
|
||||
|
||||
tracks.extend(track_data['items'])
|
||||
tracks_url = track_data.get('next')
|
||||
if tracks_url and "&locale=" in tracks_url:
|
||||
tracks_url = tracks_url.split("&locale=")[0]
|
||||
|
||||
raw_data['tracks']['items'] = tracks
|
||||
raw_data['_batch_enabled'] = False
|
||||
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to get album data: {str(e)}"}
|
||||
|
||||
elif url_info["type"] == "track":
|
||||
try:
|
||||
track_data = get_json_from_api(
|
||||
track_base_url.format(url_info["id"]),
|
||||
access_token
|
||||
)
|
||||
if not track_data:
|
||||
return {"error": "Failed to get track data"}
|
||||
|
||||
raw_data = track_data
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to get track data: {str(e)}"}
|
||||
|
||||
return raw_data
|
||||
|
||||
def format_track_data(track_data):
|
||||
artists = []
|
||||
for artist in track_data.get('artists', []):
|
||||
artists.append(artist['name'])
|
||||
|
||||
image_url = track_data.get('album', {}).get('images', [{}])[0].get('url', '') if track_data.get('album', {}).get('images') else ''
|
||||
|
||||
isrc = track_data.get('external_ids', {}).get('isrc', '')
|
||||
|
||||
return {
|
||||
"track": {
|
||||
"artists": ", ".join(artists),
|
||||
"name": track_data.get('name', ''),
|
||||
"album_name": track_data.get('album', {}).get('name', ''),
|
||||
"duration_ms": track_data.get('duration_ms', 0),
|
||||
"images": image_url,
|
||||
"release_date": track_data.get('album', {}).get('release_date', ''),
|
||||
"track_number": track_data.get('track_number', 0),
|
||||
"external_urls": track_data.get('external_urls', {}).get('spotify', ''),
|
||||
"isrc": isrc
|
||||
}
|
||||
}
|
||||
|
||||
def format_album_data(album_data):
|
||||
artists = []
|
||||
for artist in album_data.get('artists', []):
|
||||
artists.append(artist['name'])
|
||||
|
||||
image_url = album_data.get('images', [{}])[0].get('url', '') if album_data.get('images') else ''
|
||||
|
||||
track_list = []
|
||||
for track in album_data.get('tracks', {}).get('items', []):
|
||||
track_artists = []
|
||||
for artist in track.get('artists', []):
|
||||
track_artists.append(artist['name'])
|
||||
|
||||
track_id = track.get('id', '')
|
||||
track_isrc = ''
|
||||
|
||||
if track_id and album_data.get('_token'):
|
||||
try:
|
||||
full_track_data = get_json_from_api(
|
||||
track_base_url.format(track_id),
|
||||
album_data.get('_token')
|
||||
)
|
||||
if full_track_data:
|
||||
track_isrc = full_track_data.get('external_ids', {}).get('isrc', '')
|
||||
except:
|
||||
pass
|
||||
|
||||
track_list.append({
|
||||
"artists": ", ".join(track_artists),
|
||||
"name": track.get('name', ''),
|
||||
"album_name": album_data.get('name', ''),
|
||||
"duration_ms": track.get('duration_ms', 0),
|
||||
"images": image_url,
|
||||
"release_date": album_data.get('release_date', ''),
|
||||
"track_number": track.get('track_number', 0),
|
||||
"external_urls": track.get('external_urls', {}).get('spotify', ''),
|
||||
"isrc": track_isrc
|
||||
})
|
||||
|
||||
album_info = {
|
||||
"total_tracks": album_data.get('total_tracks', 0),
|
||||
"name": album_data.get('name', ''),
|
||||
"release_date": album_data.get('release_date', ''),
|
||||
"artists": ", ".join(artists),
|
||||
"images": image_url
|
||||
}
|
||||
|
||||
if album_data.get('_batch_enabled', False):
|
||||
album_info["batch"] = f"{album_data.get('_batch_count', 1)}"
|
||||
|
||||
return {
|
||||
"album_info": album_info,
|
||||
"track_list": track_list
|
||||
}
|
||||
|
||||
def format_playlist_data(playlist_data):
|
||||
image_url = playlist_data.get('images', [{}])[0].get('url', '') if playlist_data.get('images') else ''
|
||||
|
||||
track_list = []
|
||||
for item in playlist_data.get('tracks', {}).get('items', []):
|
||||
track = item.get('track', {})
|
||||
if not track:
|
||||
continue
|
||||
|
||||
artists = []
|
||||
for artist in track.get('artists', []):
|
||||
artists.append(artist['name'])
|
||||
|
||||
track_image = ''
|
||||
if track.get('album', {}).get('images'):
|
||||
track_image = track.get('album', {}).get('images', [{}])[0].get('url', '')
|
||||
|
||||
track_isrc = track.get('external_ids', {}).get('isrc', '')
|
||||
|
||||
track_list.append({
|
||||
"artists": ", ".join(artists),
|
||||
"name": track.get('name', ''),
|
||||
"album_name": track.get('album', {}).get('name', ''),
|
||||
"duration_ms": track.get('duration_ms', 0),
|
||||
"images": track_image,
|
||||
"release_date": track.get('album', {}).get('release_date', ''),
|
||||
"track_number": track.get('track_number', 0),
|
||||
"external_urls": track.get('external_urls', {}).get('spotify', ''),
|
||||
"isrc": track_isrc
|
||||
})
|
||||
|
||||
playlist_info = {
|
||||
"tracks": {"total": playlist_data.get('tracks', {}).get('total', 0)},
|
||||
"followers": {"total": playlist_data.get('followers', {}).get('total', 0)},
|
||||
"owner": {
|
||||
"display_name": playlist_data.get('owner', {}).get('display_name', ''),
|
||||
"name": playlist_data.get('name', ''),
|
||||
"images": image_url
|
||||
}
|
||||
}
|
||||
|
||||
if playlist_data.get('_batch_enabled', False):
|
||||
playlist_info["batch"] = f"{playlist_data.get('_batch_count', 1)}"
|
||||
|
||||
return {
|
||||
"playlist_info": playlist_info,
|
||||
"track_list": track_list
|
||||
}
|
||||
|
||||
def process_spotify_data(raw_data, data_type):
|
||||
if not raw_data or "error" in raw_data:
|
||||
return {"error": "Invalid data provided"}
|
||||
|
||||
try:
|
||||
if data_type == "track":
|
||||
return format_track_data(raw_data)
|
||||
elif data_type == "album":
|
||||
return format_album_data(raw_data)
|
||||
elif data_type == "playlist":
|
||||
return format_playlist_data(raw_data)
|
||||
else:
|
||||
return {"error": "Invalid data type"}
|
||||
except Exception as e:
|
||||
return {"error": f"Error processing data: {str(e)}"}
|
||||
|
||||
def get_filtered_data(spotify_url, batch=False, delay=1.0):
|
||||
raw_data = get_raw_spotify_data(spotify_url, batch=batch, delay=delay)
|
||||
if raw_data and "error" not in raw_data:
|
||||
url_info = parse_uri(spotify_url)
|
||||
filtered_data = process_spotify_data(raw_data, url_info['type'])
|
||||
return filtered_data
|
||||
return {"error": "Failed to get raw data"}
|
||||
|
||||
if __name__ == '__main__':
|
||||
playlist = "https://open.spotify.com/playlist/37i9dQZEVXbNG2KDcFcKOF"
|
||||
album = "https://open.spotify.com/album/6J84szYCnMfzEcvIcfWMFL"
|
||||
song = "https://open.spotify.com/track/7so0lgd0zP2Sbgs2d7a1SZ"
|
||||
|
||||
filtered_playlist = get_filtered_data(playlist, batch=True, delay=0.1)
|
||||
print(json.dumps(filtered_playlist, indent=2))
|
||||
|
||||
filtered_album = get_filtered_data(album)
|
||||
print(json.dumps(filtered_album, indent=2))
|
||||
|
||||
filtered_track = get_filtered_data(song)
|
||||
print(json.dumps(filtered_track, indent=2))
|
||||
-155
@@ -1,155 +0,0 @@
|
||||
import requests
|
||||
import time
|
||||
import os
|
||||
import asyncio
|
||||
|
||||
class TrackDownloader:
|
||||
def __init__(self, use_fallback=False):
|
||||
self.client = requests.Session()
|
||||
self.headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
|
||||
}
|
||||
self.progress_callback = None
|
||||
self.filename_format = 'title_artist'
|
||||
self.use_fallback = use_fallback
|
||||
self.base_domain = "lucida.su" if use_fallback else "lucida.to"
|
||||
self.api_base = "https://apislucida.vercel.app"
|
||||
|
||||
def set_progress_callback(self, callback):
|
||||
self.progress_callback = callback
|
||||
|
||||
def set_filename_format(self, format_type):
|
||||
self.filename_format = format_type
|
||||
|
||||
def generate_filename(self, metadata):
|
||||
if self.filename_format == 'artist_title':
|
||||
filename = f"{metadata['artists']} - {metadata['title']}.flac"
|
||||
else:
|
||||
filename = f"{metadata['title']} - {metadata['artists']}.flac"
|
||||
return self.sanitize_filename(filename)
|
||||
|
||||
async def get_track_info(self, track_id, service="amazon", use_fallback=None):
|
||||
if use_fallback is None:
|
||||
use_fallback = self.use_fallback
|
||||
|
||||
fallback = "su" if use_fallback else "to"
|
||||
api_url = f"{self.api_base}/{fallback}/{track_id}/{service}"
|
||||
|
||||
try:
|
||||
response = requests.get(api_url)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise Exception(f"Failed to get track info: {str(e)}")
|
||||
|
||||
def sanitize_filename(self, filename):
|
||||
invalid_chars = '<>:"/\\|?*'
|
||||
for char in invalid_chars:
|
||||
filename = filename.replace(char, '')
|
||||
|
||||
filename = ' '.join(filename.split())
|
||||
filename = filename.replace(' ,', ',')
|
||||
filename = filename.replace(',', ', ')
|
||||
while ' ' in filename:
|
||||
filename = filename.replace(' ', ' ')
|
||||
filename = filename.rsplit('.', 1)
|
||||
filename[0] = filename[0].strip()
|
||||
return '.'.join(filename)
|
||||
|
||||
def download(self, metadata, output_dir):
|
||||
track_url = metadata['url']
|
||||
primary_token = metadata['token']['primary']
|
||||
expiry = metadata['token']['expiry']
|
||||
|
||||
print(f"Starting download for: {track_url}")
|
||||
|
||||
initial_request = {
|
||||
"account": {"id": "auto", "type": "country"},
|
||||
"compat": "false",
|
||||
"downscale": "original",
|
||||
"handoff": True,
|
||||
"metadata": True,
|
||||
"private": True,
|
||||
"token": {
|
||||
"expiry": expiry,
|
||||
"primary": primary_token
|
||||
},
|
||||
"upload": {"enabled": False, "service": "pixeldrain"},
|
||||
"url": track_url
|
||||
}
|
||||
|
||||
response = self.client.post(f"https://{self.base_domain}/api/load?url=/api/fetch/stream/v2",
|
||||
json=initial_request,
|
||||
headers=self.headers)
|
||||
|
||||
csrf_token = response.cookies.get('csrf_token')
|
||||
if csrf_token:
|
||||
self.headers['X-CSRF-Token'] = csrf_token
|
||||
|
||||
initial_response = response.json()
|
||||
|
||||
if not initial_response.get("success", False):
|
||||
raise Exception(f"Initial request failed: {initial_response.get('error', 'Unknown error')}")
|
||||
|
||||
handoff = initial_response["handoff"]
|
||||
server = initial_response["server"]
|
||||
|
||||
file_name = self.generate_filename(metadata)
|
||||
|
||||
completion_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}"
|
||||
|
||||
print("Waiting for track processing to complete")
|
||||
while True:
|
||||
completion_response = self.client.get(completion_url, headers=self.headers).json()
|
||||
if completion_response["status"] == "completed":
|
||||
break
|
||||
elif completion_response["status"] == "error":
|
||||
raise Exception(f"API request failed: {completion_response.get('message', 'Unknown error')}")
|
||||
time.sleep(1)
|
||||
|
||||
download_url = f"https://{server}.{self.base_domain}/api/fetch/request/{handoff}/download"
|
||||
print(f"Starting download of: {file_name}")
|
||||
|
||||
response = self.client.get(download_url, stream=True, headers=self.headers)
|
||||
total_size = int(response.headers.get('content-length', 0))
|
||||
downloaded_size = 0
|
||||
|
||||
file_path = os.path.join(output_dir, file_name)
|
||||
|
||||
try:
|
||||
with open(file_path, 'wb') as file:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
file.write(chunk)
|
||||
downloaded_size += len(chunk)
|
||||
if self.progress_callback:
|
||||
self.progress_callback(downloaded_size, total_size)
|
||||
|
||||
if downloaded_size == 0:
|
||||
raise Exception("No data received from server")
|
||||
|
||||
return file_path
|
||||
|
||||
except Exception as e:
|
||||
if os.path.exists(file_path) and os.path.getsize(file_path) == 0:
|
||||
try:
|
||||
os.remove(file_path)
|
||||
except:
|
||||
pass
|
||||
raise e
|
||||
|
||||
async def main():
|
||||
downloader = TrackDownloader()
|
||||
output_dir = "."
|
||||
track_id = "2plbrEY59IikOBgBGLjaoe"
|
||||
service = "amazon"
|
||||
|
||||
try:
|
||||
metadata = await downloader.get_track_info(track_id, service)
|
||||
downloaded_file = downloader.download(metadata, output_dir)
|
||||
print(f"File downloaded successfully: {downloaded_file}")
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {str(e)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -0,0 +1,48 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<!-- Generator: Adobe Illustrator 28.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 0) -->
|
||||
<svg version="1.1" id="Layer_1" xmlns:sketch="http://www.bohemiancoding.com/sketch/ns"
|
||||
xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px" viewBox="0 0 512 512"
|
||||
style="enable-background:new 0 0 512 512;" xml:space="preserve">
|
||||
<style type="text/css">
|
||||
.st0{fill-rule:evenodd;clip-rule:evenodd;fill:#FF0000;stroke:#373435;stroke-width:0.5669;stroke-miterlimit:10;}
|
||||
.st1{fill-rule:evenodd;clip-rule:evenodd;fill:#FFFF00;stroke:#373435;stroke-width:0.5669;stroke-miterlimit:10;}
|
||||
.st2{fill-rule:evenodd;clip-rule:evenodd;fill:#2AA125;stroke:#373435;stroke-width:0.5669;stroke-miterlimit:10;}
|
||||
</style>
|
||||
<g id="SVGRepo_bgCarrier">
|
||||
</g>
|
||||
<g id="SVGRepo_tracerCarrier">
|
||||
</g>
|
||||
<g id="Layer_x0020_1">
|
||||
<g id="_1818452274576">
|
||||
<g id="SVGRepo_bgCarrier_00000044893734704698182460000014884511992085247122_">
|
||||
</g>
|
||||
<g id="SVGRepo_tracerCarrier_00000067939915892718314930000001743019108017086612_">
|
||||
</g>
|
||||
<g id="SVGRepo_iconCarrier_00000176000695080737548300000008661679408724292005_">
|
||||
<path d="M407.1,227.2c-54.7-27.6-119.3-43.8-187.6-43.8c-38.9,0-76.5,5.2-112.3,15l3-0.7c-2.1,0.7-4.5,1.1-7,1.1
|
||||
c-13,0-23.5-10.5-23.5-23.5c0-10.5,6.8-19.3,16.3-22.4l0.2-0.1c90.9-26.9,240.6-21.8,335.4,34.6c7.3,4.4,12.1,12.3,12.1,21.3
|
||||
c0,4.4-1.2,8.6-3.2,12.2l0.1-0.1c-5,5.9-12.3,9.6-20.6,9.6c-4.7,0-9-1.2-12.8-3.3L407.1,227.2L407.1,227.2z M404.5,298.9
|
||||
c-3.4,5.7-9.6,9.4-16.6,9.4c-3.8,0-7.4-1.1-10.4-3l0.1,0.1c-46.8-26.8-102.8-42.5-162.5-42.5c-32.9,0-64.6,4.8-94.6,13.7l2.3-0.6
|
||||
c-1.7,0.5-3.7,0.9-5.8,0.9c-10.7,0-19.4-8.7-19.4-19.4c0-8.7,5.7-16,13.5-18.5l0.1,0c30.8-9.1,66.2-14.4,102.8-14.4
|
||||
c68.1,0,132,18.2,187,49.9l-1.8-1c5.1,3.3,8.4,8.9,8.4,15.3C407.7,292.5,406.5,296,404.5,298.9L404.5,298.9L404.5,298.9
|
||||
L404.5,298.9z M373.8,369.3c-2.7,4.6-7.6,7.7-13.3,7.7c-3.2,0-6.1-1-8.6-2.6l0.1,0c-40.9-23-89.7-36.5-141.7-36.5
|
||||
c-29.8,0-58.6,4.5-85.7,12.7l2.1-0.5c-1.1,0.3-2.5,0.5-3.8,0.5c-8.7,0-15.8-7.1-15.8-15.8c0-7.4,5.1-13.6,11.9-15.3l0.1,0
|
||||
c27-8,58-12.6,90.1-12.6c58.1,0,112.6,15.1,159.9,41.7l-1.7-0.9c5.2,2.6,8.6,7.8,8.6,13.8C376,364.3,375.2,367.1,373.8,369.3
|
||||
L373.8,369.3L373.8,369.3L373.8,369.3z M256-0.6L256-0.6C114.6-0.6,0,114,0,255.4s114.6,256,256,256s256-114.6,256-256l0,0
|
||||
C511.6,114.2,397.2-0.2,256-0.6L256-0.6L256-0.6L256-0.6z"/>
|
||||
</g>
|
||||
</g>
|
||||
<path class="st0" d="M406.9,227.2c0,0,0.1,0,0.1,0.1L406.9,227.2z M107.1,198.5c35.8-9.8,73.4-15,112.3-15
|
||||
c68.3,0,132.8,16.1,187.5,43.7c3.8,2.1,8.2,3.3,12.8,3.3c8.2,0,15.6-3.7,20.5-9.6c0,0,0,0,0,0c2-3.6,3.1-7.7,3.1-12
|
||||
c0-9-4.8-16.8-12.1-21.3c-94.7-56.4-244.5-61.5-335.4-34.6l-0.2,0.1c-9.4,3.1-16.3,11.9-16.3,22.4c0,13,10.5,23.5,23.5,23.5
|
||||
c2.5,0,4.9-0.4,7-1.1L107.1,198.5L107.1,198.5z"/>
|
||||
<path class="st1" d="M401.2,274.3c-55-31.8-118.9-49.9-187-49.9c-36.6,0-72,5.3-102.8,14.4l-0.1,0c-7.9,2.5-13.5,9.9-13.5,18.5
|
||||
c0,10.7,8.7,19.4,19.4,19.4c1.3,0,2.5-0.1,3.6-0.3c29.9-8.8,61.5-13.6,94.3-13.6c59.7,0,115.7,15.7,162.4,42.5l0.1,0.1
|
||||
c3,1.9,6.5,3,10.3,3c7,0,13.1-3.7,16.6-9.4l0,0c2-2.9,3.2-6.5,3.2-10.3c0-6.4-3.3-12-8.4-15.3L401.2,274.3L401.2,274.3z"/>
|
||||
<path class="st2" d="M352,374.4C352,374.4,352,374.4,352,374.4L352,374.4z M373.8,369.3C373.8,369.3,373.7,369.4,373.8,369.3
|
||||
L373.8,369.3L373.8,369.3z M367.8,347.8l-0.4-0.2C367.5,347.6,367.6,347.7,367.8,347.8z M369,348.4
|
||||
c-47.3-26.5-101.8-41.7-159.9-41.7c-32.1,0-63.1,4.6-90.1,12.6l-0.1,0c-6.8,1.8-11.9,8-11.9,15.3c0,8.7,7.1,15.8,15.8,15.8
|
||||
c1,0,1.9-0.1,2.8-0.3c26.8-8.1,55.2-12.4,84.6-12.4c52,0,100.8,13.5,141.7,36.5l0,0c2.4,1.6,5.4,2.6,8.5,2.6
|
||||
c5.7,0,10.6-3.1,13.3-7.7h0c1.4-2.3,2.2-5,2.2-7.9c0-5.9-3.3-11-8.2-13.6L369,348.4L369,348.4z"/>
|
||||
</g>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 3.7 KiB |
+251
@@ -0,0 +1,251 @@
|
||||
import requests
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime
|
||||
from mutagen.flac import FLAC, Picture
|
||||
from mutagen.id3 import PictureType
|
||||
|
||||
class ProgressCallback:
|
||||
def __call__(self, current, total):
|
||||
if total > 0:
|
||||
percent = (current / total) * 100
|
||||
print(f"\r{percent:.2f}% ({current}/{total})", end="")
|
||||
else:
|
||||
print(f"\r{current / (1024 * 1024):.2f} MB", end="")
|
||||
|
||||
class QobuzDownloader:
|
||||
def __init__(self, region="us", timeout=30):
|
||||
if region not in ["eu", "us"]:
|
||||
raise ValueError("Region must be either 'us' or 'eu'")
|
||||
|
||||
self.region = region
|
||||
self.timeout = timeout
|
||||
self.session = requests.Session()
|
||||
self.headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
|
||||
}
|
||||
self.base_api_url = f"https://{region}.qobuz.squid.wtf/api"
|
||||
self.download_chunk_size = 256 * 1024
|
||||
self.progress_callback = ProgressCallback()
|
||||
|
||||
def set_progress_callback(self, callback):
|
||||
self.progress_callback = callback
|
||||
|
||||
def sanitize_filename(self, filename):
|
||||
if not filename:
|
||||
return "Unknown Track"
|
||||
sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename))
|
||||
return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track"
|
||||
|
||||
def get_track_info(self, isrc):
|
||||
print(f"Fetching: {isrc}")
|
||||
search_url = f"{self.base_api_url}/get-music"
|
||||
params = {'q': isrc, 'offset': 0, 'limit': 10}
|
||||
|
||||
try:
|
||||
response = self.session.get(search_url, params=params, timeout=self.timeout)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
selected_track = None
|
||||
if data and data.get("success"):
|
||||
items = data.get("data", {}).get("tracks", {}).get("items", [])
|
||||
priority = {24: 1, 16: 2}
|
||||
for track in items:
|
||||
if track.get("isrc") == isrc:
|
||||
current_prio = priority.get(track.get("maximum_bit_depth"), 3)
|
||||
if selected_track is None or current_prio < priority.get(selected_track.get("maximum_bit_depth"), 3):
|
||||
selected_track = track
|
||||
if current_prio == 1:
|
||||
break
|
||||
|
||||
if not selected_track:
|
||||
raise Exception(f"Track not found: {isrc}")
|
||||
|
||||
title = selected_track.get('title', 'Unknown')
|
||||
bit_depth = selected_track.get('maximum_bit_depth', 'Unknown')
|
||||
print(f"Found: {title} ({bit_depth}b)")
|
||||
return selected_track
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise Exception(f"Request error: {e}")
|
||||
except Exception as e:
|
||||
raise Exception(f"Error: {e}")
|
||||
|
||||
def get_download_url(self, track_id):
|
||||
print("Fetching URL...")
|
||||
download_api_url = f"{self.base_api_url}/download-music"
|
||||
params = {'track_id': track_id, 'quality': 27}
|
||||
|
||||
try:
|
||||
response = self.session.get(download_api_url, params=params, timeout=self.timeout)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data and data.get("success") and data.get("data", {}).get("url"):
|
||||
download_url = data["data"]["url"]
|
||||
print("URL found")
|
||||
return download_url
|
||||
else:
|
||||
error_msg = data.get('error', {}).get('message', 'Unknown API error')
|
||||
raise Exception(f"API error: {error_msg}")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise Exception(f"Request error: {e}")
|
||||
except Exception as e:
|
||||
raise Exception(f"Error: {e}")
|
||||
|
||||
def download(self, isrc, output_dir=".", is_paused_callback=None, is_stopped_callback=None):
|
||||
if output_dir != ".":
|
||||
try:
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
except OSError as e:
|
||||
raise Exception(f"Directory error: {e}")
|
||||
|
||||
track_info = self.get_track_info(isrc)
|
||||
track_id = track_info.get("id")
|
||||
|
||||
if not track_id:
|
||||
raise Exception("No track ID found")
|
||||
|
||||
artist_name = self.sanitize_filename(track_info.get('performer', {}).get('name'))
|
||||
track_title = self.sanitize_filename(track_info.get('title'))
|
||||
output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac")
|
||||
|
||||
if os.path.exists(output_filename):
|
||||
file_size = os.path.getsize(output_filename)
|
||||
if file_size > 0:
|
||||
print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)")
|
||||
return output_filename
|
||||
|
||||
download_url = self.get_download_url(track_id)
|
||||
temp_filename = output_filename + ".part"
|
||||
|
||||
print(f"Downloading...")
|
||||
try:
|
||||
response = self.session.get(download_url, timeout=900)
|
||||
response.raise_for_status()
|
||||
|
||||
if is_stopped_callback and is_stopped_callback():
|
||||
raise Exception("Download stopped")
|
||||
|
||||
while is_paused_callback and is_paused_callback():
|
||||
time.sleep(0.1)
|
||||
if is_stopped_callback and is_stopped_callback():
|
||||
raise Exception("Download stopped")
|
||||
|
||||
with open(temp_filename, 'wb') as f:
|
||||
f.write(response.content)
|
||||
|
||||
downloaded_size = len(response.content)
|
||||
total_size = downloaded_size
|
||||
|
||||
if self.progress_callback:
|
||||
self.progress_callback(downloaded_size, total_size)
|
||||
|
||||
os.rename(temp_filename, output_filename)
|
||||
print("Download complete")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
if os.path.exists(temp_filename):
|
||||
os.remove(temp_filename)
|
||||
raise Exception(f"Download failed: {e}")
|
||||
except Exception as e:
|
||||
if os.path.exists(temp_filename):
|
||||
os.remove(temp_filename)
|
||||
raise Exception(f"File error: {e}")
|
||||
|
||||
print("Adding metadata...")
|
||||
try:
|
||||
self._embed_metadata(output_filename, track_info)
|
||||
print("Metadata saved")
|
||||
except Exception as e:
|
||||
print(f"Tagging failed: {e}")
|
||||
|
||||
print(f"Done")
|
||||
return output_filename
|
||||
|
||||
def _embed_metadata(self, filename, track_info):
|
||||
try:
|
||||
audio = FLAC(filename)
|
||||
audio.delete()
|
||||
audio.clear_pictures()
|
||||
|
||||
album_info = track_info.get('album', {})
|
||||
artist = track_info.get('performer', {}).get('name')
|
||||
|
||||
if track_info.get('title'):
|
||||
audio['TITLE'] = track_info['title']
|
||||
if artist:
|
||||
audio['ARTIST'] = artist
|
||||
if album_info.get('title'):
|
||||
audio['ALBUM'] = album_info['title']
|
||||
if album_info.get('artist', {}).get('name', artist):
|
||||
audio['ALBUMARTIST'] = album_info.get('artist', {}).get('name', artist)
|
||||
if track_info.get('track_number'):
|
||||
audio['TRACKNUMBER'] = str(track_info['track_number'])
|
||||
if track_info.get('release_date_original'):
|
||||
audio['DATE'] = track_info['release_date_original']
|
||||
try:
|
||||
audio['YEAR'] = str(datetime.strptime(track_info['release_date_original'], '%Y-%m-%d').year)
|
||||
except ValueError:
|
||||
pass
|
||||
if album_info.get('genre', {}).get('name'):
|
||||
audio['GENRE'] = album_info['genre']['name']
|
||||
if track_info.get('copyright'):
|
||||
audio['COPYRIGHT'] = track_info['copyright']
|
||||
if track_info.get('isrc'):
|
||||
audio['ISRC'] = track_info['isrc']
|
||||
if album_info.get('label', {}).get('name'):
|
||||
audio['ORGANIZATION'] = album_info['label']['name']
|
||||
|
||||
img_info = album_info.get('image', {})
|
||||
cover_url = img_info.get('large') or img_info.get('small') or img_info.get('thumbnail')
|
||||
if cover_url:
|
||||
try:
|
||||
img_response = self.session.get(cover_url, timeout=30)
|
||||
img_response.raise_for_status()
|
||||
mime_type = img_response.headers.get('Content-Type', 'image/jpeg').lower()
|
||||
if mime_type in ['image/jpeg', 'image/png']:
|
||||
picture = Picture()
|
||||
picture.data = img_response.content
|
||||
picture.type = PictureType.COVER_FRONT
|
||||
picture.mime = mime_type
|
||||
audio.add_picture(picture)
|
||||
print("Cover added")
|
||||
except Exception as e:
|
||||
print(f"Cover error: {str(e)}")
|
||||
|
||||
audio.save()
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"Metadata error: {e}")
|
||||
|
||||
def main():
|
||||
print("=== QobuzDL - Qobuz Downloader ===")
|
||||
downloader = QobuzDownloader(region="us")
|
||||
|
||||
isrc = "USAT22409172"
|
||||
output_dir = "."
|
||||
|
||||
try:
|
||||
downloaded_file = downloader.download(isrc, output_dir)
|
||||
print(f"Success: File saved as {downloaded_file}")
|
||||
except Exception as e:
|
||||
print(f"Error: {str(e)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
import sys
|
||||
if sys.platform == "win32":
|
||||
import os
|
||||
os.system("chcp 65001 > nul")
|
||||
try:
|
||||
sys.stdout.reconfigure(encoding='utf-8')
|
||||
except:
|
||||
pass
|
||||
except:
|
||||
pass
|
||||
|
||||
main()
|
||||
+401
@@ -0,0 +1,401 @@
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import requests
|
||||
from mutagen.flac import FLAC, Picture
|
||||
from mutagen.id3 import PictureType
|
||||
|
||||
class ProgressCallback:
|
||||
def __call__(self, current, total):
|
||||
if total > 0:
|
||||
percent = (current / total) * 100
|
||||
print(f"\r{percent:.2f}% ({current}/{total})", end="")
|
||||
else:
|
||||
print(f"\r{current / (1024 * 1024):.2f} MB", end="")
|
||||
|
||||
class TidalDownloader:
|
||||
def __init__(self, timeout=30, max_retries=3):
|
||||
self.timeout = timeout
|
||||
self.max_retries = max_retries
|
||||
self.download_chunk_size = 256 * 1024
|
||||
self.progress_callback = ProgressCallback()
|
||||
self.client_id = "zU4XHVVkc2tDPo4t"
|
||||
self.client_secret = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4="
|
||||
|
||||
def set_progress_callback(self, callback):
|
||||
self.progress_callback = callback
|
||||
|
||||
|
||||
|
||||
def sanitize_filename(self, filename):
|
||||
if not filename:
|
||||
return "Unknown Track"
|
||||
sanitized = re.sub(r'[\\/*?:"<>|]', "", str(filename))
|
||||
return re.sub(r'\s+', ' ', sanitized).strip() or "Unnamed Track"
|
||||
|
||||
def get_access_token(self):
|
||||
refresh_url = "https://auth.tidal.com/v1/oauth2/token"
|
||||
|
||||
payload = {
|
||||
"client_id": self.client_id,
|
||||
"grant_type": "client_credentials",
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
url=refresh_url,
|
||||
data=payload,
|
||||
auth=(self.client_id, self.client_secret),
|
||||
timeout=self.timeout
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
token_data = response.json()
|
||||
return token_data.get("access_token")
|
||||
else:
|
||||
return None
|
||||
|
||||
except:
|
||||
return None
|
||||
|
||||
def search_tracks(self, query):
|
||||
try:
|
||||
tidal_token = self.get_access_token()
|
||||
if not tidal_token:
|
||||
raise Exception("Failed to get access token")
|
||||
|
||||
search_url = f"https://api.tidal.com/v1/search/tracks?query={query}&limit=25&offset=0&countryCode=US"
|
||||
header = {"authorization": f"Bearer {tidal_token}"}
|
||||
|
||||
search_data = requests.get(url=search_url, headers=header, timeout=self.timeout)
|
||||
response_data = search_data.json()
|
||||
|
||||
filtered_items = [{
|
||||
"id": item.get("id"),
|
||||
"title": item.get("title"),
|
||||
"url": item.get("url"),
|
||||
"isrc": item.get("isrc"),
|
||||
"audioQuality": item.get("audioQuality"),
|
||||
"mediaMetadata": item.get("mediaMetadata"),
|
||||
"album": item.get("album", {}),
|
||||
"artists": item.get("artists", []),
|
||||
"artist": item.get("artist", {}),
|
||||
"trackNumber": item.get("trackNumber"),
|
||||
"volumeNumber": item.get("volumeNumber"),
|
||||
"duration": item.get("duration"),
|
||||
"copyright": item.get("copyright"),
|
||||
"explicit": item.get("explicit")
|
||||
} for item in response_data.get("items", [])]
|
||||
|
||||
return {
|
||||
"limit": response_data.get("limit"),
|
||||
"offset": response_data.get("offset"),
|
||||
"totalNumberOfItems": response_data.get("totalNumberOfItems"),
|
||||
"items": filtered_items
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"Search error: {str(e)}")
|
||||
|
||||
def get_track_info(self, query, isrc=None):
|
||||
print(f"Fetching: {query}" + (f" (ISRC: {isrc})" if isrc else ""))
|
||||
|
||||
try:
|
||||
result = self.search_tracks(query)
|
||||
|
||||
if not result or not result.get("items"):
|
||||
raise Exception(f"No tracks found for query: {query}")
|
||||
|
||||
selected_track = None
|
||||
if isrc:
|
||||
isrc_items = [item for item in result["items"] if item.get("isrc") == isrc]
|
||||
|
||||
if len(isrc_items) > 1:
|
||||
hires_items = []
|
||||
for item in isrc_items:
|
||||
media_metadata = item.get("mediaMetadata", {})
|
||||
tags = media_metadata.get("tags", []) if media_metadata else []
|
||||
if "HIRES_LOSSLESS" in tags:
|
||||
hires_items.append(item)
|
||||
|
||||
if hires_items:
|
||||
selected_track = hires_items[0]
|
||||
else:
|
||||
selected_track = isrc_items[0]
|
||||
elif len(isrc_items) == 1:
|
||||
selected_track = isrc_items[0]
|
||||
else:
|
||||
selected_track = result["items"][0]
|
||||
else:
|
||||
selected_track = result["items"][0]
|
||||
|
||||
if not selected_track:
|
||||
raise Exception(f"Track not found: {query}" + (f" (ISRC: {isrc})" if isrc else ""))
|
||||
|
||||
title = selected_track.get('title', 'Unknown')
|
||||
quality = selected_track.get('audioQuality', 'Unknown')
|
||||
print(f"Found: {title} ({quality})")
|
||||
return selected_track
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"Error getting track info: {str(e)}")
|
||||
|
||||
def get_download_url(self, track_id, quality="LOSSLESS"):
|
||||
print("Fetching URL...")
|
||||
download_api_url = f"https://hifi.401658.xyz/track/?id={track_id}&quality={quality}"
|
||||
|
||||
try:
|
||||
response = requests.get(download_api_url, timeout=self.timeout)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
|
||||
for item in data:
|
||||
if "OriginalTrackUrl" in item:
|
||||
print("URL found")
|
||||
return {
|
||||
"download_url": item["OriginalTrackUrl"],
|
||||
"track_info": data[0] if data else {}
|
||||
}
|
||||
|
||||
raise Exception("Download URL not found in response")
|
||||
else:
|
||||
raise Exception(f"API returned status code: {response.status_code}")
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"Error getting download URL: {str(e)}")
|
||||
|
||||
def download_album_art(self, album_id, size="1280x1280"):
|
||||
try:
|
||||
art_url = f"https://resources.tidal.com/images/{album_id.replace('-', '/')}/{size}.jpg"
|
||||
|
||||
response = requests.get(art_url, timeout=self.timeout)
|
||||
|
||||
if response.status_code == 200:
|
||||
return response.content
|
||||
else:
|
||||
print(f"Failed to download album art: HTTP {response.status_code}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error downloading album art: {str(e)}")
|
||||
return None
|
||||
|
||||
def download_file(self, url, filepath, is_paused_callback=None, is_stopped_callback=None):
|
||||
temp_filepath = filepath + ".part"
|
||||
retry_count = 0
|
||||
|
||||
while retry_count <= self.max_retries:
|
||||
try:
|
||||
response = requests.get(url, timeout=60.0)
|
||||
if response.status_code != 200:
|
||||
raise Exception(f"HTTP {response.status_code}")
|
||||
|
||||
if is_stopped_callback and is_stopped_callback():
|
||||
raise Exception("Download stopped")
|
||||
|
||||
while is_paused_callback and is_paused_callback():
|
||||
time.sleep(0.1)
|
||||
if is_stopped_callback and is_stopped_callback():
|
||||
raise Exception("Download stopped")
|
||||
|
||||
with open(temp_filepath, 'wb') as f:
|
||||
f.write(response.content)
|
||||
|
||||
downloaded_size = len(response.content)
|
||||
|
||||
if self.progress_callback:
|
||||
self.progress_callback(downloaded_size, downloaded_size)
|
||||
|
||||
os.rename(temp_filepath, filepath)
|
||||
print("Download complete")
|
||||
return {"success": True, "size": downloaded_size}
|
||||
|
||||
except Exception as e:
|
||||
retry_count += 1
|
||||
if retry_count > self.max_retries:
|
||||
if os.path.exists(temp_filepath):
|
||||
try:
|
||||
os.remove(temp_filepath)
|
||||
except:
|
||||
pass
|
||||
raise Exception(f"Download error after {self.max_retries} retries: {str(e)}")
|
||||
|
||||
print(f"Download error (attempt {retry_count}/{self.max_retries}): {str(e)}")
|
||||
print(f"Retrying in {retry_count * 2} seconds...")
|
||||
time.sleep(retry_count * 2)
|
||||
|
||||
def embed_metadata(self, filepath, track_info, search_info=None):
|
||||
try:
|
||||
print("Embedding metadata...")
|
||||
audio = FLAC(filepath)
|
||||
audio.clear()
|
||||
audio.clear_pictures()
|
||||
|
||||
if track_info.get("title"):
|
||||
audio["TITLE"] = track_info["title"]
|
||||
|
||||
artists_list = []
|
||||
if search_info and search_info.get("artists"):
|
||||
for artist in search_info["artists"]:
|
||||
if artist.get("name"):
|
||||
artists_list.append(artist["name"])
|
||||
elif search_info and search_info.get("artist") and search_info["artist"].get("name"):
|
||||
artists_list.append(search_info["artist"]["name"])
|
||||
elif track_info.get("artists"):
|
||||
for artist in track_info["artists"]:
|
||||
if artist.get("name"):
|
||||
artists_list.append(artist["name"])
|
||||
elif track_info.get("artist") and track_info["artist"].get("name"):
|
||||
artists_list.append(track_info["artist"]["name"])
|
||||
|
||||
if artists_list:
|
||||
audio["ARTIST"] = artists_list[0]
|
||||
if len(artists_list) > 1:
|
||||
audio["ALBUMARTIST"] = "; ".join(artists_list)
|
||||
else:
|
||||
audio["ALBUMARTIST"] = artists_list[0]
|
||||
|
||||
album_info = search_info.get("album", {}) if search_info else track_info.get("album", {})
|
||||
if album_info.get("title"):
|
||||
audio["ALBUM"] = album_info["title"]
|
||||
|
||||
if search_info and search_info.get("trackNumber"):
|
||||
audio["TRACKNUMBER"] = str(search_info["trackNumber"])
|
||||
elif track_info.get("trackNumber"):
|
||||
audio["TRACKNUMBER"] = str(track_info["trackNumber"])
|
||||
|
||||
if search_info and search_info.get("volumeNumber"):
|
||||
audio["DISCNUMBER"] = str(search_info["volumeNumber"])
|
||||
elif track_info.get("volumeNumber"):
|
||||
audio["DISCNUMBER"] = str(track_info["volumeNumber"])
|
||||
|
||||
duration = search_info.get("duration") if search_info else track_info.get("duration")
|
||||
if duration:
|
||||
audio["LENGTH"] = str(duration)
|
||||
|
||||
isrc = search_info.get("isrc") if search_info else track_info.get("isrc")
|
||||
if isrc:
|
||||
audio["ISRC"] = isrc
|
||||
|
||||
copyright_info = search_info.get("copyright") if search_info else track_info.get("copyright")
|
||||
if copyright_info:
|
||||
audio["COPYRIGHT"] = copyright_info
|
||||
|
||||
if album_info.get("releaseDate"):
|
||||
audio["DATE"] = album_info["releaseDate"][:4]
|
||||
try:
|
||||
audio["YEAR"] = album_info["releaseDate"][:4]
|
||||
except:
|
||||
pass
|
||||
|
||||
if track_info.get("genre"):
|
||||
audio["GENRE"] = track_info["genre"]
|
||||
|
||||
if track_info.get("audioQuality"):
|
||||
audio["COMMENT"] = f"Tidal {track_info['audioQuality']}"
|
||||
|
||||
if album_info.get("cover"):
|
||||
album_art = self.download_album_art(album_info["cover"])
|
||||
if album_art:
|
||||
picture = Picture()
|
||||
picture.data = album_art
|
||||
picture.type = PictureType.COVER_FRONT
|
||||
picture.mime = "image/jpeg"
|
||||
picture.desc = "Cover"
|
||||
audio.add_picture(picture)
|
||||
print("Album art embedded")
|
||||
|
||||
audio.save()
|
||||
print(f"Metadata embedded successfully for: {track_info.get('title', 'Unknown')}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error embedding metadata: {str(e)}")
|
||||
return False
|
||||
|
||||
def download(self, query, isrc=None, output_dir=".", quality="LOSSLESS", is_paused_callback=None, is_stopped_callback=None):
|
||||
if output_dir != ".":
|
||||
try:
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
except OSError as e:
|
||||
raise Exception(f"Directory error: {e}")
|
||||
|
||||
track_info = self.get_track_info(query, isrc)
|
||||
track_id = track_info.get("id")
|
||||
|
||||
if not track_id:
|
||||
raise Exception("No track ID found")
|
||||
|
||||
artists_list = []
|
||||
if track_info.get("artists"):
|
||||
for artist in track_info["artists"]:
|
||||
if artist.get("name"):
|
||||
artists_list.append(artist["name"])
|
||||
elif track_info.get("artist") and track_info["artist"].get("name"):
|
||||
artists_list.append(track_info["artist"]["name"])
|
||||
|
||||
artist_name = ", ".join(artists_list) if artists_list else "Unknown Artist"
|
||||
artist_name = self.sanitize_filename(artist_name)
|
||||
track_title = self.sanitize_filename(track_info.get("title", f"track_{track_id}"))
|
||||
|
||||
output_filename = os.path.join(output_dir, f"{artist_name} - {track_title}.flac")
|
||||
|
||||
if os.path.exists(output_filename):
|
||||
file_size = os.path.getsize(output_filename)
|
||||
if file_size > 0:
|
||||
print(f"File already exists: {output_filename} ({file_size / (1024 * 1024):.2f} MB)")
|
||||
return output_filename
|
||||
|
||||
download_info = self.get_download_url(track_id, quality)
|
||||
download_url = download_info["download_url"]
|
||||
download_track_info = download_info["track_info"]
|
||||
|
||||
print(f"Downloading to: {output_filename}")
|
||||
self.download_file(
|
||||
download_url,
|
||||
output_filename,
|
||||
is_paused_callback=is_paused_callback,
|
||||
is_stopped_callback=is_stopped_callback
|
||||
)
|
||||
|
||||
print("Adding metadata...")
|
||||
try:
|
||||
self.embed_metadata(output_filename, download_track_info, track_info)
|
||||
print("Metadata saved")
|
||||
except Exception as e:
|
||||
print(f"Tagging failed: {e}")
|
||||
|
||||
print("Done")
|
||||
return output_filename
|
||||
|
||||
def main():
|
||||
print("=== TidalDL - Tidal Downloader ===")
|
||||
downloader = TidalDownloader(timeout=30, max_retries=3)
|
||||
|
||||
query = "APT."
|
||||
isrc = "USAT22409172"
|
||||
output_dir = "."
|
||||
|
||||
try:
|
||||
downloaded_file = downloader.download(query, isrc, output_dir)
|
||||
print(f"Success: File saved as {downloaded_file}")
|
||||
except Exception as e:
|
||||
print(f"Error: {str(e)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
import sys
|
||||
if sys.platform == "win32":
|
||||
import os
|
||||
os.system("chcp 65001 > nul")
|
||||
try:
|
||||
sys.stdout.reconfigure(encoding='utf-8')
|
||||
except:
|
||||
pass
|
||||
except:
|
||||
pass
|
||||
|
||||
main()
|
||||
@@ -0,0 +1,9 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" id="flag-icons-us" viewBox="0 0 640 480">
|
||||
<path fill="#bd3d44" d="M0 0h640v480H0"/>
|
||||
<path stroke="#fff" stroke-width="37" d="M0 55.3h640M0 129h640M0 203h640M0 277h640M0 351h640M0 425h640"/>
|
||||
<path fill="#192f5d" d="M0 0h364.8v258.5H0"/>
|
||||
<marker id="us-a" markerHeight="30" markerWidth="30">
|
||||
<path fill="#fff" d="m14 0 9 27L0 10h28L5 27z"/>
|
||||
</marker>
|
||||
<path fill="none" marker-mid="url(#us-a)" d="m0 0 16 11h61 61 61 61 60L47 37h61 61 60 61L16 63h61 61 61 61 60L47 89h61 61 60 61L16 115h61 61 61 61 60L47 141h61 61 60 61L16 166h61 61 61 61 60L47 192h61 61 60 61L16 218h61 61 61 61 60z"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 648 B |
+1
-1
@@ -1,3 +1,3 @@
|
||||
{
|
||||
"version": "1.8"
|
||||
"version": "4.1"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user