Veni AI
Desarrollo de API

Integración de API con LLM: Mejores Prácticas y Guía para Empresas

Guía completa para la integración de APIs de OpenAI, Anthropic y Google AI, limitación de velocidad, manejo de errores, almacenamiento en caché y una implementación lista para producción en sistemas de IA empresariales.

Veni AI Technical Team10 Ocak 20257 dk okuma
Integración de API con LLM: Mejores Prácticas y Guía para Empresas

Integración de API de LLM: Mejores prácticas y guía para empresas

Integrar APIs de LLM en sistemas empresariales es fundamental para aplicaciones de IA fiables y escalables. En esta guía, examinamos estrategias de integración listas para producción.

Comparación de proveedores de API

OpenAI API

Modelos: GPT, GPT Turbo, GPT-3.5 Turbo
Características:

  • Function calling
  • JSON mode
  • Vision (análisis de imágenes)
  • DALL-E (generación de imágenes)

Endpoint:

https://api.openai.com/v1/chat/completions

Anthropic API

Modelos: Claude 3 Opus, Sonnet, Haiku
Características:

  • Context window de 200K
  • Constitutional AI
  • Compatibilidad con etiquetas XML

Endpoint:

https://api.anthropic.com/v1/messages

Google AI (Gemini)

Modelos: Gemini Pro, Gemini Ultra
Características:

  • Multimodal (texto, imagen, video)
  • Grounding
  • Ejecución de código

Estructura básica de integración

OpenAI Python SDK

1from openai import OpenAI 2 3client = OpenAI(api_key="sk-...") 4 5response = client.chat.completions.create( 6 model="gpt-4-turbo", 7 messages=[ 8 {"role": "system", "content": "You are a helpful assistant."}, 9 {"role": "user", "content": "Hello!"} 10 ], 11 temperature=0.7, 12 max_tokens=1000 13) 14 15print(response.choices[0].message.content)

Anthropic Python SDK

1from anthropic import Anthropic 2 3client = Anthropic(api_key="sk-ant-...") 4 5message = client.messages.create( 6 model="claude-3-opus-20240229", 7 max_tokens=1024, 8 messages=[ 9 {"role": "user", "content": "Hello!"} 10 ] 11) 12 13print(message.content[0].text)

Respuesta en streaming

1# OpenAI Streaming 2stream = client.chat.completions.create( 3 model="gpt-4-turbo", 4 messages=[{"role": "user", "content": "Tell me a long story"}], 5 stream=True 6) 7 8for chunk in stream: 9 if chunk.choices[0].delta.content: 10 print(chunk.choices[0].delta.content, end="")

Manejo de errores

Tipos de error

Código de errorDescripciónSolución
400Solicitud inválidaRevisar el formato de la solicitud
401API key inválidaVerificar la API key
429Rate limitReintentar con backoff
500Error del servidorReintentar con backoff exponencial
503Servicio no disponibleEsperar y reintentar

Manejo de errores robusto

1import time 2from openai import RateLimitError, APIError, APIConnectionError 3 4def call_llm_with_retry(messages, max_retries=3): 5 for attempt in range(max_retries): 6 try: 7 response = client.chat.completions.create( 8 model="gpt-4-turbo", 9 messages=messages 10 ) 11 return response.choices[0].message.content 12 13 except RateLimitError: 14 wait_time = 2 ** attempt # Exponential backoff 15 print(f"Rate limited. Waiting {wait_time}s...") 16 time.sleep(wait_time) 17 18 except APIConnectionError: 19 print("Connection error. Retrying...") 20 time.sleep(1) 21 22 except APIError as e: 23 print(f"API error: {e}") 24 if attempt == max_retries - 1: 25 raise 26 27 raise Exception("Max retries exceeded")

Backoff exponencial con jitter

