1.2.0.0
This commit is contained in:
188
app.py
188
app.py
@@ -1,129 +1,145 @@
|
||||
from flask import Flask, render_template, request, Response, redirect, url_for, send_file
|
||||
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
||||
import subprocess
|
||||
import os
|
||||
import shutil
|
||||
import zipfile
|
||||
from io import BytesIO
|
||||
import subprocess
|
||||
from flask import Flask, render_template, request, redirect, url_for, flash, Reponse
|
||||
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
|
||||
import sys
|
||||
|
||||
app = Flask(__name__)
|
||||
app.secret_key = 'your_secret_key'
|
||||
app.secret_key = 'votre_cle_secrete' # Clé secrète pour Flask
|
||||
|
||||
# Configuration Flask-Login
|
||||
# Gestion de l'authentification
|
||||
login_manager = LoginManager()
|
||||
login_manager.init_app(app)
|
||||
login_manager.login_view = 'login'
|
||||
login_manager.login_view = 'login' # Rediriger ici si non connecté
|
||||
|
||||
# Utilisateur fictif
|
||||
# Simuler une base de données utilisateur
|
||||
USERS = {
|
||||
"admin": {"password": "password123"}
|
||||
}
|
||||
|
||||
# Classe utilisateur
|
||||
class User(UserMixin):
|
||||
def __init__(self, id):
|
||||
self.id = id
|
||||
|
||||
# Stockage simple d'utilisateurs (en production, utiliser une base de données)
|
||||
users = {'garfi': {'admin': 'password123'}}
|
||||
def __init__(self, username):
|
||||
self.id = username
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
return User(user_id)
|
||||
if user_id in USERS:
|
||||
return User(user_id)
|
||||
return None
|
||||
|
||||
@app.route('/')
|
||||
@login_required
|
||||
def index():
|
||||
return render_template('index.html')
|
||||
|
||||
@app.route('/login', methods=['GET', 'POST'])
|
||||
def login():
|
||||
if request.method == 'POST':
|
||||
username = request.form['username']
|
||||
password = request.form['password']
|
||||
if username in users and users[username]['password'] == password:
|
||||
|
||||
if username in USERS and USERS[username]['password'] == password:
|
||||
user = User(username)
|
||||
login_user(user)
|
||||
return redirect(url_for('index'))
|
||||
flash('Connexion réussie', 'success')
|
||||
return redirect(url_for('index')) # Redirection vers la page d'accueil
|
||||
else:
|
||||
return "Nom d'utilisateur ou mot de passe incorrect", 401
|
||||
flash('Nom d\'utilisateur ou mot de passe incorrect', 'danger')
|
||||
return redirect(url_for('login'))
|
||||
|
||||
return render_template('login.html')
|
||||
|
||||
@app.route('/logout')
|
||||
@login_required
|
||||
def logout():
|
||||
logout_user()
|
||||
flash('Vous êtes déconnecté', 'info')
|
||||
return redirect(url_for('login'))
|
||||
|
||||
@app.route('/')
|
||||
@login_required
|
||||
def index():
|
||||
return render_template('stream.html')
|
||||
# Fonction pour exécuter une commande en temps réel
|
||||
def run_command_live(command, cwd=None):
|
||||
process = subprocess.Popen(['stdbuf', '-oL'] + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, cwd=cwd, bufsize=1)
|
||||
|
||||
for stdout_line in iter(process.stdout.readline, ""):
|
||||
yield stdout_line
|
||||
sys.stdout.flush()
|
||||
process.stdout.close()
|
||||
process.wait()
|
||||
|
||||
# Fonction pour copier les fichiers individuellement
|
||||
def copy_files(source, destination):
|
||||
try:
|
||||
if os.path.exists(source):
|
||||
if len(os.listdir(source)) > 0: # Vérifier si le répertoire contient des fichiers
|
||||
for root, dirs, files in os.walk(source):
|
||||
for file in files:
|
||||
file_source = os.path.join(root, file)
|
||||
file_destination = os.path.join(destination, os.path.relpath(file_source, source))
|
||||
os.makedirs(os.path.dirname(file_destination), exist_ok=True)
|
||||
shutil.copy2(file_source, file_destination)
|
||||
return f"Fichiers copiés vers {destination}\n"
|
||||
else:
|
||||
return f"Le répertoire source ({source}) est vide. Aucune copie effectuée.\n"
|
||||
else:
|
||||
return f"Le répertoire source ({source}) n'existe pas. Aucune copie effectuée.\n"
|
||||
except Exception as e:
|
||||
return f"Erreur lors de la copie : {str(e)}\n"
|
||||
|
||||
# Fonction pour gérer le téléchargement d'une seule URL
|
||||
@app.route('/download', methods=['POST'])
|
||||
@login_required
|
||||
def download():
|
||||
url = request.form['url']
|
||||
format_choice = request.form['format']
|
||||
delete_choice = request.form['delete_choice']
|
||||
copy_choice = request.form['copy_choice'] # Nouvelle option pour copier ou non
|
||||
url = request.form.get('url') # Récupérer une seule URL
|
||||
format_choice = request.form.get('format', 'mp3') # Par défaut à 'mp3'
|
||||
delete_old = request.form.get('delete_old', False)
|
||||
copy_choice = request.form.get('copy_choice', 'no')
|
||||
|
||||
def generate_output():
|
||||
# Répertoires pour le téléchargement en fonction du format choisi
|
||||
output_dir = "/home/garfi/Musique/flac" if format_choice == "flac" else "/home/garfi/Musique/mp3"
|
||||
if not url:
|
||||
return "Aucune URL n'a été soumise.", 400 # Gérer le cas où aucune URL n'est soumise
|
||||
|
||||
# Si l'utilisateur a choisi de supprimer les anciens répertoires
|
||||
if delete_choice == "1":
|
||||
yield "Suppression des anciens répertoires...\n"
|
||||
try:
|
||||
if os.path.exists("/home/garfi/Musique/flac"):
|
||||
shutil.rmtree("/home/garfi/Musique/flac")
|
||||
yield "Le répertoire /home/garfi/Musique/flac a été supprimé.\n"
|
||||
# Déterminer le répertoire de téléchargement en fonction du format
|
||||
if format_choice == 'flac':
|
||||
download_dir = "/home/jules/Musique/flac"
|
||||
format_type = 'flac'
|
||||
else:
|
||||
download_dir = "/home/jules/Musique/mp3"
|
||||
format_type = 'mp3'
|
||||
|
||||
if os.path.exists("/home/garfi/Musique/mp3"):
|
||||
shutil.rmtree("/home/garfi/Musique/mp3")
|
||||
yield "Le répertoire /home/garfi/Musique/mp3 a été supprimé.\n"
|
||||
except FileNotFoundError as e:
|
||||
yield f"Erreur : {str(e)} - Le répertoire n'existe pas.\n"
|
||||
except Exception as e:
|
||||
yield f"Erreur inattendue lors de la suppression : {str(e)}\n"
|
||||
# Supprimer les anciens répertoires si l'utilisateur a choisi
|
||||
if delete_old:
|
||||
if os.path.exists(download_dir):
|
||||
shutil.rmtree(download_dir)
|
||||
|
||||
# Créer le répertoire pour stocker les nouveaux fichiers
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
# Créer le répertoire de téléchargement s'il n'existe pas
|
||||
os.makedirs(download_dir, exist_ok=True)
|
||||
|
||||
# Lancer le téléchargement avec spotdl
|
||||
command = f'spotdl --output "{{artist}}/{{album}}/{{track-number}} - {{title}}.{{output-ext}}" "{url}" --format={format_choice}'
|
||||
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, cwd=output_dir)
|
||||
# Commande pour télécharger la musique avec spotdl dans le répertoire correct
|
||||
spotdl_command = [
|
||||
'spotdl',
|
||||
'--output', '{artist}/{album}/{track-number} - {title}.{output-ext}',
|
||||
'--format', format_type,
|
||||
url
|
||||
]
|
||||
|
||||
for line in process.stdout:
|
||||
yield line # Envoyer les lignes de la commande en temps réel
|
||||
def stream():
|
||||
yield f"Téléchargement en cours dans {download_dir}...\n"
|
||||
for output in run_command_live(spotdl_command, cwd=download_dir):
|
||||
yield output
|
||||
|
||||
# Exécuter sacad_r pour ajouter les couvertures
|
||||
sacad_command = ['sacad_r', download_dir, '900', 'cover.jpg']
|
||||
yield "Ajout des couvertures...\n"
|
||||
for output in run_command_live(sacad_command):
|
||||
yield output
|
||||
|
||||
# Copier les fichiers vers /mnt/data/Musique si l'utilisateur a choisi "Oui"
|
||||
if copy_choice == 'yes':
|
||||
yield f"Copie des fichiers depuis {download_dir} vers /mnt/data/Musique...\n"
|
||||
yield copy_files(download_dir, "/mnt/data/Musique")
|
||||
|
||||
process.wait()
|
||||
if process.returncode == 0:
|
||||
yield "\nTéléchargement terminé avec succès.\n"
|
||||
|
||||
|
||||
# Si l'utilisateur a choisi de copier vers /mnt/data/Musique/
|
||||
if copy_choice == "1":
|
||||
share_dir = "/mnt/data/Musique/"
|
||||
try:
|
||||
shutil.copytree(output_dir, share_dir, dirs_exist_ok=True)
|
||||
yield f"\nLes fichiers ont été copiés vers {share_dir}.\n"
|
||||
except Exception as e:
|
||||
yield f"\nErreur lors de la copie des fichiers vers {share_dir} : {str(e)}\n"
|
||||
else:
|
||||
yield f"\nUne erreur est survenue avec le code {process.returncode}.\n"
|
||||
|
||||
return Response(generate_output(), mimetype='text/plain')
|
||||
|
||||
@app.route('/download_zip')
|
||||
@login_required
|
||||
def download_zip():
|
||||
format_choice = request.args.get('format')
|
||||
output_dir = "/home/garfi/Musique/flac" if format_choice == "flac" else "/home/garfi/Musique/mp3"
|
||||
|
||||
# Créer un fichier ZIP dans la mémoire
|
||||
memory_file = BytesIO()
|
||||
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
|
||||
for root, dirs, files in os.walk(output_dir):
|
||||
for file in files:
|
||||
file_path = os.path.join(root, file)
|
||||
zf.write(file_path, arcname=os.path.relpath(file_path, output_dir))
|
||||
|
||||
memory_file.seek(0)
|
||||
|
||||
# Télécharger le fichier ZIP
|
||||
return send_file(memory_file, mimetype='application/zip', as_attachment=True, download_name=f'musique_{format_choice}.zip')
|
||||
return Response(stream(), mimetype='text/plain')
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
||||
|
||||
Reference in New Issue
Block a user