LLM API 統合: ベストプラクティスとエンタープライズ向けガイド
エンタープライズシステムへの LLM API の統合は、信頼性が高くスケーラブルな AI アプリケーションを実現するうえで重要です。本ガイドでは、プロダクション対応の統合戦略を解説します。
API プロバイダ比較
OpenAI API
モデル: GPT, GPT Turbo, GPT-3.5 Turbo
機能:
- Function calling
- JSON mode
- Vision(画像解析)
- DALL-E(画像生成)
エンドポイント:
https://api.openai.com/v1/chat/completions
Anthropic API
モデル: Claude 3 Opus, Sonnet, Haiku
機能:
- 200K コンテキストウィンドウ
- Constitutional AI
- XML タグ対応
エンドポイント:
https://api.anthropic.com/v1/messages
Google AI (Gemini)
モデル: Gemini Pro, Gemini Ultra
機能:
- マルチモーダル(テキスト、画像、動画)
- Grounding
- コード実行
基本的な統合構造
OpenAI Python SDK
1from openai import OpenAI 2 3client = OpenAI(api_key="sk-...") 4 5response = client.chat.completions.create( 6 model="gpt-4-turbo", 7 messages=[ 8 {"role": "system", "content": "You are a helpful assistant."}, 9 {"role": "user", "content": "Hello!"} 10 ], 11 temperature=0.7, 12 max_tokens=1000 13) 14 15print(response.choices[0].message.content)
Anthropic Python SDK
1from anthropic import Anthropic 2 3client = Anthropic(api_key="sk-ant-...") 4 5message = client.messages.create( 6 model="claude-3-opus-20240229", 7 max_tokens=1024, 8 messages=[ 9 {"role": "user", "content": "Hello!"} 10 ] 11) 12 13print(message.content[0].text)
ストリーミングレスポンス
1# OpenAI Streaming 2stream = client.chat.completions.create( 3 model="gpt-4-turbo", 4 messages=[{"role": "user", "content": "Tell me a long story"}], 5 stream=True 6) 7 8for chunk in stream: 9 if chunk.choices[0].delta.content: 10 print(chunk.choices[0].delta.content, end="")
エラーハンドリング
エラー種類
| Error Code | 説明 | 解決策 |
|---|---|---|
| 400 | 無効なリクエスト | リクエスト形式を確認 |
| 401 | 無効な API キー | API キーを確認 |
| 429 | レート制限 | バックオフして再試行 |
| 500 | サーバーエラー | Exponential backoff で再試行 |
| 503 | サービス利用不可 | 待機して再試行 |
ロバストなエラーハンドリング
1import time 2from openai import RateLimitError, APIError, APIConnectionError 3 4def call_llm_with_retry(messages, max_retries=3): 5 for attempt in range(max_retries): 6 try: 7 response = client.chat.completions.create( 8 model="gpt-4-turbo", 9 messages=messages 10 ) 11 return response.choices[0].message.content 12 13 except RateLimitError: 14 wait_time = 2 ** attempt # Exponential backoff 15 print(f"Rate limited. Waiting {wait_time}s...") 16 time.sleep(wait_time) 17 18 except APIConnectionError: 19 print("Connection error. Retrying...") 20 time.sleep(1) 21 22 except APIError as e: 23 print(f"API error: {e}") 24 if attempt == max_retries - 1: 25 raise 26 27 raise Exception("Max retries exceeded")
ジッター付き Exponential Backoff
1import random 2 3def exponential_backoff(attempt, base=1, max_wait=60): 4 wait = min(base * (2 ** attempt), max_wait) 5 jitter = random.uniform(0, wait * 0.1) 6 return wait + jitter 7## レート制限管理 8 9### レート制限の種類 10 11- **RPM (Requests Per Minute):** 1 分あたりのリクエスト数 12- **TPM (Tokens Per Minute):** 1 分あたりのトークン数 13- **RPD (Requests Per Day):** 1 日あたりのリクエスト数 14 15### トークンバケットアルゴリズム 16 17```python 18import time 19from threading import Lock 20 21class TokenBucket: 22 def __init__(self, tokens_per_second, max_tokens): 23 self.tokens_per_second = tokens_per_second 24 self.max_tokens = max_tokens 25 self.tokens = max_tokens 26 self.last_update = time.time() 27 self.lock = Lock() 28 29 def acquire(self, tokens=1): 30 with self.lock: 31 now = time.time() 32 elapsed = now - self.last_update 33 self.tokens = min( 34 self.max_tokens, 35 self.tokens + elapsed * self.tokens_per_second 36 ) 37 self.last_update = now 38 39 if self.tokens >= tokens: 40 self.tokens -= tokens 41 return True 42 return False 43 44 def wait_and_acquire(self, tokens=1): 45 while not self.acquire(tokens): 46 time.sleep(0.1) 47 48# Usage 49rate_limiter = TokenBucket(tokens_per_second=10, max_tokens=100) 50rate_limiter.wait_and_acquire() 51# API call...
キャッシング戦略
レスポンスキャッシング
1import hashlib 2import json 3from functools import lru_cache 4import redis 5 6redis_client = redis.Redis(host='localhost', port=6379, db=0) 7 8def get_cache_key(messages, model, temperature): 9 content = json.dumps({ 10 "messages": messages, 11 "model": model, 12 "temperature": temperature 13 }, sort_keys=True) 14 return hashlib.md5(content.encode()).hexdigest() 15 16def cached_llm_call(messages, model="gpt-4", temperature=0.7, ttl=3600): 17 cache_key = get_cache_key(messages, model, temperature) 18 19 # Check cache 20 cached = redis_client.get(cache_key) 21 if cached: 22 return json.loads(cached) 23 24 # API call 25 response = client.chat.completions.create( 26 model=model, 27 messages=messages, 28 temperature=temperature 29 ) 30 result = response.choices[0].message.content 31 32 # Save to cache 33 redis_client.setex(cache_key, ttl, json.dumps(result)) 34 35 return result
セマンティックキャッシング
類似クエリをキャッシュから提供:
1def semantic_cache_lookup(query, threshold=0.95): 2 query_embedding = get_embedding(query) 3 4 # Search similar query in Vector DB 5 results = vector_db.search( 6 vector=query_embedding, 7 top_k=1, 8 filter={"type": "cache"} 9 ) 10 11 if results and results[0].score >= threshold: 12 return results[0].metadata["response"] 13 14 return None
トークン管理
トークンカウント
1import tiktoken 2 3def count_tokens(text, model="gpt-4"): 4 encoding = tiktoken.encoding_for_model(model) 5 return len(encoding.encode(text)) 6 7def count_message_tokens(messages, model="gpt-4"): 8 encoding = tiktoken.encoding_for_model(model) 9 tokens = 0 10 11 for message in messages: 12 tokens += 4 # message overhead 13 for key, value in message.items(): 14 tokens += len(encoding.encode(value)) 15 16 tokens += 2 # reply overhead 17 return tokens
コンテキストウィンドウ管理
1def truncate_messages(messages, max_tokens=4000, model="gpt-4"): 2 total_tokens = count_message_tokens(messages, model) 3 4 while total_tokens > max_tokens and len(messages) > 2: 5 # Preserve System message, delete oldest user/assistant 6 messages.pop(1) 7 total_tokens = count_message_tokens(messages, model) 8 9 return messages 10## 非同期操作 11 12### Async クライアント 13 14```python 15import asyncio 16from openai import AsyncOpenAI 17 18async_client = AsyncOpenAI(api_key="sk-...") 19 20async def async_llm_call(prompt): 21 response = await async_client.chat.completions.create( 22 model="gpt-4-turbo", 23 messages=[{"role": "user", "content": prompt}] 24 ) 25 return response.choices[0].message.content 26 27async def batch_process(prompts): 28 tasks = [async_llm_call(p) for p in prompts] 29 results = await asyncio.gather(*tasks) 30 return results 31 32# Usage 33prompts = ["Question 1", "Question 2", "Question 3"] 34results = asyncio.run(batch_process(prompts))
同時実行とレート制限付きリクエスト
1import asyncio 2from asyncio import Semaphore 3 4async def rate_limited_call(semaphore, prompt): 5 async with semaphore: 6 response = await async_client.chat.completions.create( 7 model="gpt-4-turbo", 8 messages=[{"role": "user", "content": prompt}] 9 ) 10 return response.choices[0].message.content 11 12async def batch_with_rate_limit(prompts, max_concurrent=5): 13 semaphore = Semaphore(max_concurrent) 14 tasks = [rate_limited_call(semaphore, p) for p in prompts] 15 return await asyncio.gather(*tasks)
モニタリングとログ
リクエストログ
1import logging 2import time 3from functools import wraps 4 5logging.basicConfig(level=logging.INFO) 6logger = logging.getLogger(__name__) 7 8def log_llm_call(func): 9 @wraps(func) 10 def wrapper(*args, **kwargs): 11 start_time = time.time() 12 13 try: 14 result = func(*args, **kwargs) 15 duration = time.time() - start_time 16 17 logger.info(f"LLM Call Success", extra={ 18 "duration": duration, 19 "model": kwargs.get("model"), 20 "tokens_used": result.usage.total_tokens 21 }) 22 23 return result 24 25 except Exception as e: 26 duration = time.time() - start_time 27 logger.error(f"LLM Call Failed", extra={ 28 "duration": duration, 29 "error": str(e) 30 }) 31 raise 32 33 return wrapper
メトリクス収集
1from prometheus_client import Counter, Histogram 2 3llm_requests_total = Counter( 4 'llm_requests_total', 5 'Total LLM API requests', 6 ['model', 'status'] 7) 8 9llm_latency = Histogram( 10 'llm_request_latency_seconds', 11 'LLM request latency', 12 ['model'] 13) 14 15llm_tokens = Counter( 16 'llm_tokens_total', 17 'Total tokens used', 18 ['model', 'type'] # input, output 19)
セキュリティのベストプラクティス
API キー管理
1import os 2from dotenv import load_dotenv 3 4load_dotenv() 5 6# Get from Environment variable 7api_key = os.getenv("OPENAI_API_KEY") 8 9# Never hardcode! 10# ❌ api_key = "sk-..."
入力バリデーション
1def validate_input(text, max_length=10000): 2 if not text or not isinstance(text, str): 3 raise ValueError("Invalid input") 4 5 if len(text) > max_length: 6 raise ValueError(f"Input too long: {len(text)} > {max_length}") 7 8 # Injection check 9 dangerous_patterns = ["<script>", "{{", "{%"] 10 for pattern in dangerous_patterns: 11 if pattern in text.lower(): 12 raise ValueError("Potentially dangerous input") 13 14 return text.strip()
出力サニタイズ
1import html 2 3def sanitize_output(text): 4 # HTML escape 5 text = html.escape(text) 6 7 # PII masking 8 text = mask_pii(text) 9 10 return text 11## 本番環境アーキテクチャ 12
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Client │────▶│ API GW │────▶│ LLM Service │ └──────────────┘ │ (Rate Limit)│ └──────┬───────┘ └──────────────┘ │ ┌─────▼─────┐ ┌──────────────┐ │ Router │ │ Cache │◀───▶│ │ │ (Redis) │ └─────┬─────┘ └──────────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ┌──────▼──────┐ ┌─────▼─────┐ ┌──────▼──────┐ │ OpenAI │ │ Anthropic │ │ Google │ └─────────────┘ └───────────┘ └─────────────┘
1 2## まとめ 3 4LLM API の統合は、慎重な計画と堅牢な実装が求められる重要なコンポーネントです。Rate limiting、キャッシュ、エラー処理、モニタリングなどの要素は、本番環境において極めて重要です。 5 6Veni AI では、エンタープライズ向け AI 統合の専門的なサポートを提供しています。プロジェクトに関するお問い合わせをお待ちしています。
