AI Workflow Automation: Business Process Automation Guide
AI workflow automation is the automation of repetitive business processes using artificial intelligence. In this guide, we examine enterprise automation strategies.
Automation Categories
1. Document Processing
- Invoice processing
- Contract analysis
- Form data extraction
2. Communication Automation
- Email classification
- Auto-response
- Ticket routing
3. Data Processing
- Data extraction
- Data transformation
- Reporting
4. Decision Automation
- Approval processes
- Risk assessment
- Anomaly detection
Document Processing Pipeline
Invoice Processing
import base64
import json
import mimetypes

from openai import OpenAI

client = OpenAI()

# JSON mode (response_format={"type": "json_object"}) requires the word "JSON"
# to appear somewhere in the messages, so the prompt asks for it explicitly.
INVOICE_SYSTEM_PROMPT = """Extract information from the invoice and respond with a JSON object:
{
  "vendor": "string",
  "invoice_number": "string",
  "date": "YYYY-MM-DD",
  "total": number,
  "tax": number,
  "items": [{"description": "string", "quantity": number, "price": number}]
}"""


def process_invoice(image_path: str) -> dict:
    """Extract structured invoice fields from an invoice image.

    The image is read from disk, base64-encoded, and sent to the chat
    completions API as a data URL together with a prompt describing the
    expected fields.

    Args:
        image_path: Path to the invoice image file.

    Returns:
        The model's reply parsed from JSON into a dict. The prompt requests
        the keys vendor, invoice_number, date, total, tax and items, but the
        reply is not validated here.

    Raises:
        OSError: If the image file cannot be opened or read.
        json.JSONDecodeError: If the reply is not valid JSON.
    """
    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode()

    # Label the data URL with the file's real image type; fall back to JPEG
    # when the extension is unknown or is not an image type.
    guessed_type, _ = mimetypes.guess_type(image_path)
    if guessed_type is None or not guessed_type.startswith("image/"):
        guessed_type = "image/jpeg"

    response = client.chat.completions.create(
        # gpt-4-vision-preview rejects response_format; gpt-4-turbo accepts
        # image input with JSON mode and matches the other examples.
        model="gpt-4-turbo",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": INVOICE_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{guessed_type};base64,{image_data}"
                        },
                    }
                ],
            },
        ],
    )

    return json.loads(response.choices[0].message.content)
Contract Analysis
import json

# JSON mode requires the word "JSON" to appear in the messages, so the prompt
# names the output format explicitly.
CONTRACT_SYSTEM_PROMPT = """Analyze the contract and extract the following information as a JSON object:
{
  "parties": ["string"],
  "effective_date": "YYYY-MM-DD",
  "termination_date": "YYYY-MM-DD",
  "key_obligations": ["string"],
  "payment_terms": "string",
  "risks": ["string"],
  "unusual_clauses": ["string"]
}"""


def analyze_contract(contract_text: str) -> dict:
    """Extract key terms from a contract using the chat completions API.

    Uses the module-level ``client`` created in the invoice example.

    Args:
        contract_text: Full text of the contract, sent as the user message.

    Returns:
        The model's reply parsed from JSON into a dict. The prompt requests
        parties, dates, obligations, payment terms, risks and unusual
        clauses; the reply is not validated here.

    Raises:
        json.JSONDecodeError: If the reply is not valid JSON.
    """
    response = client.chat.completions.create(
        model="gpt-4-turbo",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": CONTRACT_SYSTEM_PROMPT},
            {"role": "user", "content": contract_text},
        ],
    )

    return json.loads(response.choices[0].message.content)
Email Automation
Email Classification
import json

# Shape of the reply the model is asked to produce.
CLASSIFICATION_FORMAT = """{
  "category": "string",
  "confidence": 0.0-1.0,
  "priority": "high|medium|low",
  "sentiment": "positive|neutral|negative",
  "requires_response": boolean,
  "summary": "string"
}"""


def classify_email(email_content: str, categories: list) -> dict:
    """Classify an email into one of the given categories.

    Uses the module-level ``client`` created in the invoice example.

    Args:
        email_content: Body of the email, sent as the user message.
        categories: Candidate category names, interpolated into the prompt.

    Returns:
        The model's reply parsed from JSON into a dict. The prompt requests
        category, confidence, priority, sentiment, requires_response and
        summary; the reply is not validated here.

    Raises:
        json.JSONDecodeError: If the reply is not valid JSON.
    """
    # JSON mode requires the word "JSON" to appear in the messages, so the
    # prompt states the output format explicitly.
    system_prompt = (
        "Classify the email:\n"
        f"Categories: {categories}\n"
        "\n"
        "Respond with a JSON object in this format:\n"
        f"{CLASSIFICATION_FORMAT}"
    )

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": email_content},
        ],
    )

    return json.loads(response.choices[0].message.content)


