Veni AI
Développement d'API

Intégration d’API LLM : Bonnes pratiques et guide pour les entreprises

Guide complet pour l’intégration des API OpenAI, Anthropic et Google AI, la limitation de débit, la gestion des erreurs, la mise en cache et les implémentations prêtes pour la production dans les systèmes d’IA d’entreprise.

Veni AI Technical Team10 Ocak 20257 dk okuma
Intégration d’API LLM : Bonnes pratiques et guide pour les entreprises

Intégration d’API LLM : Bonnes pratiques et guide pour l’entreprise

L’intégration des API LLM dans les systèmes d’entreprise est essentielle pour des applications d’IA fiables et évolutives. Dans ce guide, nous examinons des stratégies d’intégration prêtes pour la production.

Comparaison des fournisseurs d’API

OpenAI API

Modèles : GPT, GPT Turbo, GPT-3.5 Turbo
Fonctionnalités :

  • Function calling
  • Mode JSON
  • Vision (analyse d’images)
  • DALL-E (génération d’images)

Endpoint :

https://api.openai.com/v1/chat/completions

Anthropic API

Modèles : Claude 3 Opus, Sonnet, Haiku
Fonctionnalités :

  • Fenêtre de contexte de 200K
  • Constitutional AI
  • Support des balises XML

Endpoint :

https://api.anthropic.com/v1/messages

Google AI (Gemini)

Modèles : Gemini Pro, Gemini Ultra
Fonctionnalités :

  • Multimodal (texte, image, vidéo)
  • Grounding
  • Exécution de code

Structure d’intégration de base

OpenAI Python SDK

1from openai import OpenAI 2 3client = OpenAI(api_key="sk-...") 4 5response = client.chat.completions.create( 6 model="gpt-4-turbo", 7 messages=[ 8 {"role": "system", "content": "You are a helpful assistant."}, 9 {"role": "user", "content": "Hello!"} 10 ], 11 temperature=0.7, 12 max_tokens=1000 13) 14 15print(response.choices[0].message.content)

Anthropic Python SDK

1from anthropic import Anthropic 2 3client = Anthropic(api_key="sk-ant-...") 4 5message = client.messages.create( 6 model="claude-3-opus-20240229", 7 max_tokens=1024, 8 messages=[ 9 {"role": "user", "content": "Hello!"} 10 ] 11) 12 13print(message.content[0].text)

Réponse en streaming

1# OpenAI Streaming 2stream = client.chat.completions.create( 3 model="gpt-4-turbo", 4 messages=[{"role": "user", "content": "Tell me a long story"}], 5 stream=True 6) 7 8for chunk in stream: 9 if chunk.choices[0].delta.content: 10 print(chunk.choices[0].delta.content, end="")

Gestion des erreurs

Types d’erreurs

Code erreurDescriptionSolution
400Requête invalideVérifier le format de la requête
401Clé API invalideVérifier la clé API
429Limite de débitRéessayer avec backoff
500Erreur serveurRéessayer avec backoff exponentiel
503Service indisponibleAttendre et réessayer

Gestion robuste des erreurs

1import time 2from openai import RateLimitError, APIError, APIConnectionError 3 4def call_llm_with_retry(messages, max_retries=3): 5 for attempt in range(max_retries): 6 try: 7 response = client.chat.completions.create( 8 model="gpt-4-turbo", 9 messages=messages 10 ) 11 return response.choices[0].message.content 12 13 except RateLimitError: 14 wait_time = 2 ** attempt # Exponential backoff 15 print(f"Rate limited. Waiting {wait_time}s...") 16 time.sleep(wait_time) 17 18 except APIConnectionError: 19 print("Connection error. Retrying...") 20 time.sleep(1) 21 22 except APIError as e: 23 print(f"API error: {e}") 24 if attempt == max_retries - 1: 25 raise 26 27 raise Exception("Max retries exceeded")

Backoff exponentiel avec jitter