1import random 2 3def exponential_backoff(attempt, base=1, max_wait=60): 4 wait = min(base * (2 ** attempt), max_wait) 5 jitter = random.uniform(0, wait * 0.1) 6 return wait + jitter 7## Gestión de Limitación de Tasa 8 9### Tipos de Limitación de Tasa 10 11- **RPM (Requests Per Minute):** Número de solicitudes por minuto 12- **TPM (Tokens Per Minute):** Número de tokens por minuto 13- **RPD (Requests Per Day):** Número de solicitudes por día 14 15### Algoritmo de Token Bucket 16 17```python 18import time 19from threading import Lock 20 21class TokenBucket: 22 def __init__(self, tokens_per_second, max_tokens): 23 self.tokens_per_second = tokens_per_second 24 self.max_tokens = max_tokens 25 self.tokens = max_tokens 26 self.last_update = time.time() 27 self.lock = Lock() 28 29 def acquire(self, tokens=1): 30 with self.lock: 31 now = time.time() 32 elapsed = now - self.last_update 33 self.tokens = min( 34 self.max_tokens, 35 self.tokens + elapsed * self.tokens_per_second 36 ) 37 self.last_update = now 38 39 if self.tokens >= tokens: 40 self.tokens -= tokens 41 return True 42 return False 43 44 def wait_and_acquire(self, tokens=1): 45 while not self.acquire(tokens): 46 time.sleep(0.1) 47 48# Usage 49rate_limiter = TokenBucket(tokens_per_second=10, max_tokens=100) 50rate_limiter.wait_and_acquire() 51# API call...

Estrategias de Caching

Caching de Respuestas

1import hashlib 2import json 3from functools import lru_cache 4import redis 5 6redis_client = redis.Redis(host='localhost', port=6379, db=0) 7 8def get_cache_key(messages, model, temperature): 9 content = json.dumps({ 10 "messages": messages, 11 "model": model, 12 "temperature": temperature 13 }, sort_keys=True) 14 return hashlib.md5(content.encode()).hexdigest() 15 16def cached_llm_call(messages, model="gpt-4", temperature=0.7, ttl=3600): 17 cache_key = get_cache_key(messages, model, temperature) 18 19 # Check cache 20 cached = redis_client.get(cache_key) 21 if cached: 22 return json.loads(cached) 23 24 # API call 25 response = client.chat.completions.create( 26 model=model, 27 messages=messages, 28 temperature=temperature 29 ) 30 result = response.choices[0].message.content 31 32 # Save to cache 33 redis_client.setex(cache_key, ttl, json.dumps(result)) 34 35 return result

Caching Semántico

Servir consultas similares desde el cache:

1def semantic_cache_lookup(query, threshold=0.95): 2 query_embedding = get_embedding(query) 3 4 # Search similar query in Vector DB 5 results = vector_db.search( 6 vector=query_embedding, 7 top_k=1, 8 filter={"type": "cache"} 9 ) 10 11 if results and results[0].score >= threshold: 12 return results[0].metadata["response"] 13 14 return None

Gestión de Tokens

Conteo de Tokens

1import tiktoken 2 3def count_tokens(text, model="gpt-4"): 4 encoding = tiktoken.encoding_for_model(model) 5 return len(encoding.encode(text)) 6 7def count_message_tokens(messages, model="gpt-4"): 8 encoding = tiktoken.encoding_for_model(model) 9 tokens = 0 10 11 for message in messages: 12 tokens += 4 # message overhead 13 for key, value in message.items(): 14 tokens += len(encoding.encode(value)) 15 16 tokens += 2 # reply overhead 17 return tokens

Gestión de la Ventana de Contexto

1def truncate_messages(messages, max_tokens=4000, model="gpt-4"): 2 total_tokens = count_message_tokens(messages, model) 3 4 while total_tokens > max_tokens and len(messages) > 2: 5 # Preserve System message, delete oldest user/assistant 6 messages.pop(1) 7 total_tokens = count_message_tokens(messages, model) 8 9 return messages 10## Operaciones Asíncronas 11 12### Cliente Asíncrono 13 14```python 15import asyncio 16from openai import AsyncOpenAI 17 18async_client = AsyncOpenAI(api_key="sk-...") 19 20async def async_llm_call(prompt): 21 response = await async_client.chat.completions.create( 22 model="gpt-4-turbo", 23 messages=[{"role": "user", "content": prompt}] 24 ) 25 return response.choices[0].message.content 26 27async def batch_process(prompts): 28 tasks = [async_llm_call(p) for p in prompts] 29 results = await asyncio.gather(*tasks) 30 return results 31 32# Usage 33prompts = ["Question 1", "Question 2", "Question 3"] 34results = asyncio.run(batch_process(prompts))

