Threading & Queues: Asynchrone Verarbeitung

Warum können wir Events nicht direkt ausführen?

Stell dir vor, ein Event-Handler würde Dinge direkt ausführen:

# FALSCH - direkt ausführen:
@client.on(GiftEvent)
def on_gift(event):
    execute_minecraft_command(...)   # ← Blockt!
    wait_for_response()              # ← Dauert lange!
    update_overlay(...)              # ← Noch länger!
    # In der Zeit: Neue Events stauen sich auf

Das Problem: Während wir auf die Minecraft-Response warten, können keine neuen TikTok-Events verarbeitet werden. Die TikTok-Verbindung "hängt" und wir verlieren Events!

Die Lösung: Events in eine Queue (Warteschlange) legen und asynchron verarbeiten!

# ✓ RICHTIG - in Queue legen:
@client.on(GiftEvent)
def on_gift(event):
    trigger_queue.put_nowait((target, username))  # ← Sehr schnell!
    # Fertig! Event-Handler kehrt sofort zurück
    
# Ein anderer Thread verarbeitet die Queue:
while True:
    target, username = trigger_queue.get()  # ← Warte auf nächste Aktion
    execute_minecraft_command(...)          # ← Kein Problem, wenn es dauert

Die Queue-Architektur visualisiert

TIKTOK-VERBINDUNG
  (sehr schnell, darf nicht blocken)
        ↓
  Event-Handler
  (auch schnell!)
        ↓
  Trigger-Queue
  (Warteschlange)
   [GIFT_ROSE]
   [FOLLOW]
   [LIKE_GOAL_100]
   [GIFT_DIAMOND]
        ↓
  Worker-Thread
  (kann auch langsam sein)
        ↓
  Minecraft-Befehle
  (können lange dauern)

Der Vorteil: Die TikTok-Verbindung wird nie blockiert, egal wie überlastet der Worker-Thread ist!


Queue Operations: put, get, put_nowait

import queue
from threading import Thread

trigger_queue = queue.Queue(maxsize=1000)

# Operation 1: PUT (mit Warten)
trigger_queue.put((target, username))  
# Wenn Queue voll: Warte bis Platz frei wird

# Operation 2: PUT_NOWAIT (ohne Warten)
trigger_queue.put_nowait((target, username))
# Wenn Queue voll: Exception (QueueFull)
# → Das ist gut! Es wir ntuns, wenn was schiefläuft

# Operation 3: GET (mit Warten)
item = trigger_queue.get()
# Wenn Queue leer: Warte bis Item kommt
# → BLOCKT den Worker-Thread, bis etwas zu tun ist

# Operation 4: GET_NOWAIT (ohne Warten)
try:
    item = trigger_queue.get_nowait()
except queue.Empty:
    # Queue war leer, mach was anderes

call_soon_threadsafe: Thread-sichere Aufrufe

In unserem Streaming-Tool verwenden wir call_soon_threadsafe statt normalem put:

# Normaler put() - unsicher wenn MainLoop aktiv:
trigger_queue.put_nowait((target, username))  # Könnte Race Condition sein

# Besser: call_soon_threadsafe
MAIN_LOOP.call_soon_threadsafe(
    trigger_queue.put_nowait,
    (target, username)
)  # ✓ Thread-sicher!

Warum? call_soon_threadsafe sorgt dafür, dass die Operation im MainLoop-Thread ausgeführt wird, nicht im Event-Handler-Thread. Das vermeidet Race Conditions!


Race Conditions und Locks (nochmal wiederholt)

Eine Race Condition tritt auf, wenn zwei Threads gleichzeitig auf die gleiche Daten zugreifen:

# Race Condition:
counter = 0

Thread 1: counter = counter + 1  # Liest 0, schreibt 1
         ↓ (interrupt!)
Thread 2: counter = counter + 1  # Liest 0, schreibt 1
         
RESULT: counter = 1 (sollte aber 2 sein!)

# ✓ Mit Lock:
counter = 0
lock = threading.Lock()

Thread 1: with lock:            # Sperrt Lock
           counter = counter + 1  # Liest 0, schreibt 1
           # Lock freigegeben
         ↓
Thread 2: with lock:            # Wartet auf Lock
           counter = counter + 1  # Liest 1, schreibt 2
           # Lock freigegeben
           
RESULT: counter = 2 ✓

Pattern: Immer with threading.Lock() für kritische Daten verwenden!


Praktisches Beispiel: Worker-Thread Implementation

Der Worker-Thread liest Events aus der Queue und verarbeitet sie:

import threading
import queue

trigger_queue = queue.Queue()

def worker_thread():
    """Dieser Thread verarbeitet Trigger aus der Queue"""
    while True:
        try:
            # Warte auf nächste Aktion
            target, username = trigger_queue.get(timeout=1)
            
            # Verarbeite Aktion
            logger.info(f"Verarbeite: {target} für {username}")
            
            try:
                execute_trigger(target, username)
            except Exception as e:
                logger.error(f"Fehler bei Trigger {target}: {e}")
            
            # Markiere als "done"
            trigger_queue.task_done()
            
        except queue.Empty:
            # Timeout: Nichts in der Queue, weitermachen
            continue
        except Exception as e:
            logger.error(f"Worker-Thread Fehler: {e}")

# Starte Worker-Thread (als Daemon, läuft im Hintergrund)
worker = threading.Thread(target=worker_thread, daemon=True)
worker.start()

Overlay-Updates: Ein praktisches Anwendungsbeispiel

Overlay-Updates für Like-Counter verwenden auch die Queue:

# Separate Queue für Overlay-Updates
like_queue = queue.Queue()

@client.on(LikeEvent)
def on_like(event):
    global start_likes, last_overlay_sent, last_overlay_time
    
    if start_likes is None:
        start_likes = event.total
        return
    
    # Berechne neue Likes
    delta = event.total - start_likes
    
    # Sende Update an Overlay (aber nicht zu oft!)
    now = time.time()
    if delta > 0 and (now - last_overlay_time) >= 0.5:  # Max. 2x pro Sekunde
        try:
            MAIN_LOOP.call_soon_threadsafe(
                like_queue.put_nowait,
                delta  # Nur die Differenz senden
            )
            last_overlay_sent = delta
            last_overlay_time = now
        except queue.Full:
            logger.warning("Like-Queue ist voll, Update übersprungen")

Das ist wichtig: Nicht jedes Overlay-Update senden! Mit OVERLAY_INTERVAL (z.B. 0.5 Sekunden) begrenzen wir die Updates. Das spart Bandbreite!


Timing & Throttling: Events nicht zu schnell kommen lassen

Manchmal kommen Events SO schnell an, dass wir sie drosseln (throttle) müssen:

import time

last_event_time = 0
THROTTLE_INTERVAL = 0.1  # Mindestens 100ms zwischen Events

@client.on(LikeEvent)
def on_like(event):
    global last_event_time
    
    # Ignoriere Events die zu dicht beieinander liegen
    now = time.time()
    if now - last_event_time < THROTTLE_INTERVAL:
        return  # Zu schnell! Skippen.
    
    last_event_time = now
    
    # ... rest des Event-Handlers ...

Warum? Wenn Like-Events alle 50ms ankommen , können wir sie nicht alle verarbeiten. Mit Throttling verlanagsamen wir gezielt die ausführung.


Finale Anmerkung

Das Wichtigste zu verstehen:

Events sind nicht direkt = Aktion.

Stattdessen:

TikTok-Event → Handler → Queue → Worker-Thread → Aktion
               (schnell)   (Puffer)    (kann langsam sein)

Das macht das System:

  • ✓ Stabil (Events gehen nicht verloren)
  • ✓ Skalierbar (viele Events gleichzeitig)
  • ✓ Wartbar (Aktion-Logik ist getrennt)