# Usage
categories = ["sales_inquiry", "support_request", "complaint", "partnership", "spam"]
result = classify_email(email_body, categories)
Auto-Reply Generation
def generate_auto_reply(email: dict, context: str = "") -> str:
    """Draft an automatic reply to a classified email.

    Uses the module-level ``client`` created in the invoice example.

    Args:
        email: Dict that must contain the keys "category", "priority",
            "content" and "summary"; all four are interpolated into the
            prompt.
        context: Company information inserted into the system prompt.

    Returns:
        The text of the generated reply.

    Raises:
        KeyError: If ``email`` lacks any of the four required keys.
    """
    system_prompt = (
        "Generate a professional auto-reply.\n"
        "Tone: Friendly but professional\n"
        "Language: English\n"
        "\n"
        f"Company info: {context}\n"
        "\n"
        f"Email category: {email['category']}\n"
        f"Priority: {email['priority']}\n"
    )
    user_message = (
        f"Email content:\n{email['content']}\n\nSummary: {email['summary']}"
    )

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
    )

    return response.choices[0].message.content
Workflow Orchestration
Sequential Pipeline
from typing import Callable, Optional


class WorkflowPipeline:
    """Run a list of named steps in order, feeding each result to the next."""

    def __init__(self):
        # Each entry is {"name": str, "function": callable, "config": dict}.
        self.steps = []

    def add_step(self, name: str, function: Callable, config: Optional[dict] = None):
        """Append a step to the end of the pipeline.

        Args:
            name: Label printed while the step runs and reported on failure.
            function: Called as ``function(previous_result, **config)``.
            config: Extra keyword arguments for ``function``; defaults to none.
        """
        self.steps.append({
            "name": name,
            "function": function,
            "config": config or {},
        })

    def execute(self, input_data: dict) -> dict:
        """Run every step in order, starting from ``input_data``.

        Args:
            input_data: Value passed to the first step.

        Returns:
            The last step's return value, or
            ``{"error": <message>, "failed_step": <name>}`` as soon as a
            step raises; later steps are then skipped.
        """
        result = input_data

        for step in self.steps:
            print(f"Executing: {step['name']}")
            try:
                result = step["function"](result, **step["config"])
            except Exception as e:
                # Pipeline boundary: report the failure as data, not a raise.
                return {"error": str(e), "failed_step": step["name"]}

        return result


# Pipeline definition
pipeline = WorkflowPipeline()
pipeline.add_step("extract", extract_data)
pipeline.add_step("validate", validate_data)
pipeline.add_step("transform", transform_data)
pipeline.add_step("load", load_to_database)

result = pipeline.execute({"file_path": "invoice.pdf"})
Conditional Workflow
from typing import Callable


class ConditionalWorkflow:
    """Dispatch data to the first handler whose condition accepts it."""

    def __init__(self):
        # Maps condition -> handler. Dicts keep insertion order, so routes
        # are tried in the order they were added.
        self.routes = {}
        self.default_route = None

    def add_route(self, condition: Callable, handler: Callable):
        """Register ``handler`` to run when ``condition(data)`` is truthy.

        Registering the same condition object again replaces its handler.
        """
        self.routes[condition] = handler

    def set_default(self, handler: Callable):
        """Set the handler used when no route's condition matches."""
        self.default_route = handler

    def execute(self, data: dict) -> dict:
        """Route ``data`` to the first matching handler.

        Args:
            data: Value passed to each condition and to the chosen handler.

        Returns:
            Whatever the chosen handler returns.

        Raises:
            ValueError: If no condition matches and no default is set.
        """
        for condition, handler in self.routes.items():
            if condition(data):
                return handler(data)

        if self.default_route:
            return self.default_route(data)

        raise ValueError("No matching route found")


# Usage
workflow = ConditionalWorkflow()
workflow.add_route(
    lambda x: x["category"] == "support",
    create_support_ticket
)
workflow.add_route(
    lambda x: x["category"] == "sales",
    forward_to_sales
)
workflow.set_default(archive_email)
Data Extraction
Table Extraction
import json

