englisch6a/app/app.py
2026-01-28 19:30:07 +00:00

280 lines
9.0 KiB
Python

import os
import random
import time
import psycopg2
from flask import Flask, render_template, request, jsonify, session
from difflib import SequenceMatcher
app = Flask(__name__)
# Flask signs the session cookie with this key. A key committed to source
# control lets anyone who can read the repository forge sessions, so a
# SECRET_KEY environment variable takes precedence; the historical literal is
# kept only as a fallback so existing deployments keep working unchanged.
app.secret_key = os.environ.get('SECRET_KEY', 'super-secret-key-for-english-trainer')
# Number of most recently asked words that are excluded per user when the
# next question is drawn.
HISTORY_LIMIT = 15
# --- DB VERBINDUNG & RESET ---
def get_db_connection():
    """Open a PostgreSQL connection, retrying while the server is unreachable.

    Connection settings come from the DB_HOST, DB_NAME, DB_USER and DB_PASS
    environment variables, falling back to the defaults given below. Up to
    ten attempts are made with a two-second pause between attempts.

    Returns:
        An open psycopg2 connection. The caller is responsible for closing it.

    Raises:
        Exception: if every attempt fails with psycopg2.OperationalError.
            The last OperationalError is attached as the cause.
    """
    max_retries = 10
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return psycopg2.connect(
                host=os.environ.get('DB_HOST', 'postgres'),
                database=os.environ.get('DB_NAME', 'englisch6a'),
                user=os.environ.get('DB_USER', 'englisch6a'),
                password=os.environ.get('DB_PASS', 'englisch'),
            )
        except psycopg2.OperationalError as exc:
            last_error = exc
            # Only wait when another attempt follows; sleeping after the
            # final failure would just delay the error.
            if attempt < max_retries:
                time.sleep(2)
    # Same exception type and message as before, now chained to the real
    # reason the connection could not be established.
    raise Exception("DB Connection failed") from last_error
def reset_db_on_startup():
    """Run init.sql against the database, if that file exists.

    The file is looked up relative to the current working directory. Its
    whole content is executed in one call and then committed. Any error
    (connection failure, unreadable file, SQL error) is printed and
    swallowed so that a failed reset does not propagate to the caller.
    """
    try:
        conn = get_db_connection()
        try:
            cur = conn.cursor()
            try:
                if os.path.exists('init.sql'):
                    with open('init.sql', 'r', encoding='utf-8') as f:
                        cur.execute(f.read())
                    conn.commit()
                    print("DB Reset erfolgreich.")
            finally:
                # Release the cursor even when the script fails.
                cur.close()
        finally:
            # Release the connection even when the script fails.
            conn.close()
    except Exception as e:
        # Deliberate best-effort: report the problem and carry on.
        print(f"DB Reset Fehler: {e}")
# Executed at import time: every process that loads this module runs the
# reset routine once before any route is served.
reset_db_on_startup()
# --- HILFSFUNKTIONEN ---
def clean_text(text):
    """Return *text* lower-cased with surrounding whitespace removed."""
    lowered = text.lower()
    return lowered.strip()
def clean_vocab_input(text):
    """Normalise a vocabulary answer for comparison.

    The text is lower-cased and trimmed, round and square brackets are
    removed, and a leading "to " (infinitive marker) is dropped.
    """
    # One translation pass deletes all four bracket characters.
    without_brackets = text.lower().strip().translate(str.maketrans('', '', '()[]'))
    normalised = without_brackets.strip()
    if not normalised.startswith('to '):
        return normalised
    return normalised[3:].strip()
def check_part(user_in, correct_str):
    """Tell whether *user_in* equals one of the accepted forms.

    Args:
        user_in: The user's answer, expected to be already lower-cased and
            trimmed by the caller.
        correct_str: Accepted forms separated by "/", e.g. "was/were".

    Returns:
        True if *user_in* matches one of the forms (compared after trimming
        and lower-casing each form), otherwise False. Empty forms, as
        produced by an empty string or a stray "/", are ignored so that an
        empty answer is never graded as correct.
    """
    variants = {part.strip().lower() for part in correct_str.split('/')}
    variants.discard('')
    return user_in in variants