1import random 2 3def exponential_backoff(attempt, base=1, max_wait=60): 4 wait = min(base * (2 ** attempt), max_wait) 5 jitter = random.uniform(0, wait * 0.1) 6 return wait + jitter 7## Gestion de la Limitation de Débit 8 9### Types de Limitation de Débit 10 11- **RPM (Requests Per Minute) :** Nombre de requêtes par minute 12- **TPM (Tokens Per Minute) :** Nombre de tokens par minute 13- **RPD (Requests Per Day) :** Nombre de requêtes par jour 14 15### Algorithme du Seau à Jetons (Token Bucket) 16 17```python 18import time 19from threading import Lock 20 21class TokenBucket: 22 def __init__(self, tokens_per_second, max_tokens): 23 self.tokens_per_second = tokens_per_second 24 self.max_tokens = max_tokens 25 self.tokens = max_tokens 26 self.last_update = time.time() 27 self.lock = Lock() 28 29 def acquire(self, tokens=1): 30 with self.lock: 31 now = time.time() 32 elapsed = now - self.last_update 33 self.tokens = min( 34 self.max_tokens, 35 self.tokens + elapsed * self.tokens_per_second 36 ) 37 self.last_update = now 38 39 if self.tokens >= tokens: 40 self.tokens -= tokens 41 return True 42 return False 43 44 def wait_and_acquire(self, tokens=1): 45 while not self.acquire(tokens): 46 time.sleep(0.1) 47 48# Usage 49rate_limiter = TokenBucket(tokens_per_second=10, max_tokens=100) 50rate_limiter.wait_and_acquire() 51# API call...

Stratégies de Mise en Cache

Mise en Cache des Réponses

1import hashlib 2import json 3from functools import lru_cache 4import redis 5 6redis_client = redis.Redis(host='localhost', port=6379, db=0) 7 8def get_cache_key(messages, model, temperature): 9 content = json.dumps({ 10 "messages": messages, 11 "model": model, 12 "temperature": temperature 13 }, sort_keys=True) 14 return hashlib.md5(content.encode()).hexdigest() 15 16def cached_llm_call(messages, model="gpt-4", temperature=0.7, ttl=3600): 17 cache_key = get_cache_key(messages, model, temperature) 18 19 # Check cache 20 cached = redis_client.get(cache_key) 21 if cached: 22 return json.loads(cached) 23 24 # API call 25 response = client.chat.completions.create( 26 model=model, 27 messages=messages, 28 temperature=temperature 29 ) 30 result = response.choices[0].message.content 31 32 # Save to cache 33 redis_client.setex(cache_key, ttl, json.dumps(result)) 34 35 return result

Mise en Cache Sémantique

Servir des requêtes similaires depuis le cache :

1def semantic_cache_lookup(query, threshold=0.95): 2 query_embedding = get_embedding(query) 3 4 # Search similar query in Vector DB 5 results = vector_db.search( 6 vector=query_embedding, 7 top_k=1, 8 filter={"type": "cache"} 9 ) 10 11 if results and results[0].score >= threshold: 12 return results[0].metadata["response"] 13 14 return None

Gestion des Tokens

Comptage des Tokens

1import tiktoken 2 3def count_tokens(text, model="gpt-4"): 4 encoding = tiktoken.encoding_for_model(model) 5 return len(encoding.encode(text)) 6 7def count_message_tokens(messages, model="gpt-4"): 8 encoding = tiktoken.encoding_for_model(model) 9 tokens = 0 10 11 for message in messages: 12 tokens += 4 # message overhead 13 for key, value in message.items(): 14 tokens += len(encoding.encode(value)) 15 16 tokens += 2 # reply overhead 17 return tokens

Gestion de la Fenêtre de Contexte

1def truncate_messages(messages, max_tokens=4000, model="gpt-4"): 2 total_tokens = count_message_tokens(messages, model) 3 4 while total_tokens > max_tokens and len(messages) > 2: 5 # Preserve System message, delete oldest user/assistant 6 messages.pop(1) 7 total_tokens = count_message_tokens(messages, model) 8 9 return messages 10## Opérations Asynchrones 11 12### Client Asynchrone 13 14```python 15import asyncio 16from openai import AsyncOpenAI 17 18async_client = AsyncOpenAI(api_key="sk-...") 19 20async def async_llm_call(prompt): 21 response = await async_client.chat.completions.create( 22 model="gpt-4-turbo", 23 messages=[{"role": "user", "content": prompt}] 24 ) 25 return response.choices[0].message.content 26 27async def batch_process(prompts): 28 tasks = [async_llm_call(p) for p in prompts] 29 results = await asyncio.gather(*tasks) 30 return results 31 32# Usage 33prompts = ["Question 1", "Question 2", "Question 3"] 34results = asyncio.run(batch_process(prompts))