# JSON mode returns a single JSON object, so the prompt names the "tables"
# key that the code reads back.
TABLE_SYSTEM_PROMPT = (
    "Extract tables from image as a JSON object of the form "
    '{"tables": [...]}. Each table should be a list of dicts.'
)


def extract_table_from_document(document_path: str) -> list:
    """Extract every table found in a document, page by page.

    Uses the module-level ``client`` created in the invoice example and the
    ``convert_to_images`` helper, which is defined outside this snippet.

    Args:
        document_path: Path of the document, passed to ``convert_to_images``.

    Returns:
        A flat list of the tables found across all page images, taken from
        the "tables" key of each reply. A reply without that key adds
        nothing.

    Raises:
        json.JSONDecodeError: If a reply is not valid JSON.
    """
    # Convert document to image or parse PDF
    images = convert_to_images(document_path)

    all_tables = []
    for img in images:
        response = client.chat.completions.create(
            # gpt-4-vision-preview rejects response_format; gpt-4-turbo
            # accepts image input with JSON mode.
            model="gpt-4-turbo",
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": TABLE_SYSTEM_PROMPT},
                {
                    "role": "user",
                    # Assumes each img is an http(s) or data URL; confirm
                    # against convert_to_images.
                    "content": [
                        {"type": "image_url", "image_url": {"url": img}}
                    ],
                },
            ],
        )

        tables = json.loads(response.choices[0].message.content)
        all_tables.extend(tables.get("tables", []))

    return all_tables
Entity Extraction
import json

# Shape of the reply the model is asked to produce.
ENTITY_FORMAT = """{
  "entities": [
    {"type": "string", "value": "string", "confidence": 0.0-1.0}
  ]
}"""


def extract_entities(text: str, entity_types: list) -> dict:
    """Extract named entities of the requested types from text.

    Uses the module-level ``client`` created in the invoice example.

    Args:
        text: Text to analyze, sent as the user message.
        entity_types: Entity type labels, interpolated into the prompt.

    Returns:
        The model's reply parsed from JSON into a dict. The prompt requests
        an "entities" list of {type, value, confidence} items; the reply is
        not validated here.

    Raises:
        json.JSONDecodeError: If the reply is not valid JSON.
    """
    # JSON mode requires the word "JSON" to appear in the messages, so the
    # prompt states the output format explicitly.
    system_prompt = (
        f"Extract the following entities from text: {entity_types}\n"
        "\n"
        "Respond with a JSON object in this format:\n"
        f"{ENTITY_FORMAT}"
    )

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": text},
        ],
    )

    return json.loads(response.choices[0].message.content)


# Usage
entities = extract_entities(
    "John Doe paid $5,000 on January 15, 2024.",
    ["PERSON", "DATE", "MONEY"]
)
Approval Workflows
import json

# Shape of the reply the model is asked to produce in ai_evaluate.
DECISION_FORMAT = """{
  "decision": "approve|reject|escalate",
  "confidence": 0.0-1.0,
  "reasoning": "string",
  "risk_level": "low|medium|high"
}"""


