LLM API Integration: Best Practices and an Enterprise Guide
Integrating LLM APIs into enterprise systems is critical to building reliable and scalable AI applications. This guide covers production-ready integration strategies.
API Provider Comparison
OpenAI API
Models: GPT-4, GPT-4 Turbo, GPT-3.5 Turbo

Features:
- Function calling
- JSON mode
- Vision (image analysis)
- DALL-E (image generation)
Endpoint:
https://api.openai.com/v1/chat/completions
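The SDK examples later in this guide show a plain chat call; as a minimal sketch of the JSON mode feature listed above (assuming the `client` object created in the Basic Integration Structure section), structured output can be requested via `response_format`:
```python
# Minimal JSON-mode sketch; `client` is the OpenAI client created later in this guide.
# Note: the prompt itself must mention JSON when response_format is json_object.
import json

response = client.chat.completions.create(
    model="gpt-4-turbo",
    response_format={"type": "json_object"},
    messages=[
        {"role": "system", "content": "Reply with a JSON object."},
        {"role": "user", "content": "List three European capitals as {\"capitals\": [...]}"}
    ]
)

data = json.loads(response.choices[0].message.content)
```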
Anthropic API
Models: Claude 3 Opus, Sonnet, Haiku

Features:
- 200K context window
- Constitutional AI
- XML tag support
Endpoint:
https://api.anthropic.com/v1/messages
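Claude tends to follow prompts more reliably when inputs are delimited with XML-style tags; a minimal sketch (assuming the Anthropic `client` shown in the SDK example below, with placeholder document text):
```python
# Hedged sketch: XML tags used to separate source material from instructions.
message = client.messages.create(
    model="claude-3-opus-20240229",
    max_tokens=512,
    messages=[{
        "role": "user",
        "content": (
            "<document>\n...source text to summarize...\n</document>\n"
            "<instructions>Summarize the document in two sentences.</instructions>"
        )
    }]
)
print(message.content[0].text)
```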
Google AI (Gemini)
Models: Gemini Pro, Gemini Ultra

Features:
- Multimodal (text, image, video)
- Grounding
- Code execution
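A minimal text-only sketch using the `google-generativeai` Python package (package and model names may change; check Google's current docs):
```python
# Hedged sketch: basic Gemini call via the google-generativeai package.
import google.generativeai as genai

genai.configure(api_key="...")  # read the key from an environment variable in real code
model = genai.GenerativeModel("gemini-pro")

response = model.generate_content("Summarize what an API gateway does.")
print(response.text)
```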
Basic Integration Structure
OpenAI Python SDK
```python
from openai import OpenAI

client = OpenAI(api_key="sk-...")

response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello!"}
    ],
    temperature=0.7,
    max_tokens=1000
)

print(response.choices[0].message.content)
```
Anthropic Python SDK
```python
from anthropic import Anthropic

client = Anthropic(api_key="sk-ant-...")

message = client.messages.create(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Hello!"}
    ]
)

print(message.content[0].text)
```
Streaming Response
```python
# OpenAI streaming
stream = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": "Tell a long story"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```
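The Anthropic SDK exposes streaming through a context manager; a minimal sketch, assuming the Anthropic `client` from the earlier example:
```python
# Anthropic streaming sketch: text_stream yields text deltas as they arrive.
with client.messages.stream(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell a long story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```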
Error Handling
Error Types
| Error Code | Description | Solution |
|---|---|---|
| 400 | Invalid request | Check the request format |
| 401 | Invalid API key | Verify the API key |
| 429 | Rate limit | Retry with backoff |
| 500 | Server error | Retry with exponential backoff |
| 503 | Service unavailable | Wait and retry |
Robust Error Handling
```python
import time
from openai import RateLimitError, APIError, APIConnectionError

def call_llm_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4-turbo",
                messages=messages
            )
            return response.choices[0].message.content

        except RateLimitError:
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)

        except APIConnectionError:
            print("Connection error. Retrying...")
            time.sleep(1)

        except APIError as e:
            print(f"API error: {e}")
            if attempt == max_retries - 1:
                raise

    raise Exception("Max retries exceeded")
```
Exponential Backoff with Jitter
```python
import random

def exponential_backoff(attempt, base=1, max_wait=60):
    wait = min(base * (2 ** attempt), max_wait)
    jitter = random.uniform(0, wait * 0.1)
    return wait + jitter
```
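To put the jitter to use, the rate-limit branch of the retry helper above can call `exponential_backoff` instead of the fixed `2 ** attempt` wait — a sketch:
```python
# Sketch: the retry helper's rate-limit branch, now with jittered backoff.
def call_llm_with_jittered_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4-turbo",
                messages=messages
            )
            return response.choices[0].message.content
        except RateLimitError:
            wait_time = exponential_backoff(attempt)
            print(f"Rate limited. Waiting {wait_time:.1f}s...")
            time.sleep(wait_time)

    raise Exception("Max retries exceeded")
```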
Rate Limit Management
Rate Limit Types
- RPM (Requests Per Minute): number of requests per minute
- TPM (Tokens Per Minute): number of tokens per minute
- RPD (Requests Per Day): number of requests per day
Token Bucket Algorithm
```python
import time
from threading import Lock

class TokenBucket:
    def __init__(self, tokens_per_second, max_tokens):
        self.tokens_per_second = tokens_per_second
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.last_update = time.time()
        self.lock = Lock()

    def acquire(self, tokens=1):
        with self.lock:
            now = time.time()
            elapsed = now - self.last_update
            self.tokens = min(
                self.max_tokens,
                self.tokens + elapsed * self.tokens_per_second
            )
            self.last_update = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_and_acquire(self, tokens=1):
        while not self.acquire(tokens):
            time.sleep(0.1)

# Usage
rate_limiter = TokenBucket(tokens_per_second=10, max_tokens=100)
rate_limiter.wait_and_acquire()
# API call...
```
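RPM and TPM limits can be enforced together by keeping two buckets and charging the token bucket with an estimated token count before each call — a sketch using the `TokenBucket` above, with placeholder quota values:
```python
# Sketch: separate buckets for requests-per-minute and tokens-per-minute.
# The limits below are placeholders; substitute your account's actual quotas.
rpm_bucket = TokenBucket(tokens_per_second=500 / 60, max_tokens=500)        # ~500 RPM
tpm_bucket = TokenBucket(tokens_per_second=80_000 / 60, max_tokens=80_000)  # ~80K TPM

def rate_limited_request(messages, estimated_tokens):
    rpm_bucket.wait_and_acquire(1)                 # one request
    tpm_bucket.wait_and_acquire(estimated_tokens)  # estimated prompt + completion tokens
    return client.chat.completions.create(model="gpt-4-turbo", messages=messages)
```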
Caching Strategies
Response Caching
```python
import hashlib
import json
import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_cache_key(messages, model, temperature):
    content = json.dumps({
        "messages": messages,
        "model": model,
        "temperature": temperature
    }, sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()

def cached_llm_call(messages, model="gpt-4", temperature=0.7, ttl=3600):
    cache_key = get_cache_key(messages, model, temperature)

    # Check the cache first
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # API call
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature
    )
    result = response.choices[0].message.content

    # Store in the cache
    redis_client.setex(cache_key, ttl, json.dumps(result))

    return result
```
Semantic Caching
Serving similar queries from the cache:
```python
def semantic_cache_lookup(query, threshold=0.95):
    query_embedding = get_embedding(query)

    # Search the vector DB for a similar cached query
    results = vector_db.search(
        vector=query_embedding,
        top_k=1,
        filter={"type": "cache"}
    )

    if results and results[0].score >= threshold:
        return results[0].metadata["response"]

    return None
```
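The lookup above only works if responses were written to the cache at call time; a matching write-path sketch, using the same hypothetical `get_embedding` and `vector_db` helpers:
```python
# Sketch: store a fresh response so later, semantically similar queries can hit the cache.
import uuid

def semantic_cache_store(query, response):
    vector_db.upsert(
        id=str(uuid.uuid4()),
        vector=get_embedding(query),
        metadata={"type": "cache", "query": query, "response": response}
    )
```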
Token Management
Token Counting
```python
import tiktoken

def count_tokens(text, model="gpt-4"):
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def count_message_tokens(messages, model="gpt-4"):
    encoding = tiktoken.encoding_for_model(model)
    tokens = 0

    for message in messages:
        tokens += 4  # message overhead
        for key, value in message.items():
            tokens += len(encoding.encode(value))

    tokens += 2  # reply overhead
    return tokens
```
Context Window Management
```python
def truncate_messages(messages, max_tokens=4000, model="gpt-4"):
    total_tokens = count_message_tokens(messages, model)

    while total_tokens > max_tokens and len(messages) > 2:
        # Keep the system message, drop the oldest user/assistant message
        messages.pop(1)
        total_tokens = count_message_tokens(messages, model)

    return messages
```
Async Operations
Async Client
```python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(api_key="sk-...")

async def async_llm_call(prompt):
    response = await async_client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def batch_process(prompts):
    tasks = [async_llm_call(p) for p in prompts]
    results = await asyncio.gather(*tasks)
    return results

# Usage
prompts = ["Question 1", "Question 2", "Question 3"]
results = asyncio.run(batch_process(prompts))
```
Concurrent Rate-Limited Requests
```python
import asyncio
from asyncio import Semaphore

async def rate_limited_call(semaphore, prompt):
    async with semaphore:
        response = await async_client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

async def batch_with_rate_limit(prompts, max_concurrent=5):
    semaphore = Semaphore(max_concurrent)
    tasks = [rate_limited_call(semaphore, p) for p in prompts]
    return await asyncio.gather(*tasks)
```
Monitoring and Logging
Request Logging
```python
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_llm_call(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time

            logger.info("LLM call succeeded", extra={
                "duration": duration,
                "model": kwargs.get("model"),
                "tokens_used": result.usage.total_tokens
            })

            return result

        except Exception as e:
            duration = time.time() - start_time
            logger.error("LLM call failed", extra={
                "duration": duration,
                "error": str(e)
            })
            raise

    return wrapper
```
Metrics Collection
```python
from prometheus_client import Counter, Histogram

llm_requests_total = Counter(
    'llm_requests_total',
    'Total LLM API requests',
    ['model', 'status']
)

llm_latency = Histogram(
    'llm_request_latency_seconds',
    'LLM request latency',
    ['model']
)

llm_tokens = Counter(
    'llm_tokens_total',
    'Total tokens used',
    ['model', 'type']  # input, output
)
```
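A sketch of how these metrics might be recorded around a call (label values are illustrative):
```python
# Sketch: wrap a call so the Prometheus metrics above get populated.
def instrumented_call(messages, model="gpt-4-turbo"):
    with llm_latency.labels(model=model).time():  # records request duration
        try:
            response = client.chat.completions.create(model=model, messages=messages)
        except Exception:
            llm_requests_total.labels(model=model, status="error").inc()
            raise

    llm_requests_total.labels(model=model, status="success").inc()
    llm_tokens.labels(model=model, type="input").inc(response.usage.prompt_tokens)
    llm_tokens.labels(model=model, type="output").inc(response.usage.completion_tokens)
    return response.choices[0].message.content
```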
Security Best Practices
API Key Management
```python
import os
from dotenv import load_dotenv

load_dotenv()

# Read the key from an environment variable
api_key = os.getenv("OPENAI_API_KEY")

# Never hardcode keys!
# ❌ api_key = "sk-..."
```
Input Validation
```python
def validate_input(text, max_length=10000):
    if not text or not isinstance(text, str):
        raise ValueError("Invalid input")

    if len(text) > max_length:
        raise ValueError(f"Input too long: {len(text)} > {max_length}")

    # Injection check
    dangerous_patterns = ["<script>", "{{", "{%"]
    for pattern in dangerous_patterns:
        if pattern in text.lower():
            raise ValueError("Potentially dangerous input")

    return text.strip()
```
Output Sanitization
```python
import html

def sanitize_output(text):
    # HTML escape
    text = html.escape(text)

    # PII masking
    text = mask_pii(text)

    return text
```
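`mask_pii` is left undefined above; a minimal regex-based sketch covering only e-mail addresses and simple phone-number patterns (real PII detection usually needs a dedicated library or service):
```python
# Minimal mask_pii sketch: regex masking of e-mails and simple phone numbers only.
import re

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
PHONE_RE = re.compile(r"\+?\d[\d\s().-]{7,}\d")

def mask_pii(text):
    text = EMAIL_RE.sub("[EMAIL]", text)
    text = PHONE_RE.sub("[PHONE]", text)
    return text
```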
Production Architecture
```
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Client    │────▶│    API GW    │────▶│ LLM Service  │
└──────────────┘     │ (Rate Limit) │     └──────┬───────┘
                     └──────────────┘            │
                                           ┌─────▼─────┐
                     ┌──────────────┐      │  Router   │
                     │    Cache     │◀────▶│           │
                     │   (Redis)    │      └─────┬─────┘
                     └──────────────┘            │
                        ┌────────────────────────┼─────────────────────┐
                        │                        │                     │
                 ┌──────▼──────┐           ┌─────▼─────┐        ┌──────▼──────┐
                 │   OpenAI    │           │ Anthropic │        │   Google    │
                 └─────────────┘           └───────────┘        └─────────────┘
```
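A hedged sketch of the Router component: try providers in order and fail over on error. The `call_openai`, `call_anthropic`, and `call_google` names are placeholders for provider-specific wrapper functions:
```python
# Sketch: simple failover routing across providers, as in the diagram above.
def route_request(prompt, providers=None):
    providers = providers or [call_openai, call_anthropic, call_google]
    last_error = None
    for provider in providers:
        try:
            return provider(prompt)
        except Exception as e:  # in practice, catch provider-specific error types
            last_error = e
    raise RuntimeError("All providers failed") from last_error
```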
Conclusion
LLM API integration is a critical component that demands careful planning and a robust implementation. Rate limiting, caching, error handling, and monitoring are vital concerns in production environments.
At Veni AI, we provide expert support for enterprise AI integrations. Get in touch to discuss your projects.