def update_user_history(word):
    """Record *word* in the user's session history.

    The history is kept under the 'history' session key and is capped at
    the HISTORY_LIMIT most recent entries.
    """
    recent = session.get('history', [])
    recent.append(word)
    # Drop the oldest entries once the cap is exceeded.
    overflow = len(recent) > HISTORY_LIMIT
    session['history'] = recent[-HISTORY_LIMIT:] if overflow else recent
# --- API ROUTEN ---
@app.route('/')
def index():
    """Render and return the index.html template for the root URL."""
    page = render_template('index.html')
    return page
@app.route('/api/pages')
def get_pages():
    """Return the distinct page values available for a question type.

    Query parameters:
        type: 'irregular' selects the irregular_verbs table; any other
            value (default 'vocab') selects the vocabulary table.

    Returns:
        A JSON array of the distinct page values in ascending order.
    """
    mode_type = request.args.get('type', 'vocab')
    if mode_type == 'irregular':
        query = "SELECT DISTINCT page FROM irregular_verbs ORDER BY page"
    else:
        query = "SELECT DISTINCT page FROM vocabulary ORDER BY page"

    conn = get_db_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute(query)
            rows = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Always give the connection back, even if the query failed.
        conn.close()

    # Sorted again in Python, as before, so the response order does not
    # depend on the database collation.
    pages = sorted(r[0] for r in rows)
    return jsonify(pages)
def _fetch_random_row(cur, columns, table, key_column, pages, history, fallback_note=None):
    """Fetch one random row of *table* restricted to *pages*.

    Rows whose *key_column* value is in *history* are skipped first; if that
    leaves nothing, the history filter is dropped and the draw is repeated.
    *columns*, *table* and *key_column* are interpolated into the SQL text
    and must therefore only ever be constants from this module, never
    request data. Returns the row tuple, or None when no row matches.
    """
    select = f"SELECT {columns} FROM {table} WHERE page = ANY(%s)"
    draw = " ORDER BY RANDOM() LIMIT 1"

    if not history:
        # Nothing to exclude, so a single query is enough; repeating the
        # identical query as a "fallback" could not find anything new.
        cur.execute(select + draw, (pages,))
        return cur.fetchone()

    cur.execute(select + f" AND {key_column} NOT IN %s" + draw, (pages, tuple(history)))
    row = cur.fetchone()
    if row is None:
        # The history blocks every candidate: fall back to the full set.
        if fallback_note:
            print(fallback_note)
        cur.execute(select + draw, (pages,))
        row = cur.fetchone()
    return row


@app.route('/api/question', methods=['POST'])
def get_question():
    """Draw a random question and return it as JSON.

    Expected JSON body:
        mainMode: 'irregular' for irregular verbs; anything else selects
            normal vocabulary.
        subMode: for verbs, 'start-german' asks for all three forms given
            the German word; for vocabulary, 'de-en' or 'en-de' fixes the
            direction and any other value picks a direction at random.
        pages: list of page values to draw from.

    Words stored in the session history are avoided where possible, and the
    drawn word is appended to that history. If no row matches the selected
    pages, a JSON object with an 'error' message is returned.
    """
    data = request.json
    if not isinstance(data, dict):
        # A body such as `null` or a JSON list has no .get(); treat it as an
        # empty request instead of failing with an AttributeError.
        data = {}
    main_mode = data.get('mainMode')
    sub_mode = data.get('subMode')
    pages = data.get('pages', [])
    if not isinstance(pages, list):
        pages = []

    # Personal history of recently asked words, kept in the session.
    user_history = session.get('history', [])
    irregular = main_mode == 'irregular'

    conn = get_db_connection()
    try:
        cur = conn.cursor()
        try:
            if irregular:
                row = _fetch_random_row(
                    cur,
                    "infinitive, simple_past, past_participle, german, page",
                    "irregular_verbs",
                    "infinitive",
                    pages,
                    user_history,
                    fallback_note="User History voll/blockiert alles -> Fallback!",
                )
            else:
                row = _fetch_random_row(
                    cur,
                    "english, german, page",
                    "vocabulary",
                    "english",
                    pages,
                    user_history,
                )
        finally:
            cur.close()
    finally:
        # Close the connection on every path, including query errors.
        conn.close()

    if row is None:
        if irregular:
            return jsonify({'error': 'Keine Verben gefunden.'})
        return jsonify({'error': 'Keine Vokabeln gefunden.'})

    # The first column (infinitive / english) serves as the history key.
    update_user_history(row[0])

    if irregular:
        infinitive, simple_past, past_participle, german, page = row
        if sub_mode == 'start-german':
            return jsonify({
                'type': 'irregular_full',
                'question': german,
                'answer_infinitive': infinitive,
                'answer_simple': simple_past,
                'answer_participle': past_participle,
                'page': page
            })
        return jsonify({
            'type': 'irregular_standard',
            'question': infinitive,
            'german_hint': german,
            'answer_simple': simple_past,
            'answer_participle': past_participle,
            'page': page
        })

    english, german, page = row
    if sub_mode == 'de-en':
        q_text, a_text = german, english
    elif sub_mode == 'en-de':
        q_text, a_text = english, german
    elif random.random() > 0.5:
        # Mixed mode: choose the direction at random.
        q_text, a_text = german, english
    else:
        q_text, a_text = english, german
    return jsonify({
        'type': 'vocab',
        'question': q_text,
        'answer': a_text,
        'page': page
    })