class ApprovalWorkflow:
    """Decide on approval requests with ordered rules or with an LLM."""

    def __init__(self, rules: list):
        """Store the rules.

        Args:
            rules: List of dicts. Each needs "name", "condition" (a callable
                taking the request) and "action"; "approvers" and "reason"
                are optional.
        """
        self.rules = rules

    def evaluate(self, request: dict) -> dict:
        """Return the decision of the first rule whose condition matches.

        Args:
            request: The approval request, passed to each rule's condition.

        Returns:
            A dict with decision, rule_name, approvers and reason for the
            first matching rule, or a "manual_review" decision when no rule
            matches.
        """
        for rule in self.rules:
            if rule["condition"](request):
                return {
                    "decision": rule["action"],
                    "rule_name": rule["name"],
                    "approvers": rule.get("approvers", []),
                    "reason": rule.get("reason", ""),
                }

        return {"decision": "manual_review", "reason": "No matching rule"}

    def _rules_as_json(self) -> str:
        """Serialize the rules for the prompt, leaving out callable values.

        Conditions are functions and cannot be JSON-encoded, so only the
        plain fields (name, action, approvers, reason, ...) are kept. Give a
        rule a textual field such as "description" if the model should see
        its threshold.
        """
        described = [
            {key: value for key, value in rule.items() if not callable(value)}
            for rule in self.rules
        ]
        return json.dumps(described)

    def ai_evaluate(self, request: dict) -> dict:
        """Complex decisions with LLM.

        Uses the module-level ``client`` created in the invoice example.

        Args:
            request: The approval request; it must be JSON-serializable.

        Returns:
            The model's reply parsed from JSON into a dict. The prompt
            requests decision, confidence, reasoning and risk_level; the
            reply is not validated here.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        # JSON mode requires the word "JSON" to appear in the messages.
        system_prompt = (
            f"Approval rules: {self._rules_as_json()}\n"
            "\n"
            "Evaluate the request and decide. Respond with a JSON object:\n"
            f"{DECISION_FORMAT}"
        )

        response = client.chat.completions.create(
            model="gpt-4-turbo",
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(request)},
            ],
        )

        return json.loads(response.choices[0].message.content)


# Rules
rules = [
    {
        "name": "auto_approve_small",
        "condition": lambda x: x["amount"] < 1000,
        "action": "approve",
        "reason": "Amount below threshold"
    },
    {
        "name": "manager_approval",
        "condition": lambda x: x["amount"] < 10000,
        "action": "route",
        "approvers": ["manager"]
    }
]

workflow = ApprovalWorkflow(rules)
Error Handling & Monitoring
from collections import Counter
from datetime import datetime


class AutomationMonitor:
    """Record workflow executions in memory and summarize them."""

    def __init__(self):
        # One dict per logged execution, in the order they were logged.
        self.executions = []

    def log_execution(self, workflow_name: str, input_data: dict,
                      result: dict, duration: float):
        """Record one workflow run.

        Args:
            workflow_name: Name of the workflow that ran.
            input_data: Input given to the workflow; only the length of its
                string form is stored.
            result: The workflow's result. A run counts as failed when the
                result contains an "error" key.
            duration: How long the run took, in whatever unit the caller uses.
        """
        self.executions.append({
            "workflow": workflow_name,
            "timestamp": datetime.now(),
            "success": "error" not in result,
            "duration": duration,
            "input_size": len(str(input_data)),
            "error": result.get("error"),
        })

    def _group_errors(self) -> dict:
        """Count failed executions by their recorded error value."""
        return dict(Counter(
            entry["error"] for entry in self.executions if not entry["success"]
        ))

    def get_metrics(self) -> dict:
        """Summarize everything logged so far.

        Returns:
            An empty dict when nothing has been logged; otherwise a dict
            with total_executions, success_rate (0.0-1.0), avg_duration and
            error_types (error value -> number of occurrences).
        """
        if not self.executions:
            return {}

        total = len(self.executions)
        successful = [e for e in self.executions if e["success"]]

        return {
            "total_executions": total,
            "success_rate": len(successful) / total,
            "avg_duration": sum(e["duration"] for e in self.executions) / total,
            "error_types": self._group_errors(),
        }
Integration Patterns
Webhook Trigger
from fastapi import FastAPI, Request

app = FastAPI()


@app.post("/webhook/email")
async def handle_email_webhook(request: Request):
    """Classify an incoming email, optionally reply, and route it.

    Reads the "body" and "from" fields of the JSON payload. CATEGORIES,
    COMPANY_CONTEXT, send_email and get_workflow are defined outside this
    snippet.

    Returns:
        ``{"status": "processed", "result": <workflow result>}``.
    """
    data = await request.json()

    # Email processing
    # NOTE: classify_email and generate_auto_reply are synchronous calls and
    # occupy the event loop while they run.
    classification = classify_email(data["body"], CATEGORIES)

    # The model's reply is not guaranteed to contain every requested key.
    if classification.get("requires_response"):
        # generate_auto_reply reads "category", "priority", "summary" and
        # "content"; the raw payload has none of them, so build the dict
        # from the classification plus the email body.
        email = {**classification, "content": data["body"]}
        reply = generate_auto_reply(email, COMPANY_CONTEXT)
        send_email(data["from"], reply)

    # Route to appropriate workflow
    workflow = get_workflow(classification["category"])
    result = workflow.execute(data)

    return {"status": "processed", "result": result}
Conclusion
AI workflow automation accelerates business processes and reduces error rates. Significant efficiency gains can be achieved with document processing, email automation, and decision support systems.
At Veni AI, we develop enterprise automation solutions.
