Home ›
Blog › Python Async/Await mit Claude Code
Python
asyncio
async/await
aiohttp
httpx
asyncpg
Claude Code
Concurrent Programming
Synchrones Python ist einfach zu lesen, aber sobald dein Programm auf I/O wartet — HTTP-Requests,
Datenbankabfragen, Dateizugriffe — verliert es wertvolle CPU-Zeit. asyncio löst
dieses Problem elegant: Statt Threads oder Multiprocessing koordiniert ein einziger Event Loop
Tausende gleichzeitiger Aufgaben mit minimalem Overhead.
In diesem Guide erklären wir Python Async/Await von Grund auf. Claude Code unterstützt dich dabei,
asynchronen Code zu schreiben, Fehler zu debuggen und Muster wie Producer/Consumer oder
Rate-Limited Crawling umzusetzen — auch wenn du gerade erst anfängst.
1. asyncio Grundlagen: Event Loop, async def, await
Der Kern von Python Async/Await ist der Event Loop. Er ist ein single-threaded
Scheduler, der Coroutinen nacheinander ausführt — aber immer dann, wenn eine Coroutine auf I/O
wartet, übergibt sie die Kontrolle zurück, sodass eine andere Coroutine weiterarbeiten kann.
asyncio
Event Loop Konzept
Stell dir den Event Loop wie einen Dirigenten vor: Er gibt jeder Coroutine reihum das Wort. Sobald eine Coroutine "Ich warte auf den Server" sagt (await), springt der Dirigent zur nächsten Coroutine — Idle-Zeit wird auf null reduziert.
async def: Coroutinen definieren
Jede Funktion, die mit async def definiert ist, wird zur Coroutine.
Aufrufen allein führt sie nicht aus — du erhältst ein Coroutine-Objekt. Erst await
oder asyncio.run() startet die Ausführung.
import asyncio
# Einfachste Coroutine
async def greet(name: str) -> str:
await asyncio.sleep(0.1) # Simuliert I/O-Warten
return f"Hallo, {name}!"
# FALSCH: greet("Claude") → gibt nur ein Coroutine-Objekt zurück
# RICHTIG: await in einer anderen Coroutine
async def main():
result = await greet("Claude")
print(result) # Hallo, Claude!
# Einstiegspunkt: asyncio.run() startet den Event Loop
if __name__ == "__main__":
asyncio.run(main())
Coroutinen vs. Threads: Der entscheidende Unterschied
| Merkmal |
Threads (threading) |
Coroutinen (asyncio) |
| Overhead pro Unit | ~1–8 MB RAM | ~2–4 KB RAM |
| Parallelität | Präemptiv (OS-gesteuert) | Kooperativ (await-Punkte) |
| Race Conditions | Möglich (GIL hilft wenig) | Selten (single-threaded) |
| Geeignet für | CPU-intensive Arbeit | I/O-intensive Arbeit |
| Skalierung | Hunderte Threads | Zehntausende Coroutinen |
| Debugging | Komplex, nicht-deterministisch | Einfacher, nachvollziehbar |
Claude Code Tipp: Frag Claude Code: "Erkläre mir den Unterschied zwischen asyncio und threading an einem konkreten Beispiel mit HTTP-Requests." — Claude generiert dir einen direkten Vergleich mit Benchmarks.
asyncio.run() und der Event Loop
Seit Python 3.7 ist asyncio.run() der Standard-Einstiegspunkt. Er erstellt einen neuen
Event Loop, führt die übergebene Coroutine aus und schließt den Loop danach sauber.
import asyncio
import time
async def fetch_data(endpoint: str, delay: float):
print(f"→ Start: {endpoint}")
await asyncio.sleep(delay) # I/O simulieren
print(f"← Ende: {endpoint} (nach {delay}s)")
return {"endpoint": endpoint, "data": "[...]"}
async def main():
start = time.monotonic()
# Sequenziell: 0.3 + 0.5 + 0.2 = 1.0s
r1 = await fetch_data("/users", 0.3)
r2 = await fetch_data("/products", 0.5)
r3 = await fetch_data("/orders", 0.2)
elapsed = time.monotonic() - start
print(f"Sequenziell: {elapsed:.2f}s") # ~1.0s
asyncio.run(main())
Das Ergebnis: 1.0 Sekunden — weil jedes await wartet, bis die Coroutine abgeschlossen ist.
Im nächsten Abschnitt zeigen wir, wie asyncio.gather() das auf ~0.5s reduziert.
2. Tasks & Gather: Parallelität strukturieren
Um mehrere Coroutinen gleichzeitig laufen zu lassen, brauchen wir Tasks.
Ein Task ist eine Coroutine, die im Event Loop eingereiht wurde und eigenständig läuft —
ohne dass wir auf ihr Ende warten müssen.
gather
asyncio.create_task vs asyncio.gather
create_task() startet eine Coroutine sofort als Task im Hintergrund. gather() startet mehrere Coroutinen gleichzeitig und wartet, bis alle fertig sind — das Ergebnis ist eine Liste der Rückgabewerte.
asyncio.create_task: Tasks manuell steuern
import asyncio
async def fetch_data(endpoint: str, delay: float):
await asyncio.sleep(delay)
return {"endpoint": endpoint, "status": "ok"}
async def main():
# Tasks sofort starten — sie laufen im Hintergrund
task1 = asyncio.create_task(fetch_data("/users", 0.3))
task2 = asyncio.create_task(fetch_data("/products", 0.5))
task3 = asyncio.create_task(fetch_data("/orders", 0.2))
# Auf alle warten
r1 = await task1
r2 = await task2
r3 = await task3
print(r1, r2, r3)
# Laufzeit: ~0.5s statt 1.0s!
asyncio.run(main())
asyncio.gather: Der elegantere Weg
import asyncio
import time
async def fetch(url: str, delay: float):
await asyncio.sleep(delay)
return f"Response von {url}"
async def main():
start = time.monotonic()
# Alle gleichzeitig starten, alle Ergebnisse auf einmal
results = await asyncio.gather(
fetch("/api/users", 0.3),
fetch("/api/products", 0.5),
fetch("/api/orders", 0.2),
)
elapsed = time.monotonic() - start
print(f"Parallel: {elapsed:.2f}s") # ~0.5s
for r in results:
print(f" ✓ {r}")
asyncio.run(main())
Fehlerbehandlung mit gather(return_exceptions=True)
Standardmäßig wirft gather() beim ersten Fehler eine Exception und bricht ab.
Mit return_exceptions=True werden Exceptions als Ergebnisse zurückgegeben —
alle Tasks laufen durch.
async def risky_fetch(url: str):
if "bad" in url:
raise ValueError(f"Fehler bei {url}")
await asyncio.sleep(0.1)
return f"OK: {url}"
async def main():
results = await asyncio.gather(
risky_fetch("/api/good"),
risky_fetch("/api/bad"), # Wirft ValueError
risky_fetch("/api/also-good"),
return_exceptions=True # Alle Tasks laufen durch
)
for r in results:
if isinstance(r, Exception):
print(f" ✗ Fehler: {r}")
else:
print(f" ✓ {r}")
asyncio.run(main())
TaskGroup (Python 3.11+): Strukturierte Parallelität
asyncio.TaskGroup ist der moderne Weg, Tasks zu verwalten. Er garantiert,
dass alle Tasks abgewartet werden — auch bei Fehlern. Exceptions werden gesammelt und
als ExceptionGroup weitergeworfen.
import asyncio
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_data("/users", 0.3))
task2 = tg.create_task(fetch_data("/products", 0.5))
task3 = tg.create_task(fetch_data("/orders", 0.2))
# Hier sind alle Tasks garantiert fertig (oder Exception geworfen)
print(task1.result())
print(task2.result())
print(task3.result())
Task Cancellation
async def long_running():
try:
await asyncio.sleep(60)
except asyncio.CancelledError:
print("Task wurde abgebrochen — Cleanup...")
raise # WICHTIG: CancelledError immer weiterwerfen!
async def main():
task = asyncio.create_task(long_running())
await asyncio.sleep(2) # 2s warten
task.cancel() # Task abbrechen
try:
await task
except asyncio.CancelledError:
print("Task erfolgreich abgebrochen")
Achtung: Fange asyncio.CancelledError nur ab, um Cleanup zu machen — und wirf sie danach immer weiter. Schluckst du die Exception, bleibt der Task scheinbar "hängen" und der Event Loop kann nicht sauber beenden.
3. asyncio.Queue & Producer/Consumer Pattern
Wenn Produzenten und Konsumenten unterschiedlich schnell sind, brauchen wir eine Puffer-Strategie.
asyncio.Queue ist die Thread-sichere (und Coroutinen-sichere) Lösung: Produzenten
legen Elemente hinein, Konsumenten nehmen sie heraus — ohne manuelles Locking.
Queue
Wann asyncio.Queue?
- Web-Scraping: URL-Queue mit mehreren Download-Workers
- Event-Processing: Eingehende Events puffern, sequenziell verarbeiten
- Pipeline-Pattern: Mehrere Verarbeitungsstufen hintereinander
- Rate-Limiting: Backpressure durch begrenzte Queue-Größe
Grundstruktur: Producer + Consumer
import asyncio
import random
async def producer(queue: asyncio.Queue, items: list):
for item in items:
await asyncio.sleep(random.uniform(0.05, 0.2))
await queue.put(item)
print(f" [Producer] Eingereiht: {item}")
print(" [Producer] Fertig!")
async def consumer(queue: asyncio.Queue, worker_id: int):
while True:
item = await queue.get() # Blockiert bis Item verfügbar
try:
await asyncio.sleep(random.uniform(0.1, 0.4))
print(f" [Worker-{worker_id}] Verarbeitet: {item}")
finally:
queue.task_done() # PFLICHT: Queue mitteilen dass Item erledigt
async def main():
queue = asyncio.Queue(maxsize=5) # maxsize=5 → Backpressure
urls = [f"/api/item/{i}" for i in range(10)]
# 3 parallele Consumer starten
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# Producer läuft
await producer(queue, urls)
# Warten bis alle Items verarbeitet
await queue.join()
# Consumer beenden
for c in consumers:
c.cancel()
asyncio.run(main())
Queue-Typen: FIFO, LIFO, Priority
# Standard FIFO Queue (First In, First Out)
q_fifo = asyncio.Queue()
# LIFO Queue (Stack — Last In, First Out)
q_lifo = asyncio.LifoQueue()
# Priority Queue (kleinste Zahl = höchste Priorität)
q_prio = asyncio.PriorityQueue()
await q_prio.put((3, "niedrig"))
await q_prio.put((1, "kritisch"))
await q_prio.put((2, "normal"))
prio, item = await q_prio.get() # → (1, "kritisch")
Backpressure mit maxsize
Wenn du maxsize setzt, blockiert queue.put() automatisch, sobald die Queue
voll ist — der Producer wartet, bis der Consumer Platz schafft. Das ist eingebautes Backpressure
ohne zusätzlichen Code.
# maxsize=0 → unbegrenzt (Gefahr: Speicherüberlauf!)
q_unbounded = asyncio.Queue()
# maxsize=10 → Producer wartet wenn Queue voll
q_bounded = asyncio.Queue(maxsize=10)
# put_nowait() wirft QueueFull statt zu blockieren
try:
q_bounded.put_nowait("item")
except asyncio.QueueFull:
print("Queue voll — Item verworfen oder umgeleitet")
Claude Code Workflow: Beschreibe dein Producer/Consumer-Problem in natürlicher Sprache. Claude Code generiert die Queue-Struktur, schlägt die richtige Anzahl Worker vor und warnt dich vor typischen Fallen wie dem Vergessen von task_done().
4. aiohttp & httpx: Async HTTP-Requests
Das häufigste I/O in Python-Projekten sind HTTP-Requests. Die synchrone requests-Bibliothek
blockiert den gesamten Thread — für Async-Code brauchen wir aiohttp oder httpx.
aiohttp
aiohttp vs httpx: Wann was?
| Kriterium | aiohttp | httpx |
| Async-First | ✅ Vollständig async | ✅ Async + Sync API |
| requests-kompatibel | ❌ Andere API | ✅ Drop-in für requests |
| HTTP/2 | ❌ | ✅ |
| WebSocket | ✅ Native | ❌ |
| Performance (HTTP/1.1) | Sehr hoch | Hoch |
| Beliebtheit | Sehr verbreitet | Wachsend |
aiohttp: ClientSession korrekt nutzen
Wichtigste Regel: Erstelle eine ClientSession pro Anwendung —
nicht pro Request. Sessions wiederverwendet Verbindungen (Connection Pooling) und sind deutlich effizienter.
import asyncio
import aiohttp
async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
async def main():
urls = [
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3",
]
# Session EINMAL erstellen, alle Requests darüber
timeout = aiohttp.ClientTimeout(total=30, connect=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = [fetch_json(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f" ✗ {url}: {result}")
else:
print(f" ✓ {url}: {result.get('title', 'N/A')}")
asyncio.run(main())
httpx: Die moderne requests-Alternative
import asyncio
import httpx
async def parallel_requests(urls: list[str]) -> list:
async with httpx.AsyncClient(
timeout=httpx.Timeout(10.0, connect=3.0),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
http2=True, # HTTP/2 aktivieren
) as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for url, resp in zip(urls, responses):
if isinstance(resp, Exception):
results.append({"url": url, "error": str(resp)})
else:
results.append({
"url": url,
"status": resp.status_code,
"size": len(resp.content),
})
return results
async def main():
urls = [f"https://httpbin.org/delay/0"] * 10
results = await parallel_requests(urls)
for r in results:
print(r)
asyncio.run(main())
Retry-Logik mit exponentialem Backoff
import asyncio
import aiohttp
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3,
base_delay: float = 1.0
) -> dict:
for attempt in range(max_retries):
try:
async with session.get(url) as resp:
if resp.status == 429: # Rate Limited
retry_after = float(resp.headers.get("Retry-After", base_delay * (2 ** attempt)))
await asyncio.sleep(retry_after)
continue
resp.raise_for_status()
return await resp.json()
except aiohttp.ClientError as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
print(f"Versuch {attempt + 1} fehlgeschlagen, warte {delay}s...")
await asyncio.sleep(delay)
5. asyncpg & Datenbanken: Async SQL
Datenbankoperationen sind oft der größte I/O-Engpass. Mit asyncpg (PostgreSQL)
nutzt du die volle Power des async Event Loops für SQL-Queries — mit Connection Pools,
Prepared Statements und nativer PostgreSQL-Protocol-Unterstützung.
asyncpg
asyncpg Performance-Vorteile
asyncpg implementiert das PostgreSQL Binary Protocol direkt — ohne psycopg2-Overhead. Benchmarks zeigen bis zu 3x schnellere Queries bei hoher Parallelität. Der Connection Pool eliminiert Verbindungs-Overhead für jede Query.
asyncpg.connect: Einfache Verbindung
import asyncio
import asyncpg
async def main():
# Direkte Verbindung (für einfache Skripte)
conn = await asyncpg.connect(
host="localhost",
port=5432,
database="mydb",
user="postgres",
password="secret",
)
try:
# Einzelner Record
user = await conn.fetchrow(
"SELECT id, name, email FROM users WHERE id = $1",
42
)
print(f"User: {user['name']} ({user['email']})")
# Mehrere Records
users = await conn.fetch(
"SELECT * FROM users WHERE active = $1 LIMIT $2",
True, 100
)
for u in users:
print(dict(u))
# Einzelner Wert
count = await conn.fetchval("SELECT COUNT(*) FROM users")
print(f"Gesamt: {count} User")
# INSERT/UPDATE/DELETE
await conn.execute(
"UPDATE users SET last_seen = NOW() WHERE id = $1",
42
)
finally:
await conn.close()
asyncio.run(main())
Connection Pool: Der richtige Weg für Produktionscode
import asyncio
import asyncpg
# Globaler Pool (einmal erstellen, überall nutzen)
pool: asyncpg.Pool | None = None
async def init_db():
global pool
pool = await asyncpg.create_pool(
dsn="postgresql://postgres:secret@localhost/mydb",
min_size=5, # Mindestens 5 offene Verbindungen
max_size=20, # Maximal 20 Verbindungen
command_timeout=60,
)
async def get_user(user_id: int) -> dict | None:
async with pool.acquire() as conn: # Aus dem Pool ausleihen
row = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
return dict(row) if row else None
async def main():
await init_db()
try:
# 50 parallele Queries
tasks = [get_user(i) for i in range(1, 51)]
users = await asyncio.gather(*tasks)
print(f"{len([u for u in users if u])} User gefunden")
finally:
await pool.close()
asyncio.run(main())
Transaktionen mit asyncpg
async def transfer_money(
from_id: int,
to_id: int,
amount: float
) -> bool:
async with pool.acquire() as conn:
async with conn.transaction(): # Automatisches COMMIT/ROLLBACK
# Beide Updates in einer Transaktion — atomar!
await conn.execute(
"UPDATE accounts SET balance = balance - $1 WHERE id = $2",
amount, from_id
)
await conn.execute(
"UPDATE accounts SET balance = balance + $1 WHERE id = $2",
amount, to_id
)
# Bei Exception → automatisches ROLLBACK
return True
SQLAlchemy 2.0 mit asyncio
Wer ORM-Features braucht, nutzt SQLAlchemy 2.0 mit dem asyncio-Extension. Das
AsyncSession-Objekt funktioniert wie eine normale Session, aber mit await.
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/mydb",
pool_size=10,
echo=False,
)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_users_orm():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(User).where(User.active == True).limit(10)
)
return result.scalars().all()
6. Semaphore & Rate Limiting: Kontrolle behalten
asyncio.gather() ohne Begrenzung kann Server überlasten oder Rate-Limits triggern.
asyncio.Semaphore ist das Werkzeug, um gleichzeitige Operationen zu begrenzen —
ein eingebauter Concurrency-Limiter ohne externe Libraries.
Semaphore
Semaphore-Konzept
Ein Semaphore mit Wert N erlaubt genau N gleichzeitige Operationen. Der 101. Request wartet automatisch, bis einer der ersten 100 fertig ist — wie ein Türsteher der nur N Gäste gleichzeitig reinlässt.
Rate-Limited Web Crawler
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import NamedTuple
class CrawlResult(NamedTuple):
url: str
status: int
size: int
error: str | None = None
async def crawl_url(
session: aiohttp.ClientSession,
semaphore: asyncio.Semaphore,
url: str,
delay: float = 0.1 # Rate Limiting: 100ms zwischen Requests
) -> CrawlResult:
async with semaphore: # Slot belegen
try:
async with session.get(url, allow_redirects=True) as resp:
content = await resp.read()
await asyncio.sleep(delay) # Höfliche Pause
return CrawlResult(url, resp.status, len(content))
except Exception as e:
return CrawlResult(url, 0, 0, str(e))
async def crawl_all(urls: list[str], max_concurrent: int = 10) -> list[CrawlResult]:
semaphore = asyncio.Semaphore(max_concurrent) # Max 10 gleichzeitig
async with aiohttp.ClientSession(
headers={"User-Agent": "SpockyBot/1.0"},
timeout=aiohttp.ClientTimeout(total=30),
) as session:
tasks = [crawl_url(session, semaphore, url) for url in urls]
results = await asyncio.gather(*tasks)
return list(results)
async def main():
urls = [f"https://example.com/page/{i}" for i in range(100)]
results = await crawl_all(urls, max_concurrent=10)
ok = [r for r in results if r.status == 200]
errors = [r for r in results if r.error]
print(f"✓ Erfolgreich: {len(ok)}, ✗ Fehler: {len(errors)}")
asyncio.run(main())
Token-Bucket Rate Limiter
Für präzises Rate Limiting (z.B. "max 100 Requests pro Sekunde") implementierst du einen
Token-Bucket mit asyncio. Der Bucket füllt sich kontinuierlich auf und leert sich mit jedem Request.
import asyncio
import time
class AsyncTokenBucket:
def __init__(self, rate: float, capacity: float):
self.rate = rate # Tokens/Sekunde
self.capacity = capacity # Max Tokens
self.tokens = capacity # Start: voll
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, tokens: float = 1.0):
async with self._lock:
while True:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.rate
)
self.last_refill = now
if self.tokens >= tokens:
self.tokens -= tokens
return
# Warten bis genug Tokens vorhanden
wait_time = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait_time)
# Verwendung: max 10 Requests/Sekunde
async def main():
bucket = AsyncTokenBucket(rate=10, capacity=20)
async def rate_limited_fetch(url: str):
await bucket.acquire()
# ... eigentlicher Request
print(f"Fetching: {url}")
tasks = [rate_limited_fetch(f"/api/{i}") for i in range(50)]
await asyncio.gather(*tasks)
asyncio.Semaphore für Datenbankverbindungen
# DB-Pool hat 20 Verbindungen → max 20 gleichzeitige Queries
db_semaphore = asyncio.Semaphore(20)
async def safe_db_query(query: str, *args):
async with db_semaphore: # Sicherstellen: nie mehr als 20
async with pool.acquire() as conn:
return await conn.fetch(query, *args)
Faustregel: Semaphore-Wert = min(verfügbare Ressourcen, was der Server verträgt). Für externe APIs: Semaphore(10) als Startpunkt. Für lokale Datenbanken: Semaphore(pool_size - 2) (2 Verbindungen als Reserve).
Best Practices: Async Python 2026
- Eine Session pro Anwendung: aiohttp.ClientSession / httpx.AsyncClient niemals in Schleifen erstellen
- Immer Timeouts setzen: Ohne Timeout kann eine Coroutine ewig blockieren
- task_done() nicht vergessen: Bei Queue.get() immer danach task_done() aufrufen
- CancelledError weiterwerfen: Nur für Cleanup fangen, dann immer re-raise
- Semaphore nach außen ziehen: Einmal erstellen, an alle Coroutinen übergeben — nie lokal in der Coroutine
- asyncio.run() nur einmal: Nicht verschachteln — nur eine einzige asyncio.run()-Instanz pro Prozess
- Connection Pools nutzen: asyncpg.create_pool() statt asyncpg.connect() in Produktionscode
Async Python mit Claude Code schreiben
Claude Code hilft dir, komplexe asyncio-Patterns umzusetzen, Bugs zu finden und deinen
asynchronen Code zu optimieren — von Coroutinen bis zum produktionsreifen Rate-Limiter.
Starte kostenlos und erlebe, wie KI-Assisted Development deine Python-Produktivität transformiert.
Kostenlos testen — Trial starten
asyncio
async/await
aiohttp
httpx
asyncpg
Semaphore
Producer/Consumer
Rate Limiting
Python 3.12
Claude Code