Requêtes Concurrentes avec Limitation de Débit

1import asyncio 2from asyncio import Semaphore 3 4async def rate_limited_call(semaphore, prompt): 5 async with semaphore: 6 response = await async_client.chat.completions.create( 7 model="gpt-4-turbo", 8 messages=[{"role": "user", "content": prompt}] 9 ) 10 return response.choices[0].message.content 11 12async def batch_with_rate_limit(prompts, max_concurrent=5): 13 semaphore = Semaphore(max_concurrent) 14 tasks = [rate_limited_call(semaphore, p) for p in prompts] 15 return await asyncio.gather(*tasks)

Monitoring et Journalisation

Journalisation des Requêtes

1import logging 2import time 3from functools import wraps 4 5logging.basicConfig(level=logging.INFO) 6logger = logging.getLogger(__name__) 7 8def log_llm_call(func): 9 @wraps(func) 10 def wrapper(*args, **kwargs): 11 start_time = time.time() 12 13 try: 14 result = func(*args, **kwargs) 15 duration = time.time() - start_time 16 17 logger.info(f"LLM Call Success", extra={ 18 "duration": duration, 19 "model": kwargs.get("model"), 20 "tokens_used": result.usage.total_tokens 21 }) 22 23 return result 24 25 except Exception as e: 26 duration = time.time() - start_time 27 logger.error(f"LLM Call Failed", extra={ 28 "duration": duration, 29 "error": str(e) 30 }) 31 raise 32 33 return wrapper

Collecte de Métriques

1from prometheus_client import Counter, Histogram 2 3llm_requests_total = Counter( 4 'llm_requests_total', 5 'Total LLM API requests', 6 ['model', 'status'] 7) 8 9llm_latency = Histogram( 10 'llm_request_latency_seconds', 11 'LLM request latency', 12 ['model'] 13) 14 15llm_tokens = Counter( 16 'llm_tokens_total', 17 'Total tokens used', 18 ['model', 'type'] # input, output 19)

Bonnes Pratiques de Sécurité

Gestion des Clés API

1import os 2from dotenv import load_dotenv 3 4load_dotenv() 5 6# Get from Environment variable 7api_key = os.getenv("OPENAI_API_KEY") 8 9# Never hardcode! 10# ❌ api_key = "sk-..."

Validation des Entrées

1def validate_input(text, max_length=10000): 2 if not text or not isinstance(text, str): 3 raise ValueError("Invalid input") 4 5 if len(text) > max_length: 6 raise ValueError(f"Input too long: {len(text)} > {max_length}") 7 8 # Injection check 9 dangerous_patterns = ["<script>", "{{", "{%"] 10 for pattern in dangerous_patterns: 11 if pattern in text.lower(): 12 raise ValueError("Potentially dangerous input") 13 14 return text.strip()

Assainissement des Sorties

1import html 2 3def sanitize_output(text): 4 # HTML escape 5 text = html.escape(text) 6 7 # PII masking 8 text = mask_pii(text) 9 10 return text 11## Architecture de Production 12

┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Client │────▶│ API GW │────▶│ LLM Service │ └──────────────┘ │ (Rate Limit)│ └──────┬───────┘ └──────────────┘ │ ┌─────▼─────┐ ┌──────────────┐ │ Router │ │ Cache │◀───▶│ │ │ (Redis) │ └─────┬─────┘ └──────────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ┌──────▼──────┐ ┌─────▼─────┐ ┌──────▼──────┐ │ OpenAI │ │ Anthropic │ │ Google │ └─────────────┘ └───────────┘ └─────────────┘

1 2## Conclusion 3 4L’intégration d’API LLM est un élément essentiel nécessitant une planification minutieuse et une mise en œuvre robuste. Des aspects tels que le rate limiting, la mise en cache, la gestion des erreurs et la supervision sont essentiels dans un environnement de production. 5 6Chez Veni AI, nous offrons un accompagnement expert pour les intégrations d’IA en entreprise. Contactez-nous pour vos projets.

İlgili Makaleler