New version
This commit is contained in:
485
app.py
485
app.py
@@ -1,56 +1,309 @@
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from flask import Flask, render_template, request, redirect, url_for, flash, Response
|
||||
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
|
||||
import sys
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from queue import Queue
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from flask import Flask, render_template, request, redirect, url_for, flash, Response, jsonify, abort
|
||||
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
|
||||
from werkzeug.security import generate_password_hash, check_password_hash
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
|
||||
load_dotenv()
|
||||
|
||||
app = Flask(__name__)
|
||||
app.secret_key = 'votre_cle_secrete' # Clé secrète pour Flask
|
||||
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'change-me-in-production')
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URI', 'sqlite:///users.db')
|
||||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
||||
|
||||
app.config['SESSION_COOKIE_SECURE'] = True
|
||||
app.config['SESSION_COOKIE_HTTPONLY'] = True
|
||||
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
|
||||
|
||||
db = SQLAlchemy(app)
|
||||
|
||||
# Gestion de l'authentification
|
||||
login_manager = LoginManager()
|
||||
login_manager.init_app(app)
|
||||
login_manager.login_view = 'login' # Rediriger ici si non connecté
|
||||
login_manager.login_view = 'login'
|
||||
|
||||
# Simuler une base de données utilisateur
|
||||
USERS = {
|
||||
"admin": {"password": "password123"}
|
||||
}
|
||||
|
||||
# Classe utilisateur
|
||||
class User(UserMixin):
|
||||
def __init__(self, username):
|
||||
self.id = username
|
||||
class User(db.Model, UserMixin):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
username = db.Column(db.String(80), unique=True, nullable=False)
|
||||
password_hash = db.Column(db.String(256), nullable=False)
|
||||
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
||||
|
||||
def set_password(self, password):
|
||||
self.password_hash = generate_password_hash(password)
|
||||
|
||||
def check_password(self, password):
|
||||
return check_password_hash(self.password_hash, password)
|
||||
|
||||
|
||||
class Download(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
url = db.Column(db.String(512), nullable=False)
|
||||
format_type = db.Column(db.String(10), nullable=False)
|
||||
status = db.Column(db.String(20), default='pending')
|
||||
progress = db.Column(db.Integer, default=0)
|
||||
log = db.Column(db.Text, default='')
|
||||
error = db.Column(db.Text, nullable=True)
|
||||
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
||||
completed_at = db.Column(db.DateTime, nullable=True)
|
||||
process_id = db.Column(db.Integer, nullable=True)
|
||||
|
||||
def append_log(self, message):
|
||||
self.log += f"{datetime.now().strftime('%H:%M:%S')} {message}\n"
|
||||
|
||||
|
||||
download_processes = {}
|
||||
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
if user_id in USERS:
|
||||
return User(user_id)
|
||||
return None
|
||||
return User.query.get(int(user_id))
|
||||
|
||||
|
||||
def count_users():
|
||||
return User.query.count()
|
||||
|
||||
|
||||
@app.cli.command("init-db")
|
||||
def init_db():
|
||||
db.create_all()
|
||||
print("Database initialized.")
|
||||
|
||||
|
||||
@app.cli.command("create-admin")
|
||||
def create_admin():
|
||||
if count_users() > 0:
|
||||
print("Users already exist.")
|
||||
return
|
||||
|
||||
username = input("Username: ")
|
||||
password = input("Password: ")
|
||||
|
||||
user = User(username=username)
|
||||
user.set_password(password)
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
print(f"Admin user '{username}' created.")
|
||||
|
||||
|
||||
def validate_spotify_url(url):
|
||||
patterns = [
|
||||
r'https?://open\.spotify\.com/track/[\w-]+',
|
||||
r'https?://open\.spotify\.com/album/[\w-]+',
|
||||
r'https?://open\.spotify\.com/playlist/[\w-]+',
|
||||
r'https?://spotify:[\w-]+',
|
||||
]
|
||||
return any(re.match(pattern, url) for pattern in patterns)
|
||||
|
||||
|
||||
def run_download(download_id, urls, format_type, delete_old, copy_choice):
|
||||
download = Download.query.get(download_id)
|
||||
if not download:
|
||||
return
|
||||
|
||||
download.status = 'running'
|
||||
db.session.commit()
|
||||
|
||||
if format_type == 'flac':
|
||||
download_dir = os.getenv('DOWNLOAD_DIR_FLAC', '/home/jules/Musique/flac')
|
||||
else:
|
||||
download_dir = os.getenv('DOWNLOAD_DIR_MP3', '/home/jules/Musique/mp3')
|
||||
|
||||
copy_destination = os.getenv('COPY_DESTINATION', '/mnt/data/Musique')
|
||||
|
||||
if delete_old and os.path.exists(download_dir):
|
||||
shutil.rmtree(download_dir)
|
||||
|
||||
os.makedirs(download_dir, exist_ok=True)
|
||||
|
||||
total_urls = len(urls)
|
||||
|
||||
try:
|
||||
for idx, url in enumerate(urls, 1):
|
||||
download.append_log(f"[{idx}/{total_urls}] Téléchargement: {url}")
|
||||
db.session.commit()
|
||||
|
||||
spotdl_command = [
|
||||
'spotdl',
|
||||
'--output', '{artist}/{album}/{track-number} - {title}.{output-ext}',
|
||||
'--format', format_type,
|
||||
url
|
||||
]
|
||||
|
||||
process = subprocess.Popen(
|
||||
['stdbuf', '-oL'] + spotdl_command,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
universal_newlines=True,
|
||||
cwd=download_dir,
|
||||
bufsize=1
|
||||
)
|
||||
download_processes[download_id] = process
|
||||
download.process_id = process.pid
|
||||
db.session.commit()
|
||||
|
||||
for line in iter(process.stdout.readline, ""):
|
||||
if download_id not in download_processes:
|
||||
process.terminate()
|
||||
break
|
||||
line = line.strip()
|
||||
if line:
|
||||
download.append_log(line)
|
||||
if 'Downloaded' in line or '100%' in line:
|
||||
progress = int((idx / total_urls) * 100)
|
||||
download.progress = progress
|
||||
db.session.commit()
|
||||
|
||||
process.stdout.close()
|
||||
process.wait()
|
||||
|
||||
if process.returncode != 0:
|
||||
stderr = process.stderr.read()
|
||||
download.append_log(f"Erreur: {stderr}")
|
||||
db.session.commit()
|
||||
|
||||
download.progress = int((idx / total_urls) * 100)
|
||||
db.session.commit()
|
||||
|
||||
download.append_log("Ajout des couvertures...")
|
||||
db.session.commit()
|
||||
|
||||
sacad_command = ['sacad_r', download_dir, '900', 'cover.jpg']
|
||||
sacad_process = subprocess.Popen(
|
||||
['stdbuf', '-oL'] + sacad_command,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
universal_newlines=True,
|
||||
bufsize=1
|
||||
)
|
||||
|
||||
for line in iter(sacad_process.stdout.readline, ""):
|
||||
if download_id not in download_processes:
|
||||
sacad_process.terminate()
|
||||
break
|
||||
line = line.strip()
|
||||
if line:
|
||||
download.append_log(line)
|
||||
db.session.commit()
|
||||
|
||||
sacad_process.wait()
|
||||
|
||||
if copy_choice == 'yes':
|
||||
download.append_log(f"Copie vers {copy_destination}...")
|
||||
db.session.commit()
|
||||
|
||||
if os.path.exists(download_dir) and os.listdir(download_dir):
|
||||
for root, dirs, files in os.walk(download_dir):
|
||||
for file in files:
|
||||
src = os.path.join(root, file)
|
||||
dst = os.path.join(copy_destination, os.path.relpath(src, download_dir))
|
||||
os.makedirs(os.path.dirname(dst), exist_ok=True)
|
||||
shutil.copy2(src, dst)
|
||||
download.append_log("Copie terminée.")
|
||||
else:
|
||||
download.append_log("Aucune copie - répertoire vide.")
|
||||
|
||||
download.status = 'completed'
|
||||
download.progress = 100
|
||||
download.completed_at = datetime.utcnow()
|
||||
download.append_log("=== Téléchargement terminé ===")
|
||||
db.session.commit()
|
||||
|
||||
except Exception as e:
|
||||
download.status = 'error'
|
||||
download.error = str(e)
|
||||
download.append_log(f"Erreur critique: {e}")
|
||||
db.session.commit()
|
||||
|
||||
finally:
|
||||
if download_id in download_processes:
|
||||
del download_processes[download_id]
|
||||
|
||||
|
||||
@app.route('/')
|
||||
@login_required
|
||||
def index():
|
||||
return render_template('index.html')
|
||||
recent = Download.query.order_by(Download.created_at.desc()).limit(5).all()
|
||||
return render_template('index.html', recent_downloads=recent)
|
||||
|
||||
|
||||
@app.route('/history')
|
||||
@login_required
|
||||
def history():
|
||||
downloads = Download.query.order_by(Download.created_at.desc()).limit(50).all()
|
||||
return render_template('history.html', downloads=downloads)
|
||||
|
||||
|
||||
@app.route('/logs/<int:download_id>')
|
||||
@login_required
|
||||
def get_logs(download_id):
|
||||
download = Download.query.get_or_404(download_id)
|
||||
return Response(download.log, mimetype='text/plain')
|
||||
|
||||
|
||||
@app.route('/login', methods=['GET', 'POST'])
|
||||
def login():
|
||||
if current_user.is_authenticated:
|
||||
return redirect(url_for('index'))
|
||||
|
||||
if request.method == 'POST':
|
||||
username = request.form['username']
|
||||
password = request.form['password']
|
||||
|
||||
if username in USERS and USERS[username]['password'] == password:
|
||||
user = User(username)
|
||||
|
||||
user = User.query.filter_by(username=username).first()
|
||||
|
||||
if user and user.check_password(password):
|
||||
login_user(user)
|
||||
flash('Connexion réussie', 'success')
|
||||
return redirect(url_for('index')) # Redirection vers la page d'accueil
|
||||
next_page = request.args.get('next')
|
||||
return redirect(next_page or url_for('index'))
|
||||
else:
|
||||
flash('Nom d\'utilisateur ou mot de passe incorrect', 'danger')
|
||||
return redirect(url_for('login'))
|
||||
|
||||
|
||||
return render_template('login.html')
|
||||
|
||||
|
||||
@app.route('/register', methods=['GET', 'POST'])
|
||||
def register():
|
||||
if current_user.is_authenticated:
|
||||
return redirect(url_for('index'))
|
||||
|
||||
if count_users() > 0:
|
||||
flash('Un utilisateur existe déjà. Connectez-vous.', 'warning')
|
||||
return redirect(url_for('login'))
|
||||
|
||||
if request.method == 'POST':
|
||||
username = request.form['username']
|
||||
password = request.form['password']
|
||||
confirm_password = request.form['confirm_password']
|
||||
|
||||
if password != confirm_password:
|
||||
flash('Les mots de passe ne correspondent pas', 'danger')
|
||||
return redirect(url_for('register'))
|
||||
|
||||
if User.query.filter_by(username=username).first():
|
||||
flash('Ce nom d\'utilisateur est déjà pris', 'danger')
|
||||
return redirect(url_for('register'))
|
||||
|
||||
user = User(username=username)
|
||||
user.set_password(password)
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
|
||||
flash('Compte créé ! Vous pouvez maintenant vous connecter.', 'success')
|
||||
return redirect(url_for('login'))
|
||||
|
||||
return render_template('register.html')
|
||||
|
||||
|
||||
@app.route('/logout')
|
||||
@login_required
|
||||
def logout():
|
||||
@@ -58,88 +311,128 @@ def logout():
|
||||
flash('Vous êtes déconnecté', 'info')
|
||||
return redirect(url_for('login'))
|
||||
|
||||
# Fonction pour exécuter une commande en temps réel
|
||||
def run_command_live(command, cwd=None):
|
||||
process = subprocess.Popen(['stdbuf', '-oL'] + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, cwd=cwd, bufsize=1)
|
||||
|
||||
for stdout_line in iter(process.stdout.readline, ""):
|
||||
yield stdout_line
|
||||
sys.stdout.flush()
|
||||
process.stdout.close()
|
||||
process.wait()
|
||||
|
||||
# Fonction pour copier les fichiers individuellement
|
||||
def copy_files(source, destination):
|
||||
try:
|
||||
if os.path.exists(source):
|
||||
if len(os.listdir(source)) > 0: # Vérifier si le répertoire contient des fichiers
|
||||
for root, dirs, files in os.walk(source):
|
||||
for file in files:
|
||||
file_source = os.path.join(root, file)
|
||||
file_destination = os.path.join(destination, os.path.relpath(file_source, source))
|
||||
os.makedirs(os.path.dirname(file_destination), exist_ok=True)
|
||||
shutil.copy2(file_source, file_destination)
|
||||
return f"Fichiers copiés vers {destination}\n"
|
||||
else:
|
||||
return f"Le répertoire source ({source}) est vide. Aucune copie effectuée.\n"
|
||||
else:
|
||||
return f"Le répertoire source ({source}) n'existe pas. Aucune copie effectuée.\n"
|
||||
except Exception as e:
|
||||
return f"Erreur lors de la copie : {str(e)}\n"
|
||||
|
||||
# Fonction pour gérer le téléchargement d'une seule URL
|
||||
@app.route('/download', methods=['POST'])
|
||||
@app.route('/api/download', methods=['POST'])
|
||||
@login_required
|
||||
def download():
|
||||
url = request.form.get('url') # Récupérer une seule URL
|
||||
format_choice = request.form.get('format', 'mp3') # Par défaut à 'mp3'
|
||||
delete_old = request.form.get('delete_old', False)
|
||||
copy_choice = request.form.get('copy_choice', 'no')
|
||||
def api_download():
|
||||
data = request.get_json()
|
||||
|
||||
urls_raw = data.get('url', '')
|
||||
format_choice = data.get('format', 'mp3')
|
||||
delete_old = data.get('delete_old', False)
|
||||
copy_choice = data.get('copy_choice', 'no')
|
||||
|
||||
if not url:
|
||||
return "Aucune URL n'a été soumise.", 400 # Gérer le cas où aucune URL n'est soumise
|
||||
urls = [u.strip() for u in urls_raw.split('\n') if u.strip()]
|
||||
|
||||
if not urls:
|
||||
return jsonify({'error': 'Aucune URL fournie'}), 400
|
||||
|
||||
# Déterminer le répertoire de téléchargement en fonction du format
|
||||
if format_choice == 'flac':
|
||||
download_dir = "/home/jules/Musique/flac"
|
||||
format_type = 'flac'
|
||||
else:
|
||||
download_dir = "/home/jules/Musique/mp3"
|
||||
format_type = 'mp3'
|
||||
for url in urls:
|
||||
if not validate_spotify_url(url):
|
||||
return jsonify({'error': f'URL invalide: {url}'}), 400
|
||||
|
||||
# Supprimer les anciens répertoires si l'utilisateur a choisi
|
||||
if delete_old:
|
||||
if os.path.exists(download_dir):
|
||||
shutil.rmtree(download_dir)
|
||||
download = Download(
|
||||
url=urls_raw,
|
||||
format_type=format_choice,
|
||||
status='pending',
|
||||
progress=0
|
||||
)
|
||||
db.session.add(download)
|
||||
db.session.commit()
|
||||
|
||||
# Créer le répertoire de téléchargement s'il n'existe pas
|
||||
os.makedirs(download_dir, exist_ok=True)
|
||||
thread = threading.Thread(
|
||||
target=run_download,
|
||||
args=(download.id, urls, format_choice, delete_old, copy_choice)
|
||||
)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
# Commande pour télécharger la musique avec spotdl dans le répertoire correct
|
||||
spotdl_command = [
|
||||
'spotdl',
|
||||
'--output', '{artist}/{album}/{track-number} - {title}.{output-ext}',
|
||||
'--format', format_type,
|
||||
url
|
||||
]
|
||||
return jsonify({'id': download.id, 'status': 'started'})
|
||||
|
||||
def stream():
|
||||
yield f"Téléchargement en cours dans {download_dir}...\n"
|
||||
for output in run_command_live(spotdl_command, cwd=download_dir):
|
||||
yield output
|
||||
|
||||
# Exécuter sacad_r pour ajouter les couvertures
|
||||
sacad_command = ['sacad_r', download_dir, '900', 'cover.jpg']
|
||||
yield "Ajout des couvertures...\n"
|
||||
for output in run_command_live(sacad_command):
|
||||
yield output
|
||||
|
||||
# Copier les fichiers vers /mnt/data/Musique si l'utilisateur a choisi "Oui"
|
||||
if copy_choice == 'yes':
|
||||
yield f"Copie des fichiers depuis {download_dir} vers /mnt/data/Musique...\n"
|
||||
yield copy_files(download_dir, "/mnt/data/Musique")
|
||||
|
||||
return Response(stream(), mimetype='text/plain')
|
||||
@app.route('/api/download/<int:download_id>/stream')
|
||||
@login_required
|
||||
def stream_download(download_id):
|
||||
download = Download.query.get_or_404(download_id)
|
||||
|
||||
def generate():
|
||||
last_len = 0
|
||||
while True:
|
||||
db.session.refresh(download)
|
||||
current_len = len(download.log)
|
||||
|
||||
if current_len > last_len:
|
||||
new_log = download.log[last_len:]
|
||||
data = f"data: {download.id}|{download.status}|{download.progress}|{new_log}\n\n"
|
||||
yield data
|
||||
last_len = current_len
|
||||
|
||||
if download.status in ('completed', 'error'):
|
||||
data = f"data: {download.id}|{download.status}|{download.progress}|__END__\n\n"
|
||||
yield data
|
||||
break
|
||||
|
||||
import time
|
||||
time.sleep(0.5)
|
||||
|
||||
return Response(generate(), mimetype='text/event-stream')
|
||||
|
||||
|
||||
@app.route('/api/download/<int:download_id>/cancel', methods=['POST'])
|
||||
@login_required
|
||||
def cancel_download(download_id):
|
||||
download = Download.query.get_or_404(download_id)
|
||||
|
||||
if download_id in download_processes:
|
||||
download_processes[download_id].terminate()
|
||||
del download_processes[download_id]
|
||||
|
||||
download.status = 'cancelled'
|
||||
download.completed_at = datetime.utcnow()
|
||||
download.append_log("=== Annulé par l'utilisateur ===")
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({'status': 'cancelled'})
|
||||
|
||||
|
||||
@app.route('/api/download/<int:download_id>', methods=['GET'])
|
||||
@login_required
|
||||
def get_download(download_id):
|
||||
download = Download.query.get_or_404(download_id)
|
||||
return jsonify({
|
||||
'id': download.id,
|
||||
'url': download.url,
|
||||
'format': download.format_type,
|
||||
'status': download.status,
|
||||
'progress': download.progress,
|
||||
'log': download.log,
|
||||
'error': download.error,
|
||||
'created_at': download.created_at.isoformat() if download.created_at else None,
|
||||
'completed_at': download.completed_at.isoformat() if download.completed_at else None
|
||||
})
|
||||
|
||||
|
||||
@app.route('/api/download/<int:download_id>', methods=['DELETE'])
|
||||
@login_required
|
||||
def delete_download(download_id):
|
||||
download = Download.query.get_or_404(download_id)
|
||||
|
||||
if download_id in download_processes:
|
||||
return jsonify({'error': 'Impossible de supprimer un téléchargement en cours'}), 400
|
||||
|
||||
db.session.delete(download)
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({'status': 'deleted'})
|
||||
|
||||
|
||||
def init_app():
|
||||
with app.app_context():
|
||||
db.create_all()
|
||||
if count_users() == 0:
|
||||
print("Aucun utilisateur trouvé. Accédez à /register pour créer un compte.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
init_app()
|
||||
app.run(debug=True)
|
||||
|
||||
Reference in New Issue
Block a user