| 1 | """ |
| 2 | Sales Agent using Agent Inbox WebSocket |
| 3 | |
| 4 | This is a simple example showing how to: |
| 5 | - Connect to Agent Inbox via WebSocket for real-time email processing |
| 6 | - Use OpenAI to handle sales conversations |
| 7 | - Send emails to customers and respond to replies |
| 8 | """ |
| 9 | |
| 10 | import asyncio |
| 11 | import os |
| 12 | import re |
| 13 | from dotenv import load_dotenv |
| 14 | from agentinbox import AsyncAgentinbox, Subscribe, Subscribed, MessageReceivedEvent |
| 15 | from openai import AsyncOpenAI |
| 16 | |
| 17 | # Load environment variables |
| 18 | load_dotenv() |
| 19 | |
| 20 | # Initialize clients |
| 21 | agentinbox = AsyncAgentinbox(api_key=os.getenv("AGENTINBOX_API_KEY")) |
| 22 | openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) |
| 23 | |
| 24 | # Simple conversation history (thread_id -> messages) |
| 25 | conversations = {} |
| 26 | |
| 27 | # Store manager email for notifications |
| 28 | manager_email = None |
| 29 | |
| 30 | |
| 31 | def extract_email(from_field): |
| 32 | """Extract email address from 'Name <email@example.com>' format""" |
| 33 | match = re.search(r'<(.+?)>', from_field) |
| 34 | return match.group(1) if match else from_field |
| 35 | |
| 36 | |
| 37 | def is_from_manager(email_body): |
| 38 | """Simple check if email is from sales manager (contains customer info)""" |
| 39 | keywords = ['customer', 'lead', 'contact', 'reach out', 'email'] |
| 40 | return any(keyword in email_body.lower() for keyword in keywords) |
| 41 | |
| 42 | |
| 43 | def extract_customer_info(email_body): |
| 44 | """Extract customer email from manager's message""" |
| 45 | email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' |
| 46 | emails = re.findall(email_pattern, email_body) |
| 47 | |
| 48 | # Return the first email found (should be the customer's email in the message body) |
| 49 | if emails: |
| 50 | return emails[0] |
| 51 | return None |
| 52 | |
| 53 | |
| 54 | async def get_ai_response(messages, system_prompt): |
| 55 | """Get response from OpenAI""" |
| 56 | try: |
| 57 | response = await openai.chat.completions.create( |
| 58 | model="gpt-4o-mini", |
| 59 | messages=[ |
| 60 | {"role": "system", "content": system_prompt}, |
| 61 | *messages |
| 62 | ], |
| 63 | temperature=0.7, |
| 64 | ) |
| 65 | return response.choices[0].message.content |
| 66 | except Exception as e: |
| 67 | print(f"Error getting AI response: {e}") |
| 68 | return "I apologize, but I encountered an error. Please try again." |
| 69 | |
| 70 | |
| 71 | async def send_email(inbox_id, to_email, subject, body): |
| 72 | """Send a new email""" |
| 73 | try: |
| 74 | await agentinbox.inboxes.messages.send( |
| 75 | inbox_id=inbox_id, |
| 76 | to=[to_email], |
| 77 | subject=subject, |
| 78 | text=body |
| 79 | ) |
| 80 | print(f"✓ Sent email to {to_email}") |
| 81 | except Exception as e: |
| 82 | print(f"Error sending email: {e}") |
| 83 | |
| 84 | |
| 85 | async def reply_to_email(inbox_id, message_id, to_email, body): |
| 86 | """Reply to an email""" |
| 87 | try: |
| 88 | await agentinbox.inboxes.messages.reply( |
| 89 | inbox_id=inbox_id, |
| 90 | message_id=message_id, |
| 91 | to=[to_email], # Required parameter for replies |
| 92 | text=body |
| 93 | ) |
| 94 | print(f"✓ Sent reply to {to_email}") |
| 95 | except Exception as e: |
| 96 | print(f"Error replying: {e}") |
| 97 | |
| 98 | |
| 99 | async def handle_manager_email(inbox_id, message_id, from_email, subject, body): |
| 100 | """Handle email from sales manager - extract customer and send sales pitch""" |
| 101 | global manager_email |
| 102 | manager_email = from_email # Remember manager for future notifications |
| 103 | |
| 104 | print(f"\n📧 Email from MANAGER: {from_email}") |
| 105 | |
| 106 | # Extract customer email |
| 107 | customer_email = extract_customer_info(body) |
| 108 | print(f"→ Extracted customer email: {customer_email}") |
| 109 | |
| 110 | if not customer_email: |
| 111 | await reply_to_email( |
| 112 | inbox_id, |
| 113 | message_id, |
| 114 | from_email, # Reply back to the manager |
| 115 | "I couldn't find a customer email address. Please include it in your message." |
| 116 | ) |
| 117 | return |
| 118 | |
| 119 | # Generate sales pitch using AI |
| 120 | system_prompt = """You are a helpful sales agent. Generate a brief, professional sales email |
| 121 | based on the manager's request. Keep it under 150 words. Be friendly and professional.""" |
| 122 | |
| 123 | messages = [{"role": "user", "content": f"Create a sales email based on this: {body}"}] |
| 124 | sales_pitch = await get_ai_response(messages, system_prompt) |
| 125 | |
| 126 | # Send email to customer |
| 127 | await send_email( |
| 128 | inbox_id, |
| 129 | customer_email, |
| 130 | f"Introduction: {subject}" if subject else "Quick Introduction", |
| 131 | sales_pitch |
| 132 | ) |
| 133 | |
| 134 | # Confirm to manager |
| 135 | await reply_to_email( |
| 136 | inbox_id, |
| 137 | message_id, |
| 138 | from_email, # Reply back to the manager |
| 139 | f"✓ I've sent an introduction email to {customer_email}.\n\nHere's what I sent:\n\n{sales_pitch}" |
| 140 | ) |
| 141 | |
| 142 | |
| 143 | async def handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body): |
| 144 | """Handle email from customer - track conversation, detect intent, and notify manager""" |
| 145 | print(f"\n📧 Email from CUSTOMER: {from_email}") |
| 146 | |
| 147 | # Track conversation history |
| 148 | if thread_id not in conversations: |
| 149 | conversations[thread_id] = [] |
| 150 | conversations[thread_id].append({"role": "user", "content": body}) |
| 151 | |
| 152 | # Detect customer intent |
| 153 | intent_keywords = { |
| 154 | 'interested': ['interested', 'demo', 'meeting', 'tell me more', 'sounds good'], |
| 155 | 'not_interested': ['not interested', 'no thank', 'not right now', 'maybe later'], |
| 156 | 'question': ['?', 'how', 'what', 'when', 'why', 'can you'] |
| 157 | } |
| 158 | |
| 159 | body_lower = body.lower() |
| 160 | intent = 'question' # default |
| 161 | for key, keywords in intent_keywords.items(): |
| 162 | if any(keyword in body_lower for keyword in keywords): |
| 163 | intent = key |
| 164 | break |
| 165 | |
| 166 | # Generate AI response |
| 167 | system_prompt = """You are a helpful sales agent. Answer customer questions professionally |
| 168 | and helpfully. Keep responses brief (under 100 words). Be friendly but professional.""" |
| 169 | |
| 170 | response = await get_ai_response(conversations[thread_id], system_prompt) |
| 171 | |
| 172 | # Reply to customer |
| 173 | await reply_to_email(inbox_id, message_id, from_email, response) |
| 174 | |
| 175 | # Notify manager if strong intent signal |
| 176 | if manager_email and intent in ['interested', 'not_interested']: |
| 177 | status = "showing interest" if intent == 'interested' else "not interested at this time" |
| 178 | await send_email( |
| 179 | inbox_id, |
| 180 | manager_email, |
| 181 | f"Update: {from_email}", |
| 182 | f"Customer {from_email} is {status}.\n\nTheir message:\n{body}\n\nMy response:\n{response}" |
| 183 | ) |
| 184 | print(f"→ Notified manager about customer's {intent}") |
| 185 | |
| 186 | # Update conversation history |
| 187 | conversations[thread_id].append({"role": "assistant", "content": response}) |
| 188 | |
| 189 | |
| 190 | async def handle_new_email(message): |
| 191 | """Process incoming email from WebSocket""" |
| 192 | try: |
| 193 | # Extract message data using object attributes |
| 194 | inbox_id = message.inbox_id |
| 195 | message_id = message.message_id |
| 196 | thread_id = message.thread_id |
| 197 | from_field = message.from_ or "" # SDK uses from_ |
| 198 | from_email = extract_email(from_field) |
| 199 | subject = message.subject or "" |
| 200 | body = message.text or "" # SDK uses text for the body |
| 201 | |
| 202 | print(f"\n{'='*60}") |
| 203 | print(f"New email from: {from_email}") |
| 204 | print(f"Subject: {subject}") |
| 205 | print(f"{'='*60}") |
| 206 | |
| 207 | # Determine if from manager or customer |
| 208 | if is_from_manager(body): |
| 209 | await handle_manager_email(inbox_id, message_id, from_email, subject, body) |
| 210 | else: |
| 211 | await handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body) |
| 212 | |
| 213 | except Exception as e: |
| 214 | print(f"Error handling email: {e}") |
| 215 | |
| 216 | |
| 217 | async def main(): |
| 218 | """Main WebSocket loop""" |
| 219 | inbox_username = os.getenv("INBOX_USERNAME", "sales-agent") |
| 220 | inbox_id = f"{inbox_username}@agentinbox.space" |
| 221 | |
| 222 | print(f"\nSales Agent starting...") |
| 223 | print(f"Inbox: {inbox_id}") |
| 224 | print(f"✓ Connecting to Agent Inbox WebSocket...") |
| 225 | |
| 226 | # Connect to WebSocket |
| 227 | try: |
| 228 | async with agentinbox.websockets.connect() as socket: |
| 229 | print(f"✓ Connected! Listening for emails...\n") |
| 230 | |
| 231 | # Subscribe to inbox |
| 232 | await socket.send_subscribe(Subscribe(inbox_ids=[inbox_id])) |
| 233 | |
| 234 | # Listen for events |
| 235 | async for event in socket: |
| 236 | if isinstance(event, Subscribed): |
| 237 | print(f"✓ Subscribed to: {event.inbox_ids}\n") |
| 238 | |
| 239 | elif isinstance(event, MessageReceivedEvent): |
| 240 | print(f"📨 New email received!") |
| 241 | await handle_new_email(event.message) |
| 242 | |
| 243 | except (KeyboardInterrupt, asyncio.CancelledError): |
| 244 | print("\n\nShutting down gracefully...") |
| 245 | except Exception as e: |
| 246 | print(f"\nError: {e}") |
| 247 | |
| 248 | |
| 249 | def run(): |
| 250 | """Run the main function""" |
| 251 | try: |
| 252 | asyncio.run(main()) |
| 253 | except KeyboardInterrupt: |
| 254 | print("\n✓ Shutdown complete") |
| 255 | |
| 256 | |
| 257 | if __name__ == "__main__": |
| 258 | run() |