def _payload_text(data, key):
    """Return data[key] when it is a string, otherwise an empty string."""
    value = data.get(key, '')
    return value if isinstance(value, str) else ''


def _check_irregular(q_type, data):
    """Grade an irregular-verb answer; every requested form must match."""
    if q_type == 'irregular_full':
        fields = ('infinitive', 'simple', 'participle')
    elif q_type == 'irregular_standard':
        fields = ('simple', 'participle')
    else:
        # Unknown irregular sub-type: nothing to compare, so not correct.
        fields = ()
    is_correct = bool(fields) and all(
        check_part(clean_text(_payload_text(data, name)),
                   _payload_text(data, 'correct_' + name))
        for name in fields
    )
    if is_correct:
        return jsonify({'status': 'correct', 'msg': 'Richtig!'})
    return jsonify({'status': 'wrong', 'msg': 'Leider nicht ganz richtig.'})


def _check_vocab(data):
    """Grade a vocabulary answer: exact match, near-miss typo, or wrong."""
    u_clean = clean_vocab_input(_payload_text(data, 'input'))
    # Accepted answers are separated by ';'. Empty entries (empty string,
    # trailing ';') are dropped so an empty answer can never match them.
    cleaned = (clean_vocab_input(p) for p in _payload_text(data, 'correct').split(';'))
    valid_answers = [v for v in cleaned if v]

    if u_clean in valid_answers:
        return jsonify({'status': 'correct', 'msg': 'Richtig!'})

    # Find the accepted answer most similar to the input.
    best_ratio = 0
    best_match = ""
    for v in valid_answers:
        ratio = SequenceMatcher(None, u_clean, v).ratio()
        if ratio > best_ratio:
            best_ratio = ratio
            best_match = v

    # Short words need a stricter similarity to count as a mere typo.
    threshold = 0.9 if len(best_match) < 5 else 0.85
    if best_ratio >= threshold:
        return jsonify({
            'status': 'typo',
            'msg': f'Fast richtig! Meintest du: "{best_match}"? (Achte auf die Schreibweise)'
        })
    return jsonify({'status': 'wrong', 'msg': 'Leider falsch.'})


@app.route('/api/check', methods=['POST'])
def check_answer():
    """Grade a submitted answer and return the verdict as JSON.

    Expected JSON body:
        type: a value starting with 'irregular' selects verb grading
            ('irregular_full' compares infinitive, simple and participle;
            'irregular_standard' compares simple and participle against the
            matching 'correct_*' fields). Any other value selects vocabulary
            grading, which compares 'input' against 'correct'.

    Returns:
        A JSON object with 'status' ('correct', 'typo' or 'wrong') and a
        user-facing 'msg'.

    NOTE(review): the expected answers are read from the request body, so
    the client supplies the values it is graded against; consider looking
    them up on the server instead.
    """
    data = request.json
    if not isinstance(data, dict):
        # Bodies like `null` or a JSON list have no .get(); grade them as an
        # empty submission rather than raising an AttributeError.
        data = {}
    q_type = data.get('type')

    # A missing or non-string type falls through to vocabulary grading
    # instead of crashing on .startswith().
    if isinstance(q_type, str) and q_type.startswith('irregular'):
        return _check_irregular(q_type, data)
    return _check_vocab(data)
if __name__ == '__main__':
    # Direct execution only: serve on all network interfaces, port 5000.
    listen = {'host': '0.0.0.0', 'port': 5000}
    app.run(**listen)