LLM API Integration: Best Practices and Enterprise Guide

A comprehensive guide to OpenAI, Anthropic, and Google AI API integration, covering rate limiting, error handling, caching, and production-ready implementation for enterprise AI systems.

Veni AI Technical Team · January 10, 2025 · 6 min read

Integrating LLM APIs into enterprise systems is critical for reliable and scalable AI applications. In this guide, we examine production-ready integration strategies.

API Providers Comparison

OpenAI API

Models: GPT-4, GPT-4 Turbo, GPT-3.5 Turbo

Features:

  • Function calling
  • JSON mode
  • Vision (image analysis)
  • DALL-E (image generation)

Endpoint:

https://api.openai.com/v1/chat/completions

Anthropic API

Models: Claude 3 Opus, Sonnet, Haiku

Features:

  • 200K context window
  • Constitutional AI
  • XML tag support

Endpoint:

https://api.anthropic.com/v1/messages

Google AI (Gemini)

Models: Gemini Pro, Gemini Ultra

Features:

  • Multimodal (text, image, video)
  • Grounding
  • Code execution

Basic Integration Structure

OpenAI Python SDK

from openai import OpenAI

client = OpenAI(api_key="sk-...")

response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello!"}
    ],
    temperature=0.7,
    max_tokens=1000
)

print(response.choices[0].message.content)

Anthropic Python SDK

from anthropic import Anthropic

client = Anthropic(api_key="sk-ant-...")

message = client.messages.create(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Hello!"}
    ]
)

print(message.content[0].text)
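
Google AI Python SDK

For completeness, the equivalent Gemini call follows the same pattern. This is a minimal sketch using the google-generativeai package; treat the package, model name, and response shape as assumptions and check the current Google AI SDK documentation:

import google.generativeai as genai

# Configure the client with an API key from Google AI Studio
genai.configure(api_key="AIza...")

model = genai.GenerativeModel("gemini-pro")
response = model.generate_content("Hello!")

print(response.text)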

Streaming Response

# OpenAI Streaming
stream = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": "Tell me a long story"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

Error Handling

Error Types

Error Code | Description         | Solution
-----------|---------------------|-------------------------------
400        | Invalid request     | Check request format
401        | Invalid API key     | Verify API key
429        | Rate limit          | Retry with backoff
500        | Server error        | Retry with exponential backoff
503        | Service unavailable | Wait and retry

Robust Error Handling

import time
from openai import RateLimitError, APIError, APIConnectionError

def call_llm_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4-turbo",
                messages=messages
            )
            return response.choices[0].message.content

        except RateLimitError:
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)

        except APIConnectionError:
            print("Connection error. Retrying...")
            time.sleep(1)

        except APIError as e:
            print(f"API error: {e}")
            if attempt == max_retries - 1:
                raise

    raise Exception("Max retries exceeded")

Exponential Backoff with Jitter

import random

def exponential_backoff(attempt, base=1, max_wait=60):
    wait = min(base * (2 ** attempt), max_wait)
    jitter = random.uniform(0, wait * 0.1)
    return wait + jitter
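
In call_llm_with_retry above, the fixed 2 ** attempt wait can be swapped for this jittered delay so that many clients hitting the same rate limit do not all retry at the same instant. A minimal adjustment to the RateLimitError branch:

        except RateLimitError:
            wait_time = exponential_backoff(attempt)
            print(f"Rate limited. Waiting {wait_time:.1f}s...")
            time.sleep(wait_time)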

Rate Limiting Management

Rate Limit Types

  • RPM (Requests Per Minute): caps how many requests you can send per minute
  • TPM (Tokens Per Minute): caps how many tokens (input + output) you can process per minute
  • RPD (Requests Per Day): caps how many requests you can send per day

Token Bucket Algorithm

import time
from threading import Lock

class TokenBucket:
    def __init__(self, tokens_per_second, max_tokens):
        self.tokens_per_second = tokens_per_second
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.last_update = time.time()
        self.lock = Lock()

    def acquire(self, tokens=1):
        with self.lock:
            now = time.time()
            elapsed = now - self.last_update
            self.tokens = min(
                self.max_tokens,
                self.tokens + elapsed * self.tokens_per_second
            )
            self.last_update = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_and_acquire(self, tokens=1):
        while not self.acquire(tokens):
            time.sleep(0.1)

# Usage
rate_limiter = TokenBucket(tokens_per_second=10, max_tokens=100)
rate_limiter.wait_and_acquire()
# API call...

Caching Strategies

Response Caching

import hashlib
import json
import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_cache_key(messages, model, temperature):
    content = json.dumps({
        "messages": messages,
        "model": model,
        "temperature": temperature
    }, sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()

def cached_llm_call(messages, model="gpt-4", temperature=0.7, ttl=3600):
    cache_key = get_cache_key(messages, model, temperature)

    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # API call
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature
    )
    result = response.choices[0].message.content

    # Save to cache
    redis_client.setex(cache_key, ttl, json.dumps(result))

    return result

Semantic Caching

Serve semantically similar queries from the cache instead of issuing a new API call:

def semantic_cache_lookup(query, threshold=0.95):
    query_embedding = get_embedding(query)

    # Search for a similar query in the vector DB
    results = vector_db.search(
        vector=query_embedding,
        top_k=1,
        filter={"type": "cache"}
    )

    if results and results[0].score >= threshold:
        return results[0].metadata["response"]

    return None
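
A lookup alone is not enough: after a cache miss, the fresh response has to be written back together with the query embedding so that later similar queries can match it. A minimal sketch, assuming the same illustrative get_embedding helper and a vector_db client with an upsert method (both stand-ins for your embedding model and vector store):

import uuid

def semantic_cache_store(query, response):
    # Store the query embedding with the response as metadata
    vector_db.upsert([{
        "id": str(uuid.uuid4()),
        "vector": get_embedding(query),
        "metadata": {"type": "cache", "query": query, "response": response}
    }])

def llm_call_with_semantic_cache(query):
    cached = semantic_cache_lookup(query)
    if cached:
        return cached

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": query}]
    )
    result = response.choices[0].message.content
    semantic_cache_store(query, result)
    return result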

Token Management

Token Counting

import tiktoken

def count_tokens(text, model="gpt-4"):
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def count_message_tokens(messages, model="gpt-4"):
    encoding = tiktoken.encoding_for_model(model)
    tokens = 0

    for message in messages:
        tokens += 4  # message overhead
        for key, value in message.items():
            tokens += len(encoding.encode(value))

    tokens += 2  # reply overhead
    return tokens

Context Window Management

def truncate_messages(messages, max_tokens=4000, model="gpt-4"):
    total_tokens = count_message_tokens(messages, model)

    while total_tokens > max_tokens and len(messages) > 2:
        # Preserve the system message, delete the oldest user/assistant message
        messages.pop(1)
        total_tokens = count_message_tokens(messages, model)

    return messages
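
A typical pattern is to trim the running conversation with these helpers before every request so it stays inside the context window. A short usage sketch (the 4000-token budget is an arbitrary example):

conversation = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Hello!"}
]

# Drop the oldest turns until the prompt fits the budget
conversation = truncate_messages(conversation, max_tokens=4000)
print(f"Prompt tokens: {count_message_tokens(conversation)}")

response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=conversation
)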

Async Operations

Async Client

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(api_key="sk-...")

async def async_llm_call(prompt):
    response = await async_client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def batch_process(prompts):
    tasks = [async_llm_call(p) for p in prompts]
    results = await asyncio.gather(*tasks)
    return results

# Usage
prompts = ["Question 1", "Question 2", "Question 3"]
results = asyncio.run(batch_process(prompts))

Concurrent Rate-Limited Requests

import asyncio
from asyncio import Semaphore

async def rate_limited_call(semaphore, prompt):
    async with semaphore:
        response = await async_client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

async def batch_with_rate_limit(prompts, max_concurrent=5):
    semaphore = Semaphore(max_concurrent)
    tasks = [rate_limited_call(semaphore, p) for p in prompts]
    return await asyncio.gather(*tasks)

Monitoring and Logging

Request Logging

import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_llm_call(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time

            logger.info("LLM Call Success", extra={
                "duration": duration,
                "model": kwargs.get("model"),
                # assumes the wrapped function returns the full API response (with .usage)
                "tokens_used": result.usage.total_tokens
            })

            return result

        except Exception as e:
            duration = time.time() - start_time
            logger.error("LLM Call Failed", extra={
                "duration": duration,
                "error": str(e)
            })
            raise

    return wrapper

Metrics Collection

from prometheus_client import Counter, Histogram

llm_requests_total = Counter(
    'llm_requests_total',
    'Total LLM API requests',
    ['model', 'status']
)

llm_latency = Histogram(
    'llm_request_latency_seconds',
    'LLM request latency',
    ['model']
)

llm_tokens = Counter(
    'llm_tokens_total',
    'Total tokens used',
    ['model', 'type']  # input, output
)
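
These metric objects are only definitions; they still need to be updated around each request. A minimal sketch of an instrumented call (the instrumented_llm_call helper name is illustrative):

import time

def instrumented_llm_call(messages, model="gpt-4-turbo"):
    start = time.time()
    try:
        response = client.chat.completions.create(model=model, messages=messages)
        llm_requests_total.labels(model=model, status="success").inc()
        llm_tokens.labels(model=model, type="input").inc(response.usage.prompt_tokens)
        llm_tokens.labels(model=model, type="output").inc(response.usage.completion_tokens)
        return response.choices[0].message.content
    except Exception:
        llm_requests_total.labels(model=model, status="error").inc()
        raise
    finally:
        llm_latency.labels(model=model).observe(time.time() - start)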

Security Best Practices

API Key Management

import os
from dotenv import load_dotenv

load_dotenv()

# Load the key from an environment variable
api_key = os.getenv("OPENAI_API_KEY")

# Never hardcode keys!
# ❌ api_key = "sk-..."

Input Validation

def validate_input(text, max_length=10000):
    if not text or not isinstance(text, str):
        raise ValueError("Invalid input")

    if len(text) > max_length:
        raise ValueError(f"Input too long: {len(text)} > {max_length}")

    # Basic injection check
    dangerous_patterns = ["<script>", "{{", "{%"]
    for pattern in dangerous_patterns:
        if pattern in text.lower():
            raise ValueError("Potentially dangerous input")

    return text.strip()

Output Sanitization

import html

def sanitize_output(text):
    # HTML escape
    text = html.escape(text)

    # PII masking
    text = mask_pii(text)

    return text
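
mask_pii is referenced above but not defined in the article. A minimal, illustrative regex-based version might look like the sketch below; production systems usually rely on a dedicated PII detection library instead:

import re

def mask_pii(text):
    # Mask email addresses
    text = re.sub(r"[\w.+-]+@[\w-]+\.[\w.-]+", "[EMAIL]", text)
    # Mask phone-number-like digit sequences (7+ digits with optional separators)
    text = re.sub(r"\+?\d[\d\s().-]{6,}\d", "[PHONE]", text)
    return text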

Production Architecture

┌────────┐     ┌─────────────┐     ┌─────────────┐
│ Client │────▶│   API GW    │────▶│ LLM Service │
└────────┘     │ (Rate Limit)│     └──────┬──────┘
               └─────────────┘            │
                                    ┌─────▼─────┐
               ┌─────────┐          │  Router   │
               │  Cache  │◀────────▶│           │
               │ (Redis) │          └─────┬─────┘
               └─────────┘                │
                    ┌─────────────────────┼─────────────────────┐
                    │                     │                     │
             ┌──────▼──────┐        ┌─────▼─────┐        ┌──────▼──────┐
             │   OpenAI    │        │ Anthropic │        │   Google    │
             └─────────────┘        └───────────┘        └─────────────┘
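
The Router in this layout dispatches each request to one provider and falls back to the next on failure. A minimal sketch, assuming OpenAI and Anthropic clients named client and anthropic_client as in the earlier examples (the helper names here are illustrative):

def call_openai(prompt):
    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

def call_anthropic(prompt):
    message = anthropic_client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    return message.content[0].text

PROVIDERS = [("openai", call_openai), ("anthropic", call_anthropic)]

def route_request(prompt):
    # Try providers in order; fall back to the next one on any error
    last_error = None
    for name, call in PROVIDERS:
        try:
            return call(prompt)
        except Exception as e:
            last_error = e
            print(f"{name} failed: {e}")
    raise last_error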

Conclusion

LLM API integration is a critical component that requires careful planning and robust implementation. Concerns such as rate limiting, caching, error handling, and monitoring are vital in a production environment.

At Veni AI, we provide expert support for enterprise AI integrations. Contact us to discuss your project.
