Serverless-Anwendungen haben ein fundamentales Problem: Jede Funktion startet ohne Zustand, ohne persistente Verbindungen, ohne Shared Memory. Upstash löst dieses Problem elegant mit einem HTTP-basierten Redis, das genau für serverlose Umgebungen gebaut wurde. In diesem Artikel zeigen wir, wie Claude Code dir hilft, Upstash Redis effektiv in TypeScript-Projekten einzusetzen — von einfachen Cache-Operationen bis hin zu vektorbasierter Suche für RAG-Pipelines.
Ob Next.js App Router, Vercel Edge Functions oder AWS Lambda — Upstash bietet eine einheitliche API, die überall funktioniert. Kein Verbindungspool-Management, kein "too many connections"-Fehler, pay-per-request statt pay-per-instance.
1. Upstash Setup & Grundoperationen
Der Einstieg mit Upstash ist bewusst einfach gehalten. Ein Upstash-Konto, eine neue Redis-Datenbank, und zwei Umgebungsvariablen — das reicht, um loszulegen.
Nach der Registrierung auf upstash.com erstellst du eine neue Redis-Datenbank. Wähle die Region, die deiner Vercel- oder Lambda-Deployment-Region am nächsten ist, um Latenz zu minimieren. Upstash gibt dir dann zwei Werte: UPSTASH_REDIS_REST_URL und UPSTASH_REDIS_REST_TOKEN. Diese kommen in deine .env.local:
Setup
# .env.local
UPSTASH_REDIS_REST_URL=https://us1-superb-robin-12345.upstash.io
UPSTASH_REDIS_REST_TOKEN=AXXXXXXXbXXXXXXXXXXXXX
# Installation
npm install @upstash/redis
Claude Code erkennt sofort das Muster mit Redis.fromEnv() und schlägt den korrekten Import vor. Der Client liest die Umgebungsvariablen automatisch ein — kein manuelles Konfigurieren:
TypeScript
import { Redis } from '@upstash/redis'
// Automatisches Einlesen der UPSTASH_REDIS_REST_URL und _REST_TOKEN
const redis = Redis.fromEnv()
// Basis-Operationen
async function basicOps() {
// SET mit TTL (Expiry in Sekunden)
await redis.set('user:42:name', 'Alice', { ex: 3600 })
// SET nur wenn Key noch nicht existiert (NX flag)
await redis.set('lock:job:123', 'processing', { nx: true, ex: 30 })
// SET nur wenn Key bereits existiert (XX flag)
await redis.set('session:abc', 'refreshed', { xx: true, ex: 900 })
// GET — gibt null zurück wenn nicht vorhanden
const name = await redis.get<string>('user:42:name')
console.log(name) // 'Alice' oder null
// DEL — löscht einen oder mehrere Keys
await redis.del('user:42:name', 'lock:job:123')
// EXISTS — prüft ob Key vorhanden
const exists = await redis.exists('session:abc')
// INCR / DECR für atomare Counter
const views = await redis.incr('page:blog:views')
const remaining = await redis.decr('credits:user:42')
// EXPIRE — TTL nachträglich setzen
await redis.expire('session:abc', 1800)
// TTL abfragen
const ttl = await redis.ttl('session:abc')
console.log(`Session läuft in ${ttl} Sekunden ab`)
}
Claude Code Tipp: Wenn du Claude Code "Upstash Redis Client mit Fehlerbehandlung erstellen" sagst, generiert es automatisch Wrapper-Funktionen mit Try-Catch und Fallback-Logik. Perfekt für Produktionscode, wo ein Redis-Ausfall den Request nicht komplett blockieren soll.
Pipeline für Batch-Operationen
Statt mehrere HTTP-Requests hintereinander zu machen, fasst eine Pipeline alle Operationen in einem einzigen Request zusammen. Das ist besonders in Edge-Umgebungen wichtig:
async function pipelineExample() {
const pipeline = redis.pipeline()
// Alle Operationen werden gesammelt
pipeline.set('key1', 'value1', { ex: 3600 })
pipeline.set('key2', 'value2', { ex: 3600 })
pipeline.get('key1')
pipeline.incr('counter')
// Ein einziger HTTP-Request für alle Operationen
const results = await pipeline.exec()
// results: ['OK', 'OK', 'value1', 1]
console.log(results)
}
// Typische Caching-Pattern für Next.js API Routes
export async function GET(request: Request) {
const cacheKey = 'api:products:all'
// Cache-First Strategy
const cached = await redis.get<Product[]>(cacheKey)
if (cached) {
return Response.json(cached, {
headers: { 'X-Cache': 'HIT' }
})
}
// Cache-Miss: Daten holen und cachen
const products = await fetchProductsFromDB()
await redis.set(cacheKey, products, { ex: 300 }) // 5 Min Cache
return Response.json(products, {
headers: { 'X-Cache': 'MISS' }
})
}
Kostenmodell: Upstash berechnet pro Command — nicht pro Verbindungszeit. Der Free Plan bietet 10.000 Commands/Tag. Pay-as-you-go ab $0,20 pro 100.000 Commands. Für die meisten Hobby-Projekte bleibt der Free Plan dauerhaft ausreichend.
2. Rate Limiting mit @upstash/ratelimit
Rate Limiting ist einer der häufigsten Use Cases für Redis in Web-APIs. Upstash bietet dafür ein dediziertes Package, das verschiedene Algorithmen implementiert und sich nahtlos in Next.js Middleware integriert.
Installation
npm install @upstash/ratelimit @upstash/redis
Algorithmen im Vergleich
Upstash Ratelimit bietet drei Algorithmen für unterschiedliche Szenarien:
- Sliding Window: Gleichmäßige Verteilung, kein Burst am Fensterende
- Fixed Window: Einfach, günstig, aber erlaubt Bursts an Fenstergrenzen
- Token Bucket: Erlaubt kontrollierten Burst, dann gleichmäßige Ratendrosselung
Rate Limiting
import { Ratelimit } from '@upstash/ratelimit'
import { Redis } from '@upstash/redis'
const redis = Redis.fromEnv()
// Sliding Window: 10 Requests pro 10 Sekunden (gleitendes Fenster)
const slidingLimiter = new Ratelimit({
redis,
limiter: Ratelimit.slidingWindow(10, '10 s'),
analytics: true, // Upstash Dashboard Analytics
prefix: 'rl:api:sliding',
})
// Fixed Window: 100 Requests pro Minute (fixiertes Fenster)
const fixedLimiter = new Ratelimit({
redis,
limiter: Ratelimit.fixedWindow(100, '1 m'),
analytics: true,
prefix: 'rl:api:fixed',
})
// Token Bucket: 20 Tokens, 10 Tokens/Sekunde Regeneration
const tokenBucket = new Ratelimit({
redis,
limiter: Ratelimit.tokenBucket(20, '10 s', 20),
analytics: true,
prefix: 'rl:api:bucket',
})
Next.js Middleware Integration
Die eleganteste Art, Rate Limiting zu implementieren: als Next.js Middleware, die vor jedem Request ausgeführt wird. So schützt eine einzige Datei alle API-Routen:
// middleware.ts
import { NextRequest, NextResponse } from 'next/server'
import { Ratelimit } from '@upstash/ratelimit'
import { Redis } from '@upstash/redis'
const ratelimit = new Ratelimit({
redis: Redis.fromEnv(),
limiter: Ratelimit.slidingWindow(20, '60 s'),
analytics: true,
prefix: 'rl:global',
})
export async function middleware(request: NextRequest) {
// IP-Adresse aus verschiedenen Headers
const ip =
request.headers.get('x-forwarded-for')?.split(',')[0] ??
request.headers.get('x-real-ip') ??
'anonymous'
const { success, limit, remaining, reset } = await ratelimit.limit(ip)
// Rate-Limit-Headers für Client-Transparenz
const headers = {
'X-RateLimit-Limit': limit.toString(),
'X-RateLimit-Remaining': remaining.toString(),
'X-RateLimit-Reset': reset.toString(),
}
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Bitte warte kurz.' },
{ status: 429, headers }
)
}
// Request durchlassen, Headers anhängen
const response = NextResponse.next()
Object.entries(headers).forEach(([k, v]) => response.headers.set(k, v))
return response
}
// Nur API-Routen schützen
export const config = {
matcher: ['/api/:path*'],
}
Differenziertes Rate Limiting
Verschiedene Nutzergruppen brauchen verschiedene Limits. Pro-Nutzer sollen mehr Requests machen dürfen als anonyme Besucher:
// lib/ratelimit.ts — differenzierte Limits
import { Ratelimit } from '@upstash/ratelimit'
import { Redis } from '@upstash/redis'
const redis = Redis.fromEnv()
export const rateLimiters = {
// Anonyme Nutzer: 5 Requests / Minute
anonymous: new Ratelimit({
redis,
limiter: Ratelimit.slidingWindow(5, '60 s'),
prefix: 'rl:anon',
}),
// Free-Tier: 30 Requests / Minute
free: new Ratelimit({
redis,
limiter: Ratelimit.slidingWindow(30, '60 s'),
prefix: 'rl:free',
}),
// Pro-Tier: 200 Requests / Minute
pro: new Ratelimit({
redis,
limiter: Ratelimit.tokenBucket(200, '60 s', 200),
prefix: 'rl:pro',
}),
}
export async function checkRateLimit(
identifier: string,
tier: 'anonymous' | 'free' | 'pro'
) {
const limiter = rateLimiters[tier]
return limiter.limit(identifier)
}
Claude Code Workflow: Sage Claude Code "Füge differenziertes Rate Limiting für meine Next.js API hinzu, mit Unterstützung für anonymous, free und pro Tier". Claude Code analysiert deine bestehende Auth-Logik und integriert das Rate Limiting nahtlos in deinen Request-Handler.
3. Session Storage & JWT-Caching
Serverlose Funktionen haben keinen gemeinsamen Speicher. Redis als zentraler Session-Store löst dieses Problem elegant und skaliert auf Millionen gleichzeitiger Sessions ohne Aufwand.
Das Hash-Datenstruktur (HSET/HGET) ist ideal für Session-Daten, weil du einzelne Felder updaten kannst, ohne die gesamte Session neu zu schreiben:
Session
import { Redis } from '@upstash/redis'
const redis = Redis.fromEnv()
// Session-Typen
interface UserSession {
userId: string
email: string
role: 'admin' | 'user' | 'pro'
loginAt: number
preferences: {
language: string
theme: 'light' | 'dark'
notifications: boolean
}
}
// Session erstellen (nach Login)
async function createSession(
sessionId: string,
session: UserSession
): Promise<void> {
const key = `session:${sessionId}`
// Gesamte Session als JSON speichern
await redis.set(key, JSON.stringify(session), {
ex: 86400, // 24 Stunden TTL
})
}
// Session lesen
async function getSession(sessionId: string): Promise<UserSession | null> {
const key = `session:${sessionId}`
const data = await redis.get<string>(key)
if (!data) return null
try {
return JSON.parse(data) as UserSession
} catch {
return null
}
}
// Session-Präferenzen updaten (nur Teilupdate)
async function updatePreferences(
sessionId: string,
preferences: Partial<UserSession['preferences']>
): Promise<void> {
const session = await getSession(sessionId)
if (!session) throw new Error('Session not found')
// Partial merge
session.preferences = { ...session.preferences, ...preferences }
await redis.set(`session:${sessionId}`, JSON.stringify(session), {
keepTtl: true, // TTL beibehalten
})
}
// Session löschen (Logout)
async function deleteSession(sessionId: string): Promise<void> {
await redis.del(`session:${sessionId}`)
}
JWT Token Blacklist
JWTs sind stateless — einmal ausgestellt, gelten sie bis zum Ablauf. Redis ermöglicht eine effiziente Token-Blacklist für sofortige Invalidierung bei Logout oder Sicherheitsvorfällen:
JWT Cache
import { Redis } from '@upstash/redis'
import { jwtVerify } from 'jose'
const redis = Redis.fromEnv()
const BLACKLIST_PREFIX = 'jwt:blacklist:'
const CACHE_PREFIX = 'jwt:cache:'
// JWT zur Blacklist hinzufügen (bei Logout)
async function blacklistToken(token: string, expiresAt: number): Promise<void> {
const ttl = expiresAt - Math.floor(Date.now() / 1000)
if (ttl <= 0) return // Token bereits abgelaufen
// Token-Hash als Key (kein Plain-Token speichern)
const tokenHash = await hashToken(token)
await redis.set(`${BLACKLIST_PREFIX}${tokenHash}`, 1, { ex: ttl })
}
// Prüfen ob Token auf Blacklist steht
async function isTokenBlacklisted(token: string): Promise<boolean> {
const tokenHash = await hashToken(token)
const exists = await redis.exists(`${BLACKLIST_PREFIX}${tokenHash}`)
return exists === 1
}
// JWT validieren mit Cache (vermeidet wiederholte Krypto-Ops)
async function validateAndCacheJWT(token: string) {
const tokenHash = await hashToken(token)
const cacheKey = `${CACHE_PREFIX}${tokenHash}`
// Erst Cache prüfen
const cached = await redis.get<{ userId: string; role: string }>(cacheKey)
if (cached) return cached
// Dann Blacklist prüfen
if (await isTokenBlacklisted(token)) {
throw new Error('Token wurde invalidiert')
}
// JWT verifizieren
const { payload } = await jwtVerify(token, new TextEncoder().encode(
process.env.JWT_SECRET!
))
const userData = {
userId: payload.sub as string,
role: payload.role as string,
}
// 5 Minuten cachen (kurze TTL für Security)
await redis.set(cacheKey, userData, { ex: 300 })
return userData
}
async function hashToken(token: string): Promise<string> {
const buffer = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(token))
return Array.from(new Uint8Array(buffer)).map(b => b.toString(16).padStart(2, '0')).join('')
}
Performance-Insight: JWT-Validierung mit kryptographischen Operationen kostet 2-5ms. Mit Redis-Cache reduzierst du das auf <1ms für 95% der Requests. Bei einer API mit 1 Mio. Requests/Tag sparst du mehrere Stunden CPU-Zeit täglich.
4. Sorted Sets & Leaderboards
Sorted Sets sind eine der mächtigsten Redis-Datenstrukturen. Jedes Element hat einen numerischen Score — Redis hält die Liste automatisch sortiert. Perfekt für Leaderboards, Analytics und Ranglistensysteme.
Sorted Sets
import { Redis } from '@upstash/redis'
const redis = Redis.fromEnv()
interface LeaderboardEntry {
userId: string
score: number
rank: number
}
// Score hinzufügen/aktualisieren
async function addScore(
leaderboardId: string,
userId: string,
score: number
): Promise<void> {
// zadd: Hinzufügen oder Aktualisieren
await redis.zadd(`lb:${leaderboardId}`, {
score,
member: userId,
})
}
// Score erhöhen (atomare Operation)
async function incrementScore(
leaderboardId: string,
userId: string,
increment: number
): Promise<number> {
// zincrby: atomares Inkrement
return redis.zincrby(`lb:${leaderboardId}`, increment, userId)
}
// Top N Einträge (absteigend sortiert)
async function getTopEntries(
leaderboardId: string,
count: number = 10
): Promise<LeaderboardEntry[]> {
// zrevrange: höchste Scores zuerst, mit Scores
const entries = await redis.zrevrange(
`lb:${leaderboardId}`,
0,
count - 1,
{ withScores: true }
)
// Entries sind abwechselnd: [member, score, member, score, ...]
const result: LeaderboardEntry[] = []
for (let i = 0; i < entries.length; i += 2) {
result.push({
userId: entries[i] as string,
score: Number(entries[i + 1]),
rank: i / 2 + 1,
})
}
return result
}
// Rang eines Nutzers abfragen
async function getUserRank(
leaderboardId: string,
userId: string
): Promise<{ rank: number; score: number } | null> {
const [rank, score] = await Promise.all([
redis.zrevrank(`lb:${leaderboardId}`, userId), // 0-based
redis.zscore(`lb:${leaderboardId}`, userId),
])
if (rank === null || score === null) return null
return { rank: rank + 1, score: Number(score) }
}
Real-Time Leaderboard API
// app/api/leaderboard/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { Redis } from '@upstash/redis'
const redis = Redis.fromEnv()
// GET /api/leaderboard?game=racing&limit=20
export async function GET(request: NextRequest) {
const { searchParams } = new URL(request.url)
const game = searchParams.get('game') ?? 'global'
const limit = Math.min(Number(searchParams.get('limit')) || 10, 100)
const cacheKey = `cache:lb:${game}:top${limit}`
// Kurzes Caching (30 Sekunden) für Leaderboard-API
const cached = await redis.get(cacheKey)
if (cached) return NextResponse.json(cached)
// Leaderboard aus Sorted Set lesen
const entries = await redis.zrevrange(
`lb:${game}`, 0, limit - 1, { withScores: true }
)
const leaderboard = []
for (let i = 0; i < entries.length; i += 2) {
leaderboard.push({
rank: i / 2 + 1,
userId: entries[i],
score: Number(entries[i + 1]),
})
}
const result = {
game,
updatedAt: new Date().toISOString(),
entries: leaderboard,
}
// 30 Sekunden cachen
await redis.set(cacheKey, result, { ex: 30 })
return NextResponse.json(result)
}
// POST /api/leaderboard — Score submittieren
export async function POST(request: NextRequest) {
const { game, userId, score } = await request.json()
if (!game || !userId || typeof score !== 'number') {
return NextResponse.json({ error: 'Invalid payload' }, { status: 400 })
}
// NX: nur setzen wenn neuer Score besser ist (MAX-Option)
await redis.zadd(`lb:${game}`, { score, member: userId, ch: true })
const rank = await redis.zrevrank(`lb:${game}`, userId)
return NextResponse.json({ rank: (rank ?? 0) + 1, score })
}
Zeitbasierte Analytics mit Sorted Sets
// Seitenaufrufe als Sorted Set (Score = Timestamp)
async function trackPageView(page: string, userId: string) {
const now = Date.now()
const key = `analytics:pageviews:${page}`
// Score = Unix-Timestamp → automatisch zeitlich sortiert
await redis.zadd(key, { score: now, member: `${userId}:${now}` })
// Alte Einträge löschen (>7 Tage)
const weekAgo = now - 7 * 24 * 60 * 60 * 1000
await redis.zremrangebyscore(key, 0, weekAgo)
}
// Views der letzten Stunde zählen
async function getRecentViews(page: string, hours: number = 1): Promise<number> {
const since = Date.now() - hours * 60 * 60 * 1000
return redis.zcount(`analytics:pageviews:${page}`, since, '+')
}
5. QStash Message Queue
QStash ist Upstashs HTTP-basierter Message Queue Service. Er ermöglicht zuverlässige asynchrone Verarbeitung, Scheduling und Retry-Logik für serverlose Funktionen — ohne eigene Queue-Infrastruktur zu verwalten.
Typische Use Cases für QStash:
- E-Mail-Versand nach Registrierung (ohne den User zu blockieren)
- Bild-Verarbeitung und Video-Transcoding nach Upload
- Geplante Berichte und Cronjobs in serverlosen Umgebungen
- Webhook-Weiterleitung mit automatischen Retries bei Fehlern
QStash
npm install @upstash/qstash
// lib/qstash.ts
import { Client } from '@upstash/qstash'
export const qstash = new Client({
token: process.env.QSTASH_TOKEN!,
})
// Sofortige Nachricht senden
export async function sendWelcomeEmail(userId: string, email: string) {
await qstash.publishJSON({
url: `${process.env.APP_URL}/api/jobs/send-email`,
body: {
type: 'welcome',
userId,
email,
},
retries: 3, // Automatisch 3x wiederholen bei Fehler
delay: 5, // 5 Sekunden Verzögerung
})
}
// Verzögerter Job (in 1 Stunde ausführen)
export async function scheduleFollowUpEmail(userId: string) {
await qstash.publishJSON({
url: `${process.env.APP_URL}/api/jobs/follow-up`,
body: { userId },
delay: 3600, // 1 Stunde in Sekunden
retries: 5,
})
}
// Geplante Ausführung (Cron-ähnlich)
export async function createWeeklyReportSchedule() {
await qstash.schedules.create({
destination: `${process.env.APP_URL}/api/jobs/weekly-report`,
cron: '0 9 * * 1', // Jeden Montag um 9:00 Uhr UTC
body: JSON.stringify({ type: 'weekly_summary' }),
headers: { 'Content-Type': 'application/json' },
})
}
Webhook Handler mit Signatur-Verifikation
QStash signiert alle Nachrichten. Dein Webhook-Handler muss die Signatur verifizieren, um sicherzustellen, dass die Nachricht wirklich von QStash kommt:
// app/api/jobs/send-email/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { verifySignatureAppRouter } from '@upstash/qstash/nextjs'
// Wrapper für automatische Signatur-Verifikation
export const POST = verifySignatureAppRouter(async (request: NextRequest) => {
const body = await request.json()
const { type, userId, email } = body
console.log(`[Job] Processing: ${type} for user ${userId}`)
switch (type) {
case 'welcome':
await sendWelcomeEmailViaSendgrid(email, userId)
break
case 'follow-up':
await sendFollowUpEmail(email, userId)
break
default:
console.warn(`Unknown job type: ${type}`)
}
// HTTP 200 signalisiert QStash: Erfolgreich verarbeitet
return NextResponse.json({ success: true })
})
// Batch-Verarbeitung mit mehreren Empfängern
async function sendBatchNotifications(userIds: string[]) {
const { qstash } = await import('@/lib/qstash')
// Batch API: Mehrere Nachrichten in einem Request
await qstash.batch(
userIds.map(userId => ({
url: `${process.env.APP_URL}/api/jobs/notify`,
body: JSON.stringify({ userId }),
headers: { 'Content-Type': 'application/json' },
retries: 3,
delay: Math.floor(Math.random() * 60), // Random delay 0-60s (Rate Limit schonen)
}))
)
}
async function sendWelcomeEmailViaSendgrid(email: string, userId: string) {
// Sendgrid / Resend / Postmark Integration hier
console.log(`Sending welcome email to ${email}`)
}
async function sendFollowUpEmail(email: string, userId: string) {
console.log(`Sending follow-up to ${email}`)
}
QStash Retry-Strategie: QStash verwendet exponentielles Backoff für Retries. Nach dem ersten Fehler wartet es 1 Minute, dann 2 Minuten, dann 4 Minuten usw. Mit retries: 5 hat dein Handler bis zu 31 Minuten Zeit, sich zu erholen — ohne dass du dich um Retry-Logik kümmern musst.
Dead Letter Queue Pattern
// QStash DLQ: Fehlgeschlagene Jobs tracken
import { Client } from '@upstash/qstash'
import { Redis } from '@upstash/redis'
const qstash = new Client({ token: process.env.QSTASH_TOKEN! })
const redis = Redis.fromEnv()
// Failed Events aus QStash DLQ lesen und in Redis loggen
export async function processDLQ() {
const events = await qstash.dlq.listMessages()
for (const event of events.messages) {
// In Redis für Monitoring tracken
await redis.lpush('jobs:failed', JSON.stringify({
messageId: event.messageId,
url: event.url,
failedAt: event.notBefore,
responseStatus: event.responseStatus,
}))
// Nach 100 Einträgen trimmen
await redis.ltrim('jobs:failed', 0, 99)
}
}
6. Upstash Vector für RAG
Upstash Vector ist ein serverloser Vektordatenbank-Service — perfekt für RAG-Pipelines (Retrieval-Augmented Generation) in KI-Anwendungen. Kombiniert mit Claude Code entsteht eine vollständige Embedding- und Retrieval-Pipeline ohne eigene Infrastruktur.
Vector Store
npm install @upstash/vector
// lib/vector.ts
import { Index } from '@upstash/vector'
// Index initialisieren (Dimensionen passen zur Embedding-Größe)
export const vectorIndex = new Index({
url: process.env.UPSTASH_VECTOR_REST_URL!,
token: process.env.UPSTASH_VECTOR_REST_TOKEN!,
})
interface DocumentChunk {
id: string
text: string
embedding: number[]
metadata: {
source: string
chunkIndex: number
documentId: string
category: string
createdAt: string
}
}
// Dokument-Chunks in Vektordatenbank einfügen
async function upsertDocuments(chunks: DocumentChunk[]) {
// Batch-Upsert für bessere Performance
await vectorIndex.upsert(
chunks.map(chunk => ({
id: chunk.id,
vector: chunk.embedding,
data: chunk.text, // Originaler Text für Retrieval
metadata: chunk.metadata,
}))
)
}
// Ähnliche Dokumente finden
async function queryVector(
queryEmbedding: number[],
options: {
topK?: number
namespace?: string
filter?: string
includeMetadata?: boolean
} = {}
) {
const {
topK = 5,
namespace = '',
filter,
includeMetadata = true,
} = options
return vectorIndex.query({
vector: queryEmbedding,
topK,
includeMetadata,
includeData: true, // Originaler Text zurückgeben
filter, // Metadata-Filter: "category = 'docs'"
namespace, // Namespaces für Mandantentrennung
})
}
Vollständige RAG-Pipeline
Eine RAG-Pipeline besteht aus zwei Phasen: Indexierung (Dokumente einlesen und als Vektoren speichern) und Retrieval (relevante Chunks zur Query finden und an das LLM weitergeben):
// lib/rag-pipeline.ts
import { OpenAI } from 'openai'
import { vectorIndex } from './vector'
import { Redis } from '@upstash/redis'
const openai = new OpenAI()
const redis = Redis.fromEnv()
// Phase 1: Indexierung
async function indexDocument(
documentId: string,
text: string,
category: string
) {
// Text in Chunks aufteilen (einfache Version)
const chunks = splitIntoChunks(text, 512)
// Embeddings für alle Chunks generieren
const embeddingResponse = await openai.embeddings.create({
model: 'text-embedding-3-small',
input: chunks,
})
// Vektoren in Upstash einfügen
await vectorIndex.upsert(
chunks.map((chunk, i) => ({
id: `${documentId}-chunk-${i}`,
vector: embeddingResponse.data[i].embedding,
data: chunk,
metadata: {
documentId,
chunkIndex: i,
category,
source: 'uploaded',
createdAt: new Date().toISOString(),
},
}))
)
console.log(`Indexed ${chunks.length} chunks for document ${documentId}`)
}
// Phase 2: Retrieval + Generation
export async function ragQuery(
question: string,
category?: string
): Promise<string> {
// Cache-Check für häufige Fragen
const cacheKey = `rag:cache:${Buffer.from(question).toString('base64').slice(0, 32)}`
const cached = await redis.get<string>(cacheKey)
if (cached) return cached
// Query-Embedding erstellen
const queryEmbedding = await openai.embeddings.create({
model: 'text-embedding-3-small',
input: question,
})
// Relevante Chunks aus Upstash Vector suchen
const filter = category ? `category = '${category}'` : undefined
const results = await vectorIndex.query({
vector: queryEmbedding.data[0].embedding,
topK: 5,
includeData: true,
includeMetadata: true,
filter,
})
// Kontext aus relevanten Chunks zusammenbauen
const context = results
.filter(r => r.score > 0.7) // Nur Ergebnisse über Similarity-Threshold
.map(r => r.data)
.join('\n\n---\n\n')
if (!context) {
return 'Keine relevanten Informationen zu dieser Frage gefunden.'
}
// Claude / GPT-4 mit Kontext anfragen
const completion = await openai.chat.completions.create({
model: 'gpt-4o-mini',
messages: [
{
role: 'system',
content: 'Beantworte die Frage basierend auf dem gegebenen Kontext. Antworte auf Deutsch.',
},
{
role: 'user',
content: `Kontext:\n${context}\n\nFrage: ${question}`,
},
],
})
const answer = completion.choices[0].message.content ?? ''
// Antwort für 1 Stunde cachen
await redis.set(cacheKey, answer, { ex: 3600 })
return answer
}
// Hilfsfunktion: Text in Chunks aufteilen
function splitIntoChunks(text: string, maxLength: number): string[] {
const sentences = text.split(/(?<=[.!?])\s+/)
const chunks: string[] = []
let current = ''
for (const sentence of sentences) {
if (current.length + sentence.length > maxLength) {
if (current) chunks.push(current.trim())
current = sentence
} else {
current += ' ' + sentence
}
}
if (current) chunks.push(current.trim())
return chunks
}
Hybrid Search: Vektor + Keyword
// Hybrid Search: Upstash Vector + Redis Full-Text via SCAN
export async function hybridSearch(
query: string,
embedding: number[],
namespace: string = ''
) {
// Parallel: Semantische Suche + Metadaten-Filter
const [semanticResults, keywordMatches] = await Promise.all([
vectorIndex.query({
vector: embedding,
topK: 10,
namespace,
includeMetadata: true,
includeData: true,
}),
// Keyword-basierte Vorfilterung via Redis-Cache
redis.smembers(`search:keywords:${query.toLowerCase().replace(/\s+/g, ':')}`),
])
// Re-ranking: Semantischen Score + Keyword-Boost kombinieren
const keywordSet = new Set(keywordMatches)
const reranked = semanticResults
.map(result => ({
...result,
finalScore: result.score * (1 + (keywordSet.has(result.id) ? 0.2 : 0)),
}))
.sort((a, b) => b.finalScore - a.finalScore)
return reranked.slice(0, 5)
}
// Namespace-Strategie für Multi-Tenant
async function queryByTenant(
tenantId: string,
query: number[],
filter: string
) {
// Jeder Mandant hat seinen eigenen Namespace
return vectorIndex.query({
vector: query,
topK: 5,
namespace: `tenant-${tenantId}`,
filter, // z.B. "createdAt > '2026-01-01'"
includeData: true,
includeMetadata: true,
})
}
Upstash Vector Kostenmodell: Free Plan: 10.000 Queries/Monat, 200 MB Storage. Pay-as-you-go: $0,40 pro 100K Queries. Für eine typische RAG-Anwendung mit 5.000 täglichen Nutzern entstehen etwa $6-12/Monat — ein Bruchteil der Kosten einer verwalteten Vektordatenbank wie Pinecone.
Fazit: Upstash als Full-Stack Serverless Backend
Upstash hat sich von einem einfachen Redis-Service zu einem vollständigen serverlosen Backend entwickelt. Rate Limiting, Session Storage, Message Queuing, Analytics und Vektorsuche — alles aus einer konsistenten HTTP-basierten API, die in jeder Serverless-Umgebung funktioniert.
Claude Code beschleunigt die Arbeit mit Upstash erheblich: Es kennt die API-Patterns, generiert TypeScript-Typen korrekt und schlägt passende TTL-Werte, Retry-Strategien und Caching-Patterns für deinen konkreten Use Case vor. Was früher einen halben Tag Dokumentations-Lektüre gekostet hat, ist mit Claude Code in einer Stunde implementiert und getestet.
Die wichtigsten Erkenntnisse aus diesem Artikel:
- Redis.fromEnv() ist der empfohlene Einstiegspunkt — automatisch, sicher, überall lauffähig
- Rate Limiting mit Sliding Window ist für die meisten APIs die beste Wahl
- Session Storage mit JSON.stringify ist einfacher als HSET und ausreichend für 99% der Anwendungsfälle
- QStash löst das serverlose Cron/Queue-Problem elegant ohne eigene Infrastruktur
- Upstash Vector mit Namespace-Strategie ist die kostengünstige Alternative zu Pinecone für RAG-Anwendungen
Caching-Modul im Kurs
Im Claude Code Mastery Kurs: vollständiges Upstash-Modul mit Rate Limiting, QStash, Sorted Sets und Vector Store für serverlose Web-Anwendungen.
14 Tage kostenlos testen →