443 lines
14 KiB
Python
443 lines
14 KiB
Python
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
from datetime import datetime
|
|
from queue import Queue
|
|
|
|
from dotenv import load_dotenv
|
|
from flask import Flask, render_template, request, redirect, url_for, flash, Response, jsonify, abort
|
|
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
|
|
# Load variables from a local .env file (if any) before reading os.getenv below.
load_dotenv()

app = Flask(__name__)

# NOTE(review): the SECRET_KEY fallback is a placeholder; deployments are
# expected to provide a real value through the environment.
app.config.update(
    SECRET_KEY=os.getenv('SECRET_KEY', 'change-me-in-production'),
    SQLALCHEMY_DATABASE_URI=os.getenv('DATABASE_URI', 'sqlite:///users.db'),
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    # Session cookie hardening.
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
)

db = SQLAlchemy(app)

# Anonymous visitors hitting a @login_required view are sent to the 'login' view.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
|
|
|
|
|
|
class User(db.Model, UserMixin):
    """Account that can sign in to the web interface.

    The clear-text password is never stored; only the value produced by
    ``generate_password_hash`` is kept in ``password_hash``.
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    # Defaults to datetime.utcnow() when the row is inserted.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def set_password(self, password):
        """Hash ``password`` and store the result on this user."""
        hashed = generate_password_hash(password)
        self.password_hash = hashed

    def check_password(self, password):
        """Return whether ``password`` matches the stored hash."""
        stored_hash = self.password_hash
        return check_password_hash(stored_hash, password)
|
|
|
|
|
|
class Download(db.Model):
    """One download job submitted through the API.

    A job may cover several URLs (stored together in ``url``). Its textual
    progress is accumulated in ``log``.
    """

    id = db.Column(db.Integer, primary_key=True)
    url = db.Column(db.String(512), nullable=False)
    format_type = db.Column(db.String(10), nullable=False)
    # Values written by this file: 'pending', 'running', 'completed',
    # 'error', 'cancelled'.
    status = db.Column(db.String(20), default='pending')
    # Percentage between 0 and 100.
    progress = db.Column(db.Integer, default=0)
    log = db.Column(db.Text, default='')
    error = db.Column(db.Text, nullable=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    completed_at = db.Column(db.DateTime, nullable=True)
    # PID of the most recently spawned downloader subprocess, if any.
    process_id = db.Column(db.Integer, nullable=True)

    def append_log(self, message):
        """Append ``message`` to the log, prefixed with the local HH:MM:SS time.

        The column default is only applied when the row is inserted, so
        ``log`` can still be ``None`` on an instance that has not been
        flushed (or on a row holding NULL); that case is treated as an
        empty log instead of raising ``TypeError``.
        """
        timestamp = datetime.now().strftime('%H:%M:%S')
        self.log = (self.log or '') + f"{timestamp} {message}\n"
|
|
|
|
|
|
# Maps a Download id to the subprocess.Popen object currently working on it.
# Written by the download worker, read and cleared by the cancel/delete views.
download_processes = {}
|
|
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for the id stored in the session, or ``None``.

    Flask-Login expects the loader to return ``None`` rather than raise
    when the id is not valid, so an id that cannot be converted to an
    integer (tampered or stale session) is treated as "no such user".
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
|
|
|
|
|
|
def count_users():
    """Return the number of rows in the user table."""
    total = User.query.count()
    return total
|
|
|
|
|
|
@app.cli.command("init-db")
def init_db():
    """CLI command ``init-db``: create every table declared on ``db``."""
    db.create_all()
    confirmation = "Database initialized."
    print(confirmation)
|
|
|
|
|
|
@app.cli.command("create-admin")
def create_admin():
    """CLI command ``create-admin``: interactively create the first user.

    Does nothing when at least one user already exists. The password is
    read with ``getpass`` so it is not echoed to the terminal, and empty
    credentials are refused.
    """
    import getpass

    if count_users() > 0:
        print("Users already exist.")
        return

    username = input("Username: ").strip()
    password = getpass.getpass("Password: ")

    if not username or not password:
        print("Username and password must not be empty.")
        return

    user = User(username=username)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    print(f"Admin user '{username}' created.")
|
|
|
|
|
|
# Accepted shapes:
#   http(s)://open.spotify.com/[<locale>/]{track|album|playlist}/<id>
#   http(s)://spotify:<id>   (kept for backward compatibility)
# optionally followed by "/" and a query string or fragment (share links
# usually carry "?si=...").
_SPOTIFY_URL_RE = re.compile(
    r'https?://'
    r'(?:'
    r'open\.spotify\.com/(?:[^/\s]+/)?(?:track|album|playlist)/[\w-]+/?'
    r'|'
    r'spotify:[\w-]+'
    r')'
    r'(?:[?#]\S*)?'
)


def validate_spotify_url(url):
    """Return True when ``url`` is a supported Spotify track/album/playlist link.

    The whole string has to match: anything after the identifier other
    than a query string or fragment (spaces, extra path segments, extra
    arguments) makes the URL invalid. Non-string input is rejected instead
    of raising.
    """
    if not isinstance(url, str):
        return False
    return _SPOTIFY_URL_RE.fullmatch(url) is not None
|
|
|
|
|
|
def _download_cancelled(download):
    """Return True when the stored status of ``download`` is 'cancelled'.

    Reloads the row first, so it must only be called when the session has
    no uncommitted changes on ``download``.
    """
    db.session.refresh(download)
    return download.status == 'cancelled'


def _stream_process_output(process, download, download_id, done_progress=None):
    """Copy the output of ``process`` into the download log, line by line.

    When ``done_progress`` is given, it is stored as the download progress
    as soon as a line containing 'Downloaded' or '100%' is seen.

    The process is terminated when the job has been removed from
    ``download_processes`` (cancellation) or when an exception interrupts
    the loop; its stdout is always closed and the process always reaped.

    Returns True when the job is still registered afterwards, False when
    it was cancelled.
    """
    reached_eof = False
    try:
        for raw_line in iter(process.stdout.readline, ""):
            if download_id not in download_processes:
                break
            line = raw_line.strip()
            if not line:
                continue
            download.append_log(line)
            if done_progress is not None and ('Downloaded' in line or '100%' in line):
                download.progress = done_progress
            db.session.commit()
        else:
            reached_eof = True
    finally:
        # On normal EOF just wait for the exit code; otherwise stop the child.
        if not reached_eof and process.poll() is None:
            process.terminate()
        process.stdout.close()
        process.wait()
    return download_id in download_processes


def _copy_tree_files(source_dir, destination_dir):
    """Copy every file under ``source_dir`` to the same relative path under ``destination_dir``."""
    for root, _dirs, files in os.walk(source_dir):
        for name in files:
            src = os.path.join(root, name)
            dst = os.path.join(destination_dir, os.path.relpath(src, source_dir))
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            shutil.copy2(src, dst)


def run_download(download_id, urls, format_type, delete_old, copy_choice):
    """Background worker: download ``urls`` with spotdl, add covers, optionally copy.

    Args:
        download_id: primary key of the ``Download`` row to update.
        urls: list of URLs, each downloaded with its own spotdl invocation.
        format_type: value passed to ``spotdl --format``; 'flac' selects the
            FLAC download directory, anything else the MP3 one.
        delete_old: when truthy, the download directory is wiped first.
        copy_choice: 'yes' copies the downloaded files to COPY_DESTINATION.

    Progress and output are written to the ``Download`` row. Any exception
    marks the job as 'error'. A job cancelled through the API is left in
    the 'cancelled' state and processing stops.
    """
    with app.app_context():
        download = Download.query.get(download_id)
        if not download:
            return
        # The job may have been cancelled before this thread got to run.
        if download.status == 'cancelled':
            return

        download.status = 'running'
        db.session.commit()

        if format_type == 'flac':
            download_dir = os.getenv('DOWNLOAD_DIR_FLAC', '/home/jules/Musique/flac')
        else:
            download_dir = os.getenv('DOWNLOAD_DIR_MP3', '/home/jules/Musique/mp3')

        copy_destination = os.getenv('COPY_DESTINATION', '/mnt/data/Musique')
        total_urls = len(urls)

        try:
            # Inside the try block so a filesystem error marks the job as
            # 'error' instead of leaving it 'running' forever.
            if delete_old and os.path.exists(download_dir):
                shutil.rmtree(download_dir)
            os.makedirs(download_dir, exist_ok=True)

            for idx, url in enumerate(urls, 1):
                if _download_cancelled(download):
                    return

                download.append_log(f"[{idx}/{total_urls}] Téléchargement: {url}")
                db.session.commit()

                spotdl_command = [
                    'spotdl',
                    '--output', '{artist}/{album}/{track-number} - {title}.{output-ext}',
                    '--format', format_type,
                    url,
                ]

                # stderr is merged into stdout: a separate, unread stderr
                # pipe can fill up and block the child process.
                process = subprocess.Popen(
                    ['stdbuf', '-oL'] + spotdl_command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    universal_newlines=True,
                    cwd=download_dir,
                    bufsize=1,
                )
                download_processes[download_id] = process
                download.process_id = process.pid
                db.session.commit()

                url_progress = idx * 100 // total_urls
                if not _stream_process_output(process, download, download_id, url_progress):
                    # Cancelled: leave the status written by the cancel request.
                    db.session.rollback()
                    return

                if process.returncode != 0:
                    download.append_log(
                        f"Erreur: spotdl a quitté avec le code {process.returncode}"
                    )
                download.progress = url_progress
                db.session.commit()

            if _download_cancelled(download):
                return

            download.append_log("Ajout des couvertures...")
            db.session.commit()

            sacad_process = subprocess.Popen(
                ['stdbuf', '-oL', 'sacad_r', download_dir, '900', 'cover.jpg'],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                bufsize=1,
            )
            # Registered so that a cancel request terminates this process too.
            download_processes[download_id] = sacad_process
            if not _stream_process_output(sacad_process, download, download_id):
                db.session.rollback()
                return

            if copy_choice == 'yes':
                download.append_log(f"Copie vers {copy_destination}...")
                db.session.commit()

                if os.path.exists(download_dir) and os.listdir(download_dir):
                    _copy_tree_files(download_dir, copy_destination)
                    download.append_log("Copie terminée.")
                else:
                    download.append_log("Aucune copie - répertoire vide.")

            download.status = 'completed'
            download.progress = 100
            download.completed_at = datetime.utcnow()
            download.append_log("=== Téléchargement terminé ===")
            db.session.commit()

        except Exception as e:
            # Drop whatever half-finished transaction caused the failure
            # before recording the error.
            db.session.rollback()
            download.status = 'error'
            download.error = str(e)
            download.append_log(f"Erreur critique: {e}")
            db.session.commit()

        finally:
            download_processes.pop(download_id, None)
|
|
|
|
|
|
@app.route('/')
@login_required
def index():
    """Render the home page with the five most recently created downloads."""
    newest_first = Download.created_at.desc()
    latest = Download.query.order_by(newest_first).limit(5).all()
    return render_template('index.html', recent_downloads=latest)
|
|
|
|
|
|
@app.route('/history')
@login_required
def history():
    """Render the history page with the 50 most recently created downloads."""
    newest_first = Download.created_at.desc()
    entries = Download.query.order_by(newest_first).limit(50).all()
    return render_template('history.html', downloads=entries)
|
|
|
|
|
|
@app.route('/logs/<int:download_id>')
@login_required
def get_logs(download_id):
    """Return the stored log of a download as plain text (404 if unknown)."""
    record = Download.query.get_or_404(download_id)
    log_text = record.log
    return Response(log_text, mimetype='text/plain')
|
|
|
|
|
|
def _is_safe_redirect_target(target):
    """Return True when ``target`` is a path on this site, not an external URL."""
    from urllib.parse import urlparse

    if not target or not target.startswith('/'):
        return False
    # "//host", "/\host" and embedded control characters can be interpreted
    # by browsers as a jump to another host.
    if target.startswith('//') or '\\' in target:
        return False
    if any(ch.isspace() or ord(ch) < 32 for ch in target):
        return False
    parts = urlparse(target)
    return not parts.scheme and not parts.netloc


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate the submitted credentials.

    On success the user is redirected to the ``next`` query parameter when
    it is a local path, otherwise to the index page. ``next`` is validated
    so that a crafted login link cannot send the user to an external site.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')

        user = User.query.filter_by(username=username).first()

        if user and user.check_password(password):
            login_user(user)
            flash('Connexion réussie', 'success')
            next_page = request.args.get('next')
            if not _is_safe_redirect_target(next_page):
                next_page = url_for('index')
            return redirect(next_page)
        else:
            flash('Nom d\'utilisateur ou mot de passe incorrect', 'danger')

    return render_template('login.html')
|
|
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Create the first (and only) account through a web form.

    Registration is closed as soon as one user exists. The form is
    rejected when the username or password is empty, when the username
    exceeds the 80 characters of the column, when the two passwords
    differ, or when the username is already taken.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if count_users() > 0:
        flash('Un utilisateur existe déjà. Connectez-vous.', 'warning')
        return redirect(url_for('login'))

    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        confirm_password = request.form.get('confirm_password', '')

        if not username or not password:
            flash('Le nom d\'utilisateur et le mot de passe sont obligatoires', 'danger')
            return redirect(url_for('register'))

        if len(username) > 80:
            flash('Le nom d\'utilisateur est trop long (80 caractères maximum)', 'danger')
            return redirect(url_for('register'))

        if password != confirm_password:
            flash('Les mots de passe ne correspondent pas', 'danger')
            return redirect(url_for('register'))

        if User.query.filter_by(username=username).first():
            flash('Ce nom d\'utilisateur est déjà pris', 'danger')
            return redirect(url_for('register'))

        user = User(username=username)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()

        flash('Compte créé ! Vous pouvez maintenant vous connecter.', 'success')
        return redirect(url_for('login'))

    return render_template('register.html')
|
|
|
|
|
|
@app.route('/logout')
@login_required
def logout():
    """End the current session and send the user back to the login page."""
    logout_user()
    flash('Vous êtes déconnecté', 'info')
    login_page = url_for('login')
    return redirect(login_page)
|
|
|
|
|
|
@app.route('/api/download', methods=['POST'])
@login_required
def api_download():
    """Validate a download request and start it in a background thread.

    Expects a JSON object with:
        url: one or more Spotify URLs separated by newlines.
        format: audio format handed to spotdl (default 'mp3').
        delete_old: whether to wipe the download directory first.
        copy_choice: 'yes' to copy the result to the copy destination.

    Returns ``{'id': ..., 'status': 'started'}`` on success, or a 400
    response with an ``error`` message when the request is invalid.
    """
    # NOTE(review): list taken from spotdl's documented --format choices.
    allowed_formats = ('mp3', 'flac', 'ogg', 'opus', 'm4a', 'wav')

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Corps JSON invalide'}), 400

    urls_raw = data.get('url', '')
    if not isinstance(urls_raw, str):
        return jsonify({'error': 'Aucune URL fournie'}), 400

    format_choice = data.get('format', 'mp3')
    if format_choice not in allowed_formats:
        return jsonify({'error': f'Format invalide: {format_choice}'}), 400

    # delete_old triggers a recursive delete, so a string such as "false"
    # must not be treated as true merely because it is non-empty.
    raw_delete = data.get('delete_old', False)
    if isinstance(raw_delete, str):
        delete_old = raw_delete.strip().lower() in ('1', 'true', 'yes', 'on')
    else:
        delete_old = bool(raw_delete)

    copy_choice = data.get('copy_choice', 'no')

    urls = [line.strip() for line in urls_raw.splitlines() if line.strip()]

    if not urls:
        return jsonify({'error': 'Aucune URL fournie'}), 400

    for url in urls:
        if not validate_spotify_url(url):
            return jsonify({'error': f'URL invalide: {url}'}), 400

    download = Download(
        url=urls_raw,
        format_type=format_choice,
        status='pending',
        progress=0,
    )
    db.session.add(download)
    db.session.commit()
    new_id = download.id

    thread = threading.Thread(
        target=run_download,
        args=(new_id, urls, format_choice, delete_old, copy_choice),
        daemon=True,
    )
    thread.start()

    return jsonify({'id': new_id, 'status': 'started'})
|
|
|
|
|
|
@app.route('/api/download/<int:download_id>/stream')
@login_required
def stream_download(download_id):
    """Stream the progress of a download as Server-Sent Events.

    Every event carries ``<id>|<status>|<progress>|<new log text>``. When
    the job reaches a final state ('completed', 'error' or 'cancelled') a
    last event whose text is ``__END__`` is sent and the stream is closed.
    """
    import time

    from flask import stream_with_context

    download = Download.query.get_or_404(download_id)
    final_statuses = ('completed', 'error', 'cancelled')

    def sse_event(payload):
        # Each line of a multi-line payload needs its own "data:" prefix;
        # the client joins them back with newlines. A bare newline inside
        # the payload would otherwise end the event early.
        lines = payload.split('\n')
        return ''.join(f"data: {line}\n" for line in lines) + '\n'

    def generate():
        last_len = 0
        while True:
            db.session.refresh(download)
            log_text = download.log or ''
            header = f"{download.id}|{download.status}|{download.progress}|"

            if len(log_text) > last_len:
                yield sse_event(header + log_text[last_len:])
                last_len = len(log_text)

            if download.status in final_statuses:
                yield sse_event(header + '__END__')
                break

            time.sleep(0.5)

    # stream_with_context keeps the request context alive while the body is
    # generated, which the database session used above relies on.
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={'Cache-Control': 'no-cache'},
    )
|
|
|
|
|
|
@app.route('/api/download/<int:download_id>/cancel', methods=['POST'])
@login_required
def cancel_download(download_id):
    """Cancel a pending or running download.

    Returns ``{'status': 'cancelled'}`` when the job is (or already was)
    cancelled, and a 409 response when the job has already finished, so a
    completed or failed job is never relabelled as cancelled.
    """
    download = Download.query.get_or_404(download_id)

    if download.status == 'cancelled':
        return jsonify({'status': 'cancelled'})

    if download.status in ('completed', 'error'):
        return jsonify({
            'error': 'Téléchargement déjà terminé',
            'status': download.status,
        }), 409

    # Unregister first, then terminate: the worker treats a missing entry
    # as the cancellation signal. pop() also avoids a KeyError when the
    # worker removes the entry at the same moment.
    process = download_processes.pop(download_id, None)
    if process is not None:
        process.terminate()

    download.status = 'cancelled'
    download.completed_at = datetime.utcnow()
    download.append_log("=== Annulé par l'utilisateur ===")
    db.session.commit()

    return jsonify({'status': 'cancelled'})
|
|
|
|
|
|
@app.route('/api/download/<int:download_id>', methods=['GET'])
@login_required
def get_download(download_id):
    """Return every stored field of a download as JSON (404 if unknown).

    Timestamps are ISO 8601 strings, or null when not set.
    """
    record = Download.query.get_or_404(download_id)

    created = record.created_at.isoformat() if record.created_at else None
    completed = record.completed_at.isoformat() if record.completed_at else None

    payload = {
        'id': record.id,
        'url': record.url,
        'format': record.format_type,
        'status': record.status,
        'progress': record.progress,
        'log': record.log,
        'error': record.error,
        'created_at': created,
        'completed_at': completed,
    }
    return jsonify(payload)
|
|
|
|
|
|
@app.route('/api/download/<int:download_id>', methods=['DELETE'])
@login_required
def delete_download(download_id):
    """Delete a download record, unless a process is still registered for it."""
    record = Download.query.get_or_404(download_id)

    still_running = download_id in download_processes
    if still_running:
        message = 'Impossible de supprimer un téléchargement en cours'
        return jsonify({'error': message}), 400

    db.session.delete(record)
    db.session.commit()

    return jsonify({'status': 'deleted'})
|
|
|
|
|
|
def init_app():
    """Create the database tables and print a hint when no user exists yet."""
    with app.app_context():
        db.create_all()
        if not count_users():
            hint = "Aucun utilisateur trouvé. Accédez à /register pour créer un compte."
            print(hint)
|
|
|
|
|
|
if __name__ == '__main__':
    init_app()
    # Debug mode enables the interactive Werkzeug debugger, which allows
    # code execution from the browser. It is therefore opt-in through the
    # FLASK_DEBUG environment variable instead of always on.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled)
|