دمج واجهات LLM API: أفضل الممارسات ودليل المؤسسات
يُعد دمج واجهات LLM API في أنظمة المؤسسات أمرًا بالغ الأهمية لتطبيقات الذكاء الاصطناعي الموثوقة والقابلة للتوسع. في هذا الدليل، نستعرض استراتيجيات جاهزة للإنتاج.
مقارنة مزوّدي الـ API
OpenAI API
النماذج: GPT، GPT Turbo، GPT-3.5 Turbo
الميزات:
- Function calling
- وضع JSON
- Vision (تحليل الصور)
- DALL-E (توليد الصور)
المسار (Endpoint):
https://api.openai.com/v1/chat/completions
Anthropic API
النماذج: Claude 3 Opus، Sonnet، Haiku
الميزات:
- نافذة سياق 200K
- Constitutional AI
- دعم وسوم XML
المسار (Endpoint):
https://api.anthropic.com/v1/messages
Google AI (Gemini)
النماذج: Gemini Pro، Gemini Ultra
الميزات:
- متعدد الوسائط (نص، صورة، فيديو)
- Grounding
- تنفيذ الأكواد
الهيكل الأساسي للدمج
OpenAI Python SDK
1from openai import OpenAI 2 3client = OpenAI(api_key="sk-...") 4 5response = client.chat.completions.create( 6 model="gpt-4-turbo", 7 messages=[ 8 {"role": "system", "content": "You are a helpful assistant."}, 9 {"role": "user", "content": "Hello!"} 10 ], 11 temperature=0.7, 12 max_tokens=1000 13) 14 15print(response.choices[0].message.content)
Anthropic Python SDK
1from anthropic import Anthropic 2 3client = Anthropic(api_key="sk-ant-...") 4 5message = client.messages.create( 6 model="claude-3-opus-20240229", 7 max_tokens=1024, 8 messages=[ 9 {"role": "user", "content": "Hello!"} 10 ] 11) 12 13print(message.content[0].text)
استجابة البث (Streaming Response)
1# OpenAI Streaming 2stream = client.chat.completions.create( 3 model="gpt-4-turbo", 4 messages=[{"role": "user", "content": "Tell me a long story"}], 5 stream=True 6) 7 8for chunk in stream: 9 if chunk.choices[0].delta.content: 10 print(chunk.choices[0].delta.content, end="")
معالجة الأخطاء
أنواع الأخطاء
| Error Code | Description | Solution |
|---|---|---|
| 400 | طلب غير صالح | تحقق من تنسيق الطلب |
| 401 | مفتاح API غير صالح | تحقق من مفتاح API |
| 429 | Rate limit | أعد المحاولة مع backoff |
| 500 | خطأ في الخادم | أعد المحاولة مع exponential backoff |
| 503 | الخدمة غير متاحة | انتظر ثم أعد المحاولة |
معالجة قوية للأخطاء
1import time 2from openai import RateLimitError, APIError, APIConnectionError 3 4def call_llm_with_retry(messages, max_retries=3): 5 for attempt in range(max_retries): 6 try: 7 response = client.chat.completions.create( 8 model="gpt-4-turbo", 9 messages=messages 10 ) 11 return response.choices[0].message.content 12 13 except RateLimitError: 14 wait_time = 2 ** attempt # Exponential backoff 15 print(f"Rate limited. Waiting {wait_time}s...") 16 time.sleep(wait_time) 17 18 except APIConnectionError: 19 print("Connection error. Retrying...") 20 time.sleep(1) 21 22 except APIError as e: 23 print(f"API error: {e}") 24 if attempt == max_retries - 1: 25 raise 26 27 raise Exception("Max retries exceeded")
Exponential Backoff مع Jitter
1import random 2 3def exponential_backoff(attempt, base=1, max_wait=60): 4 wait = min(base * (2 ** attempt), max_wait) 5 jitter = random.uniform(0, wait * 0.1) 6 return wait + jitter 7## إدارة الحد من المعدل 8 9### أنواع حدود المعدل 10 11- **RPM (Requests Per Minute):** عدد الطلبات في الدقيقة 12- **TPM (Tokens Per Minute):** عدد التوكنز في الدقيقة 13- **RPD (Requests Per Day):** عدد الطلبات في اليوم 14 15### خوارزمية Token Bucket 16 17```python 18import time 19from threading import Lock 20 21class TokenBucket: 22 def __init__(self, tokens_per_second, max_tokens): 23 self.tokens_per_second = tokens_per_second 24 self.max_tokens = max_tokens 25 self.tokens = max_tokens 26 self.last_update = time.time() 27 self.lock = Lock() 28 29 def acquire(self, tokens=1): 30 with self.lock: 31 now = time.time() 32 elapsed = now - self.last_update 33 self.tokens = min( 34 self.max_tokens, 35 self.tokens + elapsed * self.tokens_per_second 36 ) 37 self.last_update = now 38 39 if self.tokens >= tokens: 40 self.tokens -= tokens 41 return True 42 return False 43 44 def wait_and_acquire(self, tokens=1): 45 while not self.acquire(tokens): 46 time.sleep(0.1) 47 48# Usage 49rate_limiter = TokenBucket(tokens_per_second=10, max_tokens=100) 50rate_limiter.wait_and_acquire() 51# API call...
استراتيجيات التخزين المؤقت
التخزين المؤقت للاستجابة
1import hashlib 2import json 3from functools import lru_cache 4import redis 5 6redis_client = redis.Redis(host='localhost', port=6379, db=0) 7 8def get_cache_key(messages, model, temperature): 9 content = json.dumps({ 10 "messages": messages, 11 "model": model, 12 "temperature": temperature 13 }, sort_keys=True) 14 return hashlib.md5(content.encode()).hexdigest() 15 16def cached_llm_call(messages, model="gpt-4", temperature=0.7, ttl=3600): 17 cache_key = get_cache_key(messages, model, temperature) 18 19 # Check cache 20 cached = redis_client.get(cache_key) 21 if cached: 22 return json.loads(cached) 23 24 # API call 25 response = client.chat.completions.create( 26 model=model, 27 messages=messages, 28 temperature=temperature 29 ) 30 result = response.choices[0].message.content 31 32 # Save to cache 33 redis_client.setex(cache_key, ttl, json.dumps(result)) 34 35 return result
التخزين المؤقت الدلالي
تقديم استعلامات مشابهة من التخزين المؤقت:
1def semantic_cache_lookup(query, threshold=0.95): 2 query_embedding = get_embedding(query) 3 4 # Search similar query in Vector DB 5 results = vector_db.search( 6 vector=query_embedding, 7 top_k=1, 8 filter={"type": "cache"} 9 ) 10 11 if results and results[0].score >= threshold: 12 return results[0].metadata["response"] 13 14 return None
إدارة التوكنز
عدّ التوكنز
1import tiktoken 2 3def count_tokens(text, model="gpt-4"): 4 encoding = tiktoken.encoding_for_model(model) 5 return len(encoding.encode(text)) 6 7def count_message_tokens(messages, model="gpt-4"): 8 encoding = tiktoken.encoding_for_model(model) 9 tokens = 0 10 11 for message in messages: 12 tokens += 4 # message overhead 13 for key, value in message.items(): 14 tokens += len(encoding.encode(value)) 15 16 tokens += 2 # reply overhead 17 return tokens
إدارة نافذة السياق
1def truncate_messages(messages, max_tokens=4000, model="gpt-4"): 2 total_tokens = count_message_tokens(messages, model) 3 4 while total_tokens > max_tokens and len(messages) > 2: 5 # Preserve System message, delete oldest user/assistant 6 messages.pop(1) 7 total_tokens = count_message_tokens(messages, model) 8 9 return messages 10## العمليات غير المتزامنة 11 12### العميل غير المتزامن 13 14```python 15import asyncio 16from openai import AsyncOpenAI 17 18async_client = AsyncOpenAI(api_key="sk-...") 19 20async def async_llm_call(prompt): 21 response = await async_client.chat.completions.create( 22 model="gpt-4-turbo", 23 messages=[{"role": "user", "content": prompt}] 24 ) 25 return response.choices[0].message.content 26 27async def batch_process(prompts): 28 tasks = [async_llm_call(p) for p in prompts] 29 results = await asyncio.gather(*tasks) 30 return results 31 32# Usage 33prompts = ["Question 1", "Question 2", "Question 3"] 34results = asyncio.run(batch_process(prompts))
الطلبات المتزامنة مع حدود المعدل
1import asyncio 2from asyncio import Semaphore 3 4async def rate_limited_call(semaphore, prompt): 5 async with semaphore: 6 response = await async_client.chat.completions.create( 7 model="gpt-4-turbo", 8 messages=[{"role": "user", "content": prompt}] 9 ) 10 return response.choices[0].message.content 11 12async def batch_with_rate_limit(prompts, max_concurrent=5): 13 semaphore = Semaphore(max_concurrent) 14 tasks = [rate_limited_call(semaphore, p) for p in prompts] 15 return await asyncio.gather(*tasks)
المراقبة والتسجيل
تسجيل الطلبات
1import logging 2import time 3from functools import wraps 4 5logging.basicConfig(level=logging.INFO) 6logger = logging.getLogger(__name__) 7 8def log_llm_call(func): 9 @wraps(func) 10 def wrapper(*args, **kwargs): 11 start_time = time.time() 12 13 try: 14 result = func(*args, **kwargs) 15 duration = time.time() - start_time 16 17 logger.info(f"LLM Call Success", extra={ 18 "duration": duration, 19 "model": kwargs.get("model"), 20 "tokens_used": result.usage.total_tokens 21 }) 22 23 return result 24 25 except Exception as e: 26 duration = time.time() - start_time 27 logger.error(f"LLM Call Failed", extra={ 28 "duration": duration, 29 "error": str(e) 30 }) 31 raise 32 33 return wrapper
جمع المقاييس
1from prometheus_client import Counter, Histogram 2 3llm_requests_total = Counter( 4 'llm_requests_total', 5 'Total LLM API requests', 6 ['model', 'status'] 7) 8 9llm_latency = Histogram( 10 'llm_request_latency_seconds', 11 'LLM request latency', 12 ['model'] 13) 14 15llm_tokens = Counter( 16 'llm_tokens_total', 17 'Total tokens used', 18 ['model', 'type'] # input, output 19)
أفضل ممارسات الأمان
إدارة مفتاح API
1import os 2from dotenv import load_dotenv 3 4load_dotenv() 5 6# Get from Environment variable 7api_key = os.getenv("OPENAI_API_KEY") 8 9# Never hardcode! 10# ❌ api_key = "sk-..."
التحقق من صحة الإدخال
1def validate_input(text, max_length=10000): 2 if not text or not isinstance(text, str): 3 raise ValueError("Invalid input") 4 5 if len(text) > max_length: 6 raise ValueError(f"Input too long: {len(text)} > {max_length}") 7 8 # Injection check 9 dangerous_patterns = ["<script>", "{{", "{%"] 10 for pattern in dangerous_patterns: 11 if pattern in text.lower(): 12 raise ValueError("Potentially dangerous input") 13 14 return text.strip()
تنقية المخرجات
1import html 2 3def sanitize_output(text): 4 # HTML escape 5 text = html.escape(text) 6 7 # PII masking 8 text = mask_pii(text) 9 10 return text 11## البنية الإنتاجية 12
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Client │────▶│ API GW │────▶│ LLM Service │ └──────────────┘ │ (Rate Limit)│ └──────┬───────┘ └──────────────┘ │ ┌─────▼─────┐ ┌──────────────┐ │ Router │ │ Cache │◀───▶│ │ │ (Redis) │ └─────┬─────┘ └──────────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ┌──────▼──────┐ ┌─────▼─────┐ ┌──────▼──────┐ │ OpenAI │ │ Anthropic │ │ Google │ └─────────────┘ └───────────┘ └─────────────┘
1 2## الخلاصة 3 4يُعد دمج واجهات LLM API مكونًا بالغ الأهمية يتطلب تخطيطًا دقيقًا وتنفيذًا قويًا. تُعد قضايا مثل الحد من المعدل، والتخزين المؤقت، والتعامل مع الأخطاء، والمراقبة عناصر أساسية في بيئة الإنتاج. 5 6في Veni AI، نقدم دعمًا خبيرًا لعمليات دمج الذكاء الاصطناعي على مستوى المؤسسات. تواصل معنا لمشاريعك.