Solicitudes Concurrentes con Límite de Tasa

1import asyncio 2from asyncio import Semaphore 3 4async def rate_limited_call(semaphore, prompt): 5 async with semaphore: 6 response = await async_client.chat.completions.create( 7 model="gpt-4-turbo", 8 messages=[{"role": "user", "content": prompt}] 9 ) 10 return response.choices[0].message.content 11 12async def batch_with_rate_limit(prompts, max_concurrent=5): 13 semaphore = Semaphore(max_concurrent) 14 tasks = [rate_limited_call(semaphore, p) for p in prompts] 15 return await asyncio.gather(*tasks)

Monitoreo y Registro

Registro de Solicitudes

1import logging 2import time 3from functools import wraps 4 5logging.basicConfig(level=logging.INFO) 6logger = logging.getLogger(__name__) 7 8def log_llm_call(func): 9 @wraps(func) 10 def wrapper(*args, **kwargs): 11 start_time = time.time() 12 13 try: 14 result = func(*args, **kwargs) 15 duration = time.time() - start_time 16 17 logger.info(f"LLM Call Success", extra={ 18 "duration": duration, 19 "model": kwargs.get("model"), 20 "tokens_used": result.usage.total_tokens 21 }) 22 23 return result 24 25 except Exception as e: 26 duration = time.time() - start_time 27 logger.error(f"LLM Call Failed", extra={ 28 "duration": duration, 29 "error": str(e) 30 }) 31 raise 32 33 return wrapper

Recolección de Métricas

1from prometheus_client import Counter, Histogram 2 3llm_requests_total = Counter( 4 'llm_requests_total', 5 'Total LLM API requests', 6 ['model', 'status'] 7) 8 9llm_latency = Histogram( 10 'llm_request_latency_seconds', 11 'LLM request latency', 12 ['model'] 13) 14 15llm_tokens = Counter( 16 'llm_tokens_total', 17 'Total tokens used', 18 ['model', 'type'] # input, output 19)

Mejores Prácticas de Seguridad

Gestión de API Keys

1import os 2from dotenv import load_dotenv 3 4load_dotenv() 5 6# Get from Environment variable 7api_key = os.getenv("OPENAI_API_KEY") 8 9# Never hardcode! 10# ❌ api_key = "sk-..."

Validación de Entrada

1def validate_input(text, max_length=10000): 2 if not text or not isinstance(text, str): 3 raise ValueError("Invalid input") 4 5 if len(text) > max_length: 6 raise ValueError(f"Input too long: {len(text)} > {max_length}") 7 8 # Injection check 9 dangerous_patterns = ["<script>", "{{", "{%"] 10 for pattern in dangerous_patterns: 11 if pattern in text.lower(): 12 raise ValueError("Potentially dangerous input") 13 14 return text.strip()

Sanitización de Salida

1import html 2 3def sanitize_output(text): 4 # HTML escape 5 text = html.escape(text) 6 7 # PII masking 8 text = mask_pii(text) 9 10 return text 11## Arquitectura de Producción 12

┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Client │────▶│ API GW │────▶│ LLM Service │ └──────────────┘ │ (Rate Limit)│ └──────┬───────┘ └──────────────┘ │ ┌─────▼─────┐ ┌──────────────┐ │ Router │ │ Cache │◀───▶│ │ │ (Redis) │ └─────┬─────┘ └──────────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ┌──────▼──────┐ ┌─────▼─────┐ ┌──────▼──────┐ │ OpenAI │ │ Anthropic │ │ Google │ └─────────────┘ └───────────┘ └─────────────┘

1 2## Conclusión 3 4La integración de APIs de LLM es un componente crítico que requiere una planificación cuidadosa y una implementación sólida. Aspectos como la limitación de tasa, el caching, el manejo de errores y el monitoreo son vitales en un entorno de producción. 5 6En Veni AI, ofrecemos soporte experto en integraciones de IA para empresas. Contáctanos para tus proyectos.

İlgili Makaleler