Threading & Queues: Asynchronous Processing
Why Can't We Execute Events Directly?
Imagine an event handler that executes everything directly:
# WRONG – execute directly:
@client.on(GiftEvent)
def on_gift(event):
    execute_minecraft_command(...)  # ← Blocks!
    wait_for_response(...)          # ← Takes a long time!
    update_overlay(...)             # ← Even longer!
    # Meanwhile: New events are piling up
The problem: While we wait for the Minecraft response, no new TikTok events can be processed. The TikTok connection "hangs" and we lose events!
The solution: Place events in a queue and process them asynchronously!
# ✓ CORRECT – place in queue:
@client.on(GiftEvent)
def on_gift(event):
    trigger_queue.put_nowait((target, username))  # ← Very fast!
    # Done! Event handler returns immediately

# Another thread processes the queue:
while True:
    target, username = trigger_queue.get()  # ← Wait for next action
    execute_minecraft_command(...)          # ← No problem if it takes time
The Queue Architecture Visualized
TIKTOK CONNECTION (very fast, must not block)
        ↓
Event handler (also fast!)
        ↓
Trigger queue (buffer)
    [GIFT_ROSE]
    [FOLLOW]
    [LIKE_GOAL_100]
    [GIFT_DIAMOND]
        ↓
Worker thread (can also be slow)
        ↓
Minecraft commands (can take a long time)
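The advantage: The TikTok connection is never blocked, no matter how overloaded the worker thread is! The price: if the queue has a size limit and is full, put_nowait raises an exception and that event has to be dropped.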
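The flow above can be tried out without TikTok or Minecraft. The following is a minimal sketch, not the tool's real code: on_gift is a plain function standing in for the event handler, time.sleep simulates a slow Minecraft command, and the None sentinel that stops the worker is an assumption of this demo.

```python
import queue
import threading
import time

trigger_queue = queue.Queue(maxsize=1000)
processed = []  # filled by the worker, only for this demo


def on_gift(target, username):
    """Stand-in for the event handler: enqueue and return immediately."""
    try:
        trigger_queue.put_nowait((target, username))
    except queue.Full:
        pass  # queue is full: this event is dropped instead of blocking


def worker():
    """Slow consumer: takes one item at a time until it sees None."""
    while True:
        item = trigger_queue.get()
        if item is None:  # sentinel (an assumption of this sketch) to stop
            break
        time.sleep(0.01)  # simulated slow Minecraft command
        processed.append(item)


thread = threading.Thread(target=worker, daemon=True)
thread.start()

start = time.perf_counter()
for i in range(20):
    on_gift("GIFT_ROSE", f"user{i}")
handler_seconds = time.perf_counter() - start  # time spent in all 20 handlers

trigger_queue.put(None)  # ask the worker to stop after the backlog
thread.join()

print(f"handlers took {handler_seconds:.4f}s, worker processed {len(processed)} items")
```

The handlers finish long before the worker does, and the worker still sees every item in the order it was queued.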
Queue Operations: put, put_nowait, get, get_nowait
import queue
from threading import Thread

trigger_queue = queue.Queue(maxsize=1000)

# Operation 1: PUT (with waiting)
trigger_queue.put((target, username))
# If queue is full: Wait until space becomes free

# Operation 2: PUT_NOWAIT (without waiting)
trigger_queue.put_nowait((target, username))
# If queue is full: Exception (queue.Full)
# → That's fine! We catch it and skip the event

# Operation 3: GET (with waiting)
item = trigger_queue.get()
# If queue is empty: Wait until item arrives
# → BLOCKS the worker thread until something needs to be done

# Operation 4: GET_NOWAIT (without waiting)
try:
    item = trigger_queue.get_nowait()
except queue.Empty:
    pass  # Queue was empty, do something else
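To see both exceptions in action, here is a small runnable sketch. The queue name small_queue and the limit of 2 are chosen only for this demo so that the queue fills up after two items.

```python
import queue

small_queue = queue.Queue(maxsize=2)  # tiny limit so the demo fills it quickly
dropped = []

for item in ["GIFT_ROSE", "FOLLOW", "LIKE_GOAL_100"]:
    try:
        small_queue.put_nowait(item)
    except queue.Full:
        dropped.append(item)  # the third item does not fit

received = []
while True:
    try:
        received.append(small_queue.get_nowait())
    except queue.Empty:
        break  # nothing left in the queue

print("received:", received)
print("dropped:", dropped)
```

Both non-blocking calls report their problem as an exception from the queue module, so the caller decides whether to drop, retry, or do something else.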
call_soon_threadsafe: Thread-Safe Calls
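In our streaming tool the event handler does not call put_nowait itself. It hands the call to the event loop with call_soon_threadsafe:
# Direct call from the handler thread. This is safe for a queue.Queue,
# but an asyncio.Queue is not thread-safe and this could cause a race condition:
trigger_queue.put_nowait((target, username))
# Better: call_soon_threadsafe
MAIN_LOOP.call_soon_threadsafe(
    trigger_queue.put_nowait,
    (target, username)
)  # ✓ Thread-safe!
Why? call_soon_threadsafe schedules the call so that it runs in the MainLoop thread, not in the event handler thread. Objects that belong to the loop, such as an asyncio.Queue, are then only touched from one thread, which avoids race conditions. A standard queue.Queue does its own locking and may also be filled directly from any thread.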
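A minimal sketch of this hand-off, assuming the queue is an asyncio.Queue owned by the event loop. How MAIN_LOOP and the queue are created in the real tool is not shown here, so the names loop, events and handler_thread are illustrative only.

```python
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()
    events = asyncio.Queue()  # not thread-safe: only touch it from the loop

    def handler_thread():
        # Runs outside the loop thread, so it must not call events.put_nowait
        # directly. call_soon_threadsafe schedules the call inside the loop.
        for name in ["GIFT_ROSE", "FOLLOW"]:
            loop.call_soon_threadsafe(events.put_nowait, (name, "some_user"))

    thread = threading.Thread(target=handler_thread)
    thread.start()

    # The loop thread receives the items in the order they were scheduled.
    results = [await events.get(), await events.get()]
    thread.join()
    return results


results = asyncio.run(main())
print(results)
```

Note that put_nowait now runs later, inside the loop. If it raises, the exception appears in the loop's callback, not at the call_soon_threadsafe line in the handler.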
Race Conditions and Locks (Recap)
A race condition occurs when two threads access the same data at the same time without coordination and at least one of them writes to it:
# Race condition:
counter = 0
Thread 1: reads counter (0)
          ↓ (interrupted before it can write!)
Thread 2: counter = counter + 1    # Reads 0, writes 1
Thread 1: writes 0 + 1             # Still uses its old value 0, writes 1
RESULT: counter = 1 (but should be 2!)

# ✓ With lock:
counter = 0
lock = threading.Lock()
Thread 1: with lock:                 # Acquires lock
              counter = counter + 1  # Reads 0, writes 1
                                     # Lock released
          ↓
Thread 2: with lock:                 # Waits for lock
              counter = counter + 1  # Reads 1, writes 2
                                     # Lock released
RESULT: counter = 2 ✓
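Pattern: Create one lock (lock = threading.Lock()) next to the critical data and wrap every access in with lock:. Writing with threading.Lock(): inline creates a new lock on every call and protects nothing!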
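The pattern as a runnable sketch: four threads increment one shared counter, and every increment goes through the same lock. The function name add_many and the numbers are chosen for this demo only.

```python
import threading

counter = 0
lock = threading.Lock()  # one shared lock for everything that touches counter


def add_many(n):
    global counter
    for _ in range(n):
        with lock:  # only one thread at a time may run this block
            counter = counter + 1


threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000
```

Because the read and the write happen inside the same locked block, no increment can be overwritten by another thread.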
Practical Example: Worker Thread Implementation
The worker thread reads events from the queue and processes them:
import logging
import queue
import threading

logger = logging.getLogger(__name__)
trigger_queue = queue.Queue()

def worker_thread():
    """This thread processes triggers from the queue"""
    while True:
        try:
            # Wait up to 1 second for the next action
            target, username = trigger_queue.get(timeout=1)
            # Process action
            logger.info(f"Processing: {target} for {username}")
            try:
                # execute_trigger is assumed to be defined elsewhere in the tool
                execute_trigger(target, username)
            except Exception as e:
                logger.error(f"Error executing trigger {target}: {e}")
            # Mark as "done"
            trigger_queue.task_done()
        except queue.Empty:
            # Timeout: Nothing in queue, continue
            continue
        except Exception as e:
            logger.error(f"Worker thread error: {e}")

# Start worker thread (as daemon, runs in background)
worker = threading.Thread(target=worker_thread, daemon=True)
worker.start()
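What task_done() is good for becomes visible together with join(): join() blocks until task_done() has been called once for every item that was put into the queue. The sketch below is a simplified variant of the worker, not the tool's code. The queue name jobs and the "BROKEN" target that simulates a failing trigger are made up for the demo.

```python
import queue
import threading

jobs = queue.Queue()
done = []


def worker():
    while True:
        target, username = jobs.get()
        try:
            if target == "BROKEN":
                raise ValueError("simulated failure")
            done.append((target, username))
        except ValueError:
            pass  # a failed trigger must not stop the worker
        finally:
            jobs.task_done()  # always report the item as handled


threading.Thread(target=worker, daemon=True).start()

for target in ["GIFT_ROSE", "BROKEN", "FOLLOW"]:
    jobs.put((target, "some_user"))

jobs.join()  # returns once task_done() was called for all three items
print(done)
```

Calling task_done() in a finally block keeps the count correct even when a trigger fails. Otherwise join() would wait forever.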
Overlay Updates: A Practical Use Case
Overlay updates for like counters use a queue of their own:
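import queue
import time

OVERLAY_INTERVAL = 0.5  # Seconds between overlay updates (max 2x per second)

# Separate queue for overlay updates
# Note: without maxsize the queue is unbounded and never raises queue.Full
like_queue = queue.Queue()

def enqueue_like_update(delta):
    # Runs in the MainLoop thread, so the exception has to be caught here
    # and not around call_soon_threadsafe
    try:
        like_queue.put_nowait(delta)
    except queue.Full:
        logger.warning("Like queue is full, update skipped")

@client.on(LikeEvent)
def on_like(event):
    global start_likes, last_overlay_sent, last_overlay_time
    if start_likes is None:
        start_likes = event.total
        return
    # Calculate new likes
    delta = event.total - start_likes
    # Send update to overlay (but not too often!)
    now = time.time()
    if delta > 0 and (now - last_overlay_time) >= OVERLAY_INTERVAL:
        # Only send the difference
        MAIN_LOOP.call_soon_threadsafe(enqueue_like_update, delta)
        last_overlay_sent = delta
        last_overlay_time = now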
This is important: Don't send every overlay update! A minimum interval between two updates (OVERLAY_INTERVAL, e.g. 0.5 seconds) limits how often the overlay is refreshed. This saves bandwidth!
Timing & Throttling: Don't Let Events Come Too Quickly
Sometimes events arrive SO quickly that we have to throttle them:
import time

last_event_time = 0
THROTTLE_INTERVAL = 0.1  # At least 100ms between events

@client.on(LikeEvent)
def on_like(event):
    global last_event_time
    # Ignore events that are too close together
    now = time.time()
    if now - last_event_time < THROTTLE_INTERVAL:
        return  # Too fast! Skip.
    last_event_time = now
    # ... rest of the event handler ...
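Why? If like events arrive every 50ms, we can't process them all. With throttling we deliberately skip every event that arrives less than THROTTLE_INTERVAL after the last one we handled, so the rest of the handler runs at most ten times per second.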
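The same check can be packed into a small reusable helper. This is a sketch under assumptions: the class name Throttle and its clock parameter are not part of the tool. The clock is injectable so that the behavior can be checked with fixed timestamps instead of real waiting, and time.monotonic is used as the default because it never jumps backwards.

```python
import time


class Throttle:
    """Lets a call through only if `interval` seconds have passed."""

    def __init__(self, interval, clock=time.monotonic):
        self.interval = interval
        self.clock = clock
        self.last = None  # time of the last accepted event

    def allow(self):
        now = self.clock()
        if self.last is not None and now - self.last < self.interval:
            return False  # too fast, skip
        self.last = now
        return True


# Demo with a fake clock so the result does not depend on real timing.
fake_times = iter([0.00, 0.05, 0.10, 0.12, 0.25])
throttle = Throttle(0.1, clock=lambda: next(fake_times))
decisions = [throttle.allow() for _ in range(5)]
print(decisions)  # [True, False, True, False, True]
```

In an event handler the first line would then be something like "if not throttle.allow(): return", with one Throttle object per event type.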
Final Note
The most important thing to understand:
An event does not trigger its action directly.
Instead:
TikTok event → handler → queue → worker thread → action
               (fast)   (buffer)  (can be slow)
This makes the system:
- ✓ Stable (events are buffered instead of lost while an action is running)
- ✓ Scalable (many events at the same time)
- ✓ Maintainable (action logic is separated)