Python & Async

Python Async/Await mit Claude Code:
Concurrent Programming 2026

asyncio von Grund auf: async/await, Tasks, Gather, Queue, aiohttp, httpx, asyncpg und Semaphore — praxisnah erklärt mit Claude Code an der Seite.

📅 6. Mai 2026 ⏱ 11 min Lesezeit 🐍 Python 3.12+
Python asyncio async/await aiohttp httpx asyncpg Claude Code Concurrent Programming

Inhalt

  1. asyncio Grundlagen: Event Loop, async def, await
  2. Tasks & Gather: Parallelität strukturieren
  3. asyncio.Queue & Producer/Consumer Pattern
  4. aiohttp & httpx: Async HTTP-Requests
  5. asyncpg & Datenbanken: Async SQL
  6. Semaphore & Rate Limiting: Kontrolle behalten

Synchrones Python ist einfach zu lesen, aber sobald dein Programm auf I/O wartet — HTTP-Requests, Datenbankabfragen, Dateizugriffe — verliert es wertvolle CPU-Zeit. asyncio löst dieses Problem elegant: Statt Threads oder Multiprocessing koordiniert ein einziger Event Loop Tausende gleichzeitiger Aufgaben mit minimalem Overhead.

In diesem Guide erklären wir Python Async/Await von Grund auf. Claude Code unterstützt dich dabei, asynchronen Code zu schreiben, Fehler zu debuggen und Muster wie Producer/Consumer oder Rate-Limited Crawling umzusetzen — auch wenn du gerade erst anfängst.

1. asyncio Grundlagen: Event Loop, async def, await

Der Kern von Python Async/Await ist der Event Loop. Er ist ein single-threaded Scheduler, der Coroutinen nacheinander ausführt — aber immer dann, wenn eine Coroutine auf I/O wartet, übergibt sie die Kontrolle zurück, sodass eine andere Coroutine weiterarbeiten kann.

asyncio

Event Loop Konzept

Stell dir den Event Loop wie einen Dirigenten vor: Er gibt jeder Coroutine reihum das Wort. Sobald eine Coroutine "Ich warte auf den Server" sagt (await), springt der Dirigent zur nächsten Coroutine — Idle-Zeit wird auf null reduziert.

async def: Coroutinen definieren

Jede Funktion, die mit async def definiert ist, wird zur Coroutine. Aufrufen allein führt sie nicht aus — du erhältst ein Coroutine-Objekt. Erst await oder asyncio.run() startet die Ausführung.

import asyncio # Einfachste Coroutine async def greet(name: str) -> str: await asyncio.sleep(0.1) # Simuliert I/O-Warten return f"Hallo, {name}!" # FALSCH: greet("Claude") → gibt nur ein Coroutine-Objekt zurück # RICHTIG: await in einer anderen Coroutine async def main(): result = await greet("Claude") print(result) # Hallo, Claude! # Einstiegspunkt: asyncio.run() startet den Event Loop if __name__ == "__main__": asyncio.run(main())

Coroutinen vs. Threads: Der entscheidende Unterschied

Merkmal Threads (threading) Coroutinen (asyncio)
Overhead pro Unit~1–8 MB RAM~2–4 KB RAM
ParallelitätPräemptiv (OS-gesteuert)Kooperativ (await-Punkte)
Race ConditionsMöglich (GIL hilft wenig)Selten (single-threaded)
Geeignet fürCPU-intensive ArbeitI/O-intensive Arbeit
SkalierungHunderte ThreadsZehntausende Coroutinen
DebuggingKomplex, nicht-deterministischEinfacher, nachvollziehbar
Claude Code Tipp: Frag Claude Code: "Erkläre mir den Unterschied zwischen asyncio und threading an einem konkreten Beispiel mit HTTP-Requests." — Claude generiert dir einen direkten Vergleich mit Benchmarks.

asyncio.run() und der Event Loop

Seit Python 3.7 ist asyncio.run() der Standard-Einstiegspunkt. Er erstellt einen neuen Event Loop, führt die übergebene Coroutine aus und schließt den Loop danach sauber.

import asyncio import time async def fetch_data(endpoint: str, delay: float): print(f"→ Start: {endpoint}") await asyncio.sleep(delay) # I/O simulieren print(f"← Ende: {endpoint} (nach {delay}s)") return {"endpoint": endpoint, "data": "[...]"} async def main(): start = time.monotonic() # Sequenziell: 0.3 + 0.5 + 0.2 = 1.0s r1 = await fetch_data("/users", 0.3) r2 = await fetch_data("/products", 0.5) r3 = await fetch_data("/orders", 0.2) elapsed = time.monotonic() - start print(f"Sequenziell: {elapsed:.2f}s") # ~1.0s asyncio.run(main())

Das Ergebnis: 1.0 Sekunden — weil jedes await wartet, bis die Coroutine abgeschlossen ist. Im nächsten Abschnitt zeigen wir, wie asyncio.gather() das auf ~0.5s reduziert.

2. Tasks & Gather: Parallelität strukturieren

Um mehrere Coroutinen gleichzeitig laufen zu lassen, brauchen wir Tasks. Ein Task ist eine Coroutine, die im Event Loop eingereiht wurde und eigenständig läuft — ohne dass wir auf ihr Ende warten müssen.

gather

asyncio.create_task vs asyncio.gather

create_task() startet eine Coroutine sofort als Task im Hintergrund. gather() startet mehrere Coroutinen gleichzeitig und wartet, bis alle fertig sind — das Ergebnis ist eine Liste der Rückgabewerte.

asyncio.create_task: Tasks manuell steuern

import asyncio async def fetch_data(endpoint: str, delay: float): await asyncio.sleep(delay) return {"endpoint": endpoint, "status": "ok"} async def main(): # Tasks sofort starten — sie laufen im Hintergrund task1 = asyncio.create_task(fetch_data("/users", 0.3)) task2 = asyncio.create_task(fetch_data("/products", 0.5)) task3 = asyncio.create_task(fetch_data("/orders", 0.2)) # Auf alle warten r1 = await task1 r2 = await task2 r3 = await task3 print(r1, r2, r3) # Laufzeit: ~0.5s statt 1.0s! asyncio.run(main())

asyncio.gather: Der elegantere Weg

import asyncio import time async def fetch(url: str, delay: float): await asyncio.sleep(delay) return f"Response von {url}" async def main(): start = time.monotonic() # Alle gleichzeitig starten, alle Ergebnisse auf einmal results = await asyncio.gather( fetch("/api/users", 0.3), fetch("/api/products", 0.5), fetch("/api/orders", 0.2), ) elapsed = time.monotonic() - start print(f"Parallel: {elapsed:.2f}s") # ~0.5s for r in results: print(f" ✓ {r}") asyncio.run(main())

Fehlerbehandlung mit gather(return_exceptions=True)

Standardmäßig wirft gather() beim ersten Fehler eine Exception und bricht ab. Mit return_exceptions=True werden Exceptions als Ergebnisse zurückgegeben — alle Tasks laufen durch.

async def risky_fetch(url: str): if "bad" in url: raise ValueError(f"Fehler bei {url}") await asyncio.sleep(0.1) return f"OK: {url}" async def main(): results = await asyncio.gather( risky_fetch("/api/good"), risky_fetch("/api/bad"), # Wirft ValueError risky_fetch("/api/also-good"), return_exceptions=True # Alle Tasks laufen durch ) for r in results: if isinstance(r, Exception): print(f" ✗ Fehler: {r}") else: print(f" ✓ {r}") asyncio.run(main())

TaskGroup (Python 3.11+): Strukturierte Parallelität

asyncio.TaskGroup ist der moderne Weg, Tasks zu verwalten. Er garantiert, dass alle Tasks abgewartet werden — auch bei Fehlern. Exceptions werden gesammelt und als ExceptionGroup weitergeworfen.

import asyncio async def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task(fetch_data("/users", 0.3)) task2 = tg.create_task(fetch_data("/products", 0.5)) task3 = tg.create_task(fetch_data("/orders", 0.2)) # Hier sind alle Tasks garantiert fertig (oder Exception geworfen) print(task1.result()) print(task2.result()) print(task3.result())

Task Cancellation

async def long_running(): try: await asyncio.sleep(60) except asyncio.CancelledError: print("Task wurde abgebrochen — Cleanup...") raise # WICHTIG: CancelledError immer weiterwerfen! async def main(): task = asyncio.create_task(long_running()) await asyncio.sleep(2) # 2s warten task.cancel() # Task abbrechen try: await task except asyncio.CancelledError: print("Task erfolgreich abgebrochen")
Achtung: Fange asyncio.CancelledError nur ab, um Cleanup zu machen — und wirf sie danach immer weiter. Schluckst du die Exception, bleibt der Task scheinbar "hängen" und der Event Loop kann nicht sauber beenden.

3. asyncio.Queue & Producer/Consumer Pattern

Wenn Produzenten und Konsumenten unterschiedlich schnell sind, brauchen wir eine Puffer-Strategie. asyncio.Queue ist die Thread-sichere (und Coroutinen-sichere) Lösung: Produzenten legen Elemente hinein, Konsumenten nehmen sie heraus — ohne manuelles Locking.

Queue

Wann asyncio.Queue?

Grundstruktur: Producer + Consumer

import asyncio import random async def producer(queue: asyncio.Queue, items: list): for item in items: await asyncio.sleep(random.uniform(0.05, 0.2)) await queue.put(item) print(f" [Producer] Eingereiht: {item}") print(" [Producer] Fertig!") async def consumer(queue: asyncio.Queue, worker_id: int): while True: item = await queue.get() # Blockiert bis Item verfügbar try: await asyncio.sleep(random.uniform(0.1, 0.4)) print(f" [Worker-{worker_id}] Verarbeitet: {item}") finally: queue.task_done() # PFLICHT: Queue mitteilen dass Item erledigt async def main(): queue = asyncio.Queue(maxsize=5) # maxsize=5 → Backpressure urls = [f"/api/item/{i}" for i in range(10)] # 3 parallele Consumer starten consumers = [ asyncio.create_task(consumer(queue, i)) for i in range(3) ] # Producer läuft await producer(queue, urls) # Warten bis alle Items verarbeitet await queue.join() # Consumer beenden for c in consumers: c.cancel() asyncio.run(main())

Queue-Typen: FIFO, LIFO, Priority

# Standard FIFO Queue (First In, First Out) q_fifo = asyncio.Queue() # LIFO Queue (Stack — Last In, First Out) q_lifo = asyncio.LifoQueue() # Priority Queue (kleinste Zahl = höchste Priorität) q_prio = asyncio.PriorityQueue() await q_prio.put((3, "niedrig")) await q_prio.put((1, "kritisch")) await q_prio.put((2, "normal")) prio, item = await q_prio.get() # → (1, "kritisch")

Backpressure mit maxsize

Wenn du maxsize setzt, blockiert queue.put() automatisch, sobald die Queue voll ist — der Producer wartet, bis der Consumer Platz schafft. Das ist eingebautes Backpressure ohne zusätzlichen Code.

# maxsize=0 → unbegrenzt (Gefahr: Speicherüberlauf!) q_unbounded = asyncio.Queue() # maxsize=10 → Producer wartet wenn Queue voll q_bounded = asyncio.Queue(maxsize=10) # put_nowait() wirft QueueFull statt zu blockieren try: q_bounded.put_nowait("item") except asyncio.QueueFull: print("Queue voll — Item verworfen oder umgeleitet")
Claude Code Workflow: Beschreibe dein Producer/Consumer-Problem in natürlicher Sprache. Claude Code generiert die Queue-Struktur, schlägt die richtige Anzahl Worker vor und warnt dich vor typischen Fallen wie dem Vergessen von task_done().

4. aiohttp & httpx: Async HTTP-Requests

Das häufigste I/O in Python-Projekten sind HTTP-Requests. Die synchrone requests-Bibliothek blockiert den gesamten Thread — für Async-Code brauchen wir aiohttp oder httpx.

aiohttp

aiohttp vs httpx: Wann was?

Kriteriumaiohttphttpx
Async-First✅ Vollständig async✅ Async + Sync API
requests-kompatibel❌ Andere API✅ Drop-in für requests
HTTP/2
WebSocket✅ Native
Performance (HTTP/1.1)Sehr hochHoch
BeliebtheitSehr verbreitetWachsend

aiohttp: ClientSession korrekt nutzen

Wichtigste Regel: Erstelle eine ClientSession pro Anwendung — nicht pro Request. Sessions wiederverwendet Verbindungen (Connection Pooling) und sind deutlich effizienter.

import asyncio import aiohttp async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict: async with session.get(url) as response: response.raise_for_status() return await response.json() async def main(): urls = [ "https://jsonplaceholder.typicode.com/posts/1", "https://jsonplaceholder.typicode.com/posts/2", "https://jsonplaceholder.typicode.com/posts/3", ] # Session EINMAL erstellen, alle Requests darüber timeout = aiohttp.ClientTimeout(total=30, connect=5) async with aiohttp.ClientSession(timeout=timeout) as session: tasks = [fetch_json(session, url) for url in urls] results = await asyncio.gather(*tasks, return_exceptions=True) for url, result in zip(urls, results): if isinstance(result, Exception): print(f" ✗ {url}: {result}") else: print(f" ✓ {url}: {result.get('title', 'N/A')}") asyncio.run(main())

httpx: Die moderne requests-Alternative

import asyncio import httpx async def parallel_requests(urls: list[str]) -> list: async with httpx.AsyncClient( timeout=httpx.Timeout(10.0, connect=3.0), limits=httpx.Limits(max_connections=100, max_keepalive_connections=20), http2=True, # HTTP/2 aktivieren ) as client: tasks = [client.get(url) for url in urls] responses = await asyncio.gather(*tasks, return_exceptions=True) results = [] for url, resp in zip(urls, responses): if isinstance(resp, Exception): results.append({"url": url, "error": str(resp)}) else: results.append({ "url": url, "status": resp.status_code, "size": len(resp.content), }) return results async def main(): urls = [f"https://httpbin.org/delay/0"] * 10 results = await parallel_requests(urls) for r in results: print(r) asyncio.run(main())

Retry-Logik mit exponentialem Backoff

import asyncio import aiohttp async def fetch_with_retry( session: aiohttp.ClientSession, url: str, max_retries: int = 3, base_delay: float = 1.0 ) -> dict: for attempt in range(max_retries): try: async with session.get(url) as resp: if resp.status == 429: # Rate Limited retry_after = float(resp.headers.get("Retry-After", base_delay * (2 ** attempt))) await asyncio.sleep(retry_after) continue resp.raise_for_status() return await resp.json() except aiohttp.ClientError as e: if attempt == max_retries - 1: raise delay = base_delay * (2 ** attempt) print(f"Versuch {attempt + 1} fehlgeschlagen, warte {delay}s...") await asyncio.sleep(delay)

5. asyncpg & Datenbanken: Async SQL

Datenbankoperationen sind oft der größte I/O-Engpass. Mit asyncpg (PostgreSQL) nutzt du die volle Power des async Event Loops für SQL-Queries — mit Connection Pools, Prepared Statements und nativer PostgreSQL-Protocol-Unterstützung.

asyncpg

asyncpg Performance-Vorteile

asyncpg implementiert das PostgreSQL Binary Protocol direkt — ohne psycopg2-Overhead. Benchmarks zeigen bis zu 3x schnellere Queries bei hoher Parallelität. Der Connection Pool eliminiert Verbindungs-Overhead für jede Query.

asyncpg.connect: Einfache Verbindung

import asyncio import asyncpg async def main(): # Direkte Verbindung (für einfache Skripte) conn = await asyncpg.connect( host="localhost", port=5432, database="mydb", user="postgres", password="secret", ) try: # Einzelner Record user = await conn.fetchrow( "SELECT id, name, email FROM users WHERE id = $1", 42 ) print(f"User: {user['name']} ({user['email']})") # Mehrere Records users = await conn.fetch( "SELECT * FROM users WHERE active = $1 LIMIT $2", True, 100 ) for u in users: print(dict(u)) # Einzelner Wert count = await conn.fetchval("SELECT COUNT(*) FROM users") print(f"Gesamt: {count} User") # INSERT/UPDATE/DELETE await conn.execute( "UPDATE users SET last_seen = NOW() WHERE id = $1", 42 ) finally: await conn.close() asyncio.run(main())

Connection Pool: Der richtige Weg für Produktionscode

import asyncio import asyncpg # Globaler Pool (einmal erstellen, überall nutzen) pool: asyncpg.Pool | None = None async def init_db(): global pool pool = await asyncpg.create_pool( dsn="postgresql://postgres:secret@localhost/mydb", min_size=5, # Mindestens 5 offene Verbindungen max_size=20, # Maximal 20 Verbindungen command_timeout=60, ) async def get_user(user_id: int) -> dict | None: async with pool.acquire() as conn: # Aus dem Pool ausleihen row = await conn.fetchrow( "SELECT * FROM users WHERE id = $1", user_id ) return dict(row) if row else None async def main(): await init_db() try: # 50 parallele Queries tasks = [get_user(i) for i in range(1, 51)] users = await asyncio.gather(*tasks) print(f"{len([u for u in users if u])} User gefunden") finally: await pool.close() asyncio.run(main())

Transaktionen mit asyncpg

async def transfer_money( from_id: int, to_id: int, amount: float ) -> bool: async with pool.acquire() as conn: async with conn.transaction(): # Automatisches COMMIT/ROLLBACK # Beide Updates in einer Transaktion — atomar! await conn.execute( "UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from_id ) await conn.execute( "UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to_id ) # Bei Exception → automatisches ROLLBACK return True

SQLAlchemy 2.0 mit asyncio

Wer ORM-Features braucht, nutzt SQLAlchemy 2.0 mit dem asyncio-Extension. Das AsyncSession-Objekt funktioniert wie eine normale Session, aber mit await.

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker engine = create_async_engine( "postgresql+asyncpg://user:password@localhost/mydb", pool_size=10, echo=False, ) AsyncSessionLocal = sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) async def get_users_orm(): async with AsyncSessionLocal() as session: result = await session.execute( select(User).where(User.active == True).limit(10) ) return result.scalars().all()

6. Semaphore & Rate Limiting: Kontrolle behalten

asyncio.gather() ohne Begrenzung kann Server überlasten oder Rate-Limits triggern. asyncio.Semaphore ist das Werkzeug, um gleichzeitige Operationen zu begrenzen — ein eingebauter Concurrency-Limiter ohne externe Libraries.

Semaphore

Semaphore-Konzept

Ein Semaphore mit Wert N erlaubt genau N gleichzeitige Operationen. Der 101. Request wartet automatisch, bis einer der ersten 100 fertig ist — wie ein Türsteher der nur N Gäste gleichzeitig reinlässt.

Rate-Limited Web Crawler

import asyncio import aiohttp from dataclasses import dataclass from typing import NamedTuple class CrawlResult(NamedTuple): url: str status: int size: int error: str | None = None async def crawl_url( session: aiohttp.ClientSession, semaphore: asyncio.Semaphore, url: str, delay: float = 0.1 # Rate Limiting: 100ms zwischen Requests ) -> CrawlResult: async with semaphore: # Slot belegen try: async with session.get(url, allow_redirects=True) as resp: content = await resp.read() await asyncio.sleep(delay) # Höfliche Pause return CrawlResult(url, resp.status, len(content)) except Exception as e: return CrawlResult(url, 0, 0, str(e)) async def crawl_all(urls: list[str], max_concurrent: int = 10) -> list[CrawlResult]: semaphore = asyncio.Semaphore(max_concurrent) # Max 10 gleichzeitig async with aiohttp.ClientSession( headers={"User-Agent": "SpockyBot/1.0"}, timeout=aiohttp.ClientTimeout(total=30), ) as session: tasks = [crawl_url(session, semaphore, url) for url in urls] results = await asyncio.gather(*tasks) return list(results) async def main(): urls = [f"https://example.com/page/{i}" for i in range(100)] results = await crawl_all(urls, max_concurrent=10) ok = [r for r in results if r.status == 200] errors = [r for r in results if r.error] print(f"✓ Erfolgreich: {len(ok)}, ✗ Fehler: {len(errors)}") asyncio.run(main())

Token-Bucket Rate Limiter

Für präzises Rate Limiting (z.B. "max 100 Requests pro Sekunde") implementierst du einen Token-Bucket mit asyncio. Der Bucket füllt sich kontinuierlich auf und leert sich mit jedem Request.

import asyncio import time class AsyncTokenBucket: def __init__(self, rate: float, capacity: float): self.rate = rate # Tokens/Sekunde self.capacity = capacity # Max Tokens self.tokens = capacity # Start: voll self.last_refill = time.monotonic() self._lock = asyncio.Lock() async def acquire(self, tokens: float = 1.0): async with self._lock: while True: now = time.monotonic() elapsed = now - self.last_refill self.tokens = min( self.capacity, self.tokens + elapsed * self.rate ) self.last_refill = now if self.tokens >= tokens: self.tokens -= tokens return # Warten bis genug Tokens vorhanden wait_time = (tokens - self.tokens) / self.rate await asyncio.sleep(wait_time) # Verwendung: max 10 Requests/Sekunde async def main(): bucket = AsyncTokenBucket(rate=10, capacity=20) async def rate_limited_fetch(url: str): await bucket.acquire() # ... eigentlicher Request print(f"Fetching: {url}") tasks = [rate_limited_fetch(f"/api/{i}") for i in range(50)] await asyncio.gather(*tasks)

asyncio.Semaphore für Datenbankverbindungen

# DB-Pool hat 20 Verbindungen → max 20 gleichzeitige Queries db_semaphore = asyncio.Semaphore(20) async def safe_db_query(query: str, *args): async with db_semaphore: # Sicherstellen: nie mehr als 20 async with pool.acquire() as conn: return await conn.fetch(query, *args)
Faustregel: Semaphore-Wert = min(verfügbare Ressourcen, was der Server verträgt). Für externe APIs: Semaphore(10) als Startpunkt. Für lokale Datenbanken: Semaphore(pool_size - 2) (2 Verbindungen als Reserve).

Best Practices: Async Python 2026

Async Python mit Claude Code schreiben

Claude Code hilft dir, komplexe asyncio-Patterns umzusetzen, Bugs zu finden und deinen asynchronen Code zu optimieren — von Coroutinen bis zum produktionsreifen Rate-Limiter. Starte kostenlos und erlebe, wie KI-Assisted Development deine Python-Produktivität transformiert.

Kostenlos testen — Trial starten
asyncio async/await aiohttp httpx asyncpg Semaphore Producer/Consumer Rate Limiting Python 3.12 Claude Code