Files
spotdl-py/app.py

446 lines
14 KiB
Python

import os
import re
import shutil
import subprocess
import sys
import threading
from datetime import datetime
from queue import Queue
from dotenv import load_dotenv
from flask import Flask, render_template, request, redirect, url_for, flash, Response, jsonify, abort
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from flask_sqlalchemy import SQLAlchemy
load_dotenv()
app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'change-me-in-production')
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URI', 'sqlite:///users.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SESSION_COOKIE_SECURE'] = True
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
db = SQLAlchemy(app)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(256), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
class Download(db.Model):
id = db.Column(db.Integer, primary_key=True)
url = db.Column(db.String(512), nullable=False)
format_type = db.Column(db.String(10), nullable=False)
status = db.Column(db.String(20), default='pending')
progress = db.Column(db.Integer, default=0)
log = db.Column(db.Text, default='')
error = db.Column(db.Text, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
completed_at = db.Column(db.DateTime, nullable=True)
process_id = db.Column(db.Integer, nullable=True)
def append_log(self, message):
self.log += f"{datetime.now().strftime('%H:%M:%S')} {message}\n"
download_processes = {}
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
def count_users():
return User.query.count()
@app.cli.command("init-db")
def init_db():
db.create_all()
print("Database initialized.")
@app.cli.command("create-admin")
def create_admin():
if count_users() > 0:
print("Users already exist.")
return
username = input("Username: ")
password = input("Password: ")
user = User(username=username)
user.set_password(password)
db.session.add(user)
db.session.commit()
print(f"Admin user '{username}' created.")
def validate_spotify_url(url):
patterns = [
r'https?://open\.spotify\.com/[^/]+/track/[\w-]+',
r'https?://open\.spotify\.com/[^/]+/album/[\w-]+',
r'https?://open\.spotify\.com/[^/]+/playlist/[\w-]+',
r'https?://open\.spotify\.com/track/[\w-]+',
r'https?://open\.spotify\.com/album/[\w-]+',
r'https?://open\.spotify\.com/playlist/[\w-]+',
r'https?://spotify:[\w-]+',
]
return any(re.match(pattern, url) for pattern in patterns)
def run_download(download_id, urls, format_type, delete_old, copy_choice):
with app.app_context():
download = Download.query.get(download_id)
if not download:
return
download.status = 'running'
db.session.commit()
if format_type == 'flac':
download_dir = os.getenv('DOWNLOAD_DIR_FLAC', '/home/jules/Musique/flac')
else:
download_dir = os.getenv('DOWNLOAD_DIR_MP3', '/home/jules/Musique/mp3')
copy_destination = os.getenv('COPY_DESTINATION', '/mnt/data/Musique')
if delete_old and os.path.exists(download_dir):
shutil.rmtree(download_dir)
os.makedirs(download_dir, exist_ok=True)
total_urls = len(urls)
try:
for idx, url in enumerate(urls, 1):
download.append_log(f"[{idx}/{total_urls}] Téléchargement: {url}")
db.session.commit()
spotdl_command = [
'spotdl',
'--output', '{artist}/{album}/{track-number} - {title}.{output-ext}',
'--format', format_type,
url
]
process = subprocess.Popen(
['stdbuf', '-oL'] + spotdl_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
cwd=download_dir,
bufsize=1
)
download_processes[download_id] = process
download.process_id = process.pid
db.session.commit()
for line in iter(process.stdout.readline, ""):
if download_id not in download_processes:
process.terminate()
break
line = line.strip()
if line:
download.append_log(line)
if 'Downloaded' in line or '100%' in line:
progress = int((idx / total_urls) * 100)
download.progress = progress
db.session.commit()
process.stdout.close()
process.wait()
if process.returncode != 0:
stderr = process.stderr.read()
download.append_log(f"Erreur: {stderr}")
db.session.commit()
download.progress = int((idx / total_urls) * 100)
db.session.commit()
download.append_log("Ajout des couvertures...")
db.session.commit()
sacad_command = ['sacad_r', download_dir, '900', 'cover.jpg']
sacad_process = subprocess.Popen(
['stdbuf', '-oL'] + sacad_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
bufsize=1
)
for line in iter(sacad_process.stdout.readline, ""):
if download_id not in download_processes:
sacad_process.terminate()
break
line = line.strip()
if line:
download.append_log(line)
db.session.commit()
sacad_process.wait()
if copy_choice == 'yes':
download.append_log(f"Copie vers {copy_destination}...")
db.session.commit()
if os.path.exists(download_dir) and os.listdir(download_dir):
for root, dirs, files in os.walk(download_dir):
for file in files:
src = os.path.join(root, file)
dst = os.path.join(copy_destination, os.path.relpath(src, download_dir))
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copy2(src, dst)
download.append_log("Copie terminée.")
else:
download.append_log("Aucune copie - répertoire vide.")
download.status = 'completed'
download.progress = 100
download.completed_at = datetime.utcnow()
download.append_log("=== Téléchargement terminé ===")
db.session.commit()
except Exception as e:
download.status = 'error'
download.error = str(e)
download.append_log(f"Erreur critique: {e}")
db.session.commit()
finally:
if download_id in download_processes:
del download_processes[download_id]
@app.route('/')
@login_required
def index():
recent = Download.query.order_by(Download.created_at.desc()).limit(5).all()
return render_template('index.html', recent_downloads=recent)
@app.route('/history')
@login_required
def history():
downloads = Download.query.order_by(Download.created_at.desc()).limit(50).all()
return render_template('history.html', downloads=downloads)
@app.route('/logs/<int:download_id>')
@login_required
def get_logs(download_id):
download = Download.query.get_or_404(download_id)
return Response(download.log, mimetype='text/plain')
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user)
flash('Connexion réussie', 'success')
next_page = request.args.get('next')
return redirect(next_page or url_for('index'))
else:
flash('Nom d\'utilisateur ou mot de passe incorrect', 'danger')
return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('index'))
if count_users() > 0:
flash('Un utilisateur existe déjà. Connectez-vous.', 'warning')
return redirect(url_for('login'))
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
confirm_password = request.form['confirm_password']
if password != confirm_password:
flash('Les mots de passe ne correspondent pas', 'danger')
return redirect(url_for('register'))
if User.query.filter_by(username=username).first():
flash('Ce nom d\'utilisateur est déjà pris', 'danger')
return redirect(url_for('register'))
user = User(username=username)
user.set_password(password)
db.session.add(user)
db.session.commit()
flash('Compte créé ! Vous pouvez maintenant vous connecter.', 'success')
return redirect(url_for('login'))
return render_template('register.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('Vous êtes déconnecté', 'info')
return redirect(url_for('login'))
@app.route('/api/download', methods=['POST'])
@login_required
def api_download():
data = request.get_json()
urls_raw = data.get('url', '')
format_choice = data.get('format', 'mp3')
delete_old = data.get('delete_old', False)
copy_choice = data.get('copy_choice', 'no')
urls = [u.strip() for u in urls_raw.split('\n') if u.strip()]
if not urls:
return jsonify({'error': 'Aucune URL fournie'}), 400
for url in urls:
if not validate_spotify_url(url):
return jsonify({'error': f'URL invalide: {url}'}), 400
download = Download(
url=urls_raw,
format_type=format_choice,
status='pending',
progress=0
)
db.session.add(download)
db.session.commit()
thread = threading.Thread(
target=run_download,
args=(download.id, urls, format_choice, delete_old, copy_choice)
)
thread.daemon = True
thread.start()
return jsonify({'id': download.id, 'status': 'started'})
@app.route('/api/download/<int:download_id>/stream')
@login_required
def stream_download(download_id):
download = Download.query.get_or_404(download_id)
download_id_val = download.id
def generate():
with app.app_context():
download = Download.query.get(download_id_val)
last_len = 0
while True:
db.session.refresh(download)
current_len = len(download.log)
if current_len > last_len:
new_log = download.log[last_len:]
data = f"data: {download.id}|{download.status}|{download.progress}|{new_log}\n\n"
yield data
last_len = current_len
if download.status in ('completed', 'error', 'cancelled'):
data = f"data: {download.id}|{download.status}|{download.progress}|__END__\n\n"
yield data
break
import time
time.sleep(0.5)
return Response(generate(), mimetype='text/event-stream')
@app.route('/api/download/<int:download_id>/cancel', methods=['POST'])
@login_required
def cancel_download(download_id):
download = Download.query.get_or_404(download_id)
if download_id in download_processes:
download_processes[download_id].terminate()
del download_processes[download_id]
download.status = 'cancelled'
download.completed_at = datetime.utcnow()
download.append_log("=== Annulé par l'utilisateur ===")
db.session.commit()
return jsonify({'status': 'cancelled'})
@app.route('/api/download/<int:download_id>', methods=['GET'])
@login_required
def get_download(download_id):
download = Download.query.get_or_404(download_id)
return jsonify({
'id': download.id,
'url': download.url,
'format': download.format_type,
'status': download.status,
'progress': download.progress,
'log': download.log,
'error': download.error,
'created_at': download.created_at.isoformat() if download.created_at else None,
'completed_at': download.completed_at.isoformat() if download.completed_at else None
})
@app.route('/api/download/<int:download_id>', methods=['DELETE'])
@login_required
def delete_download(download_id):
download = Download.query.get_or_404(download_id)
if download_id in download_processes:
return jsonify({'error': 'Impossible de supprimer un téléchargement en cours'}), 400
db.session.delete(download)
db.session.commit()
return jsonify({'status': 'deleted'})
def init_app():
with app.app_context():
db.create_all()
if count_users() == 0:
print("Aucun utilisateur trouvé. Accédez à /register pour créer un compte.")
if __name__ == '__main__':
init_app()
app.run(debug=True)