Sales-агент на WebSocket

Обзор

Соберите sales-агента, который обрабатывает письма через WebSocket сразу по мере поступления. В отличие от вебхуков, не нужны ngrok и публичный URL — соединение к Agent Inbox исходящее, настройка минимальная.

Сценарий: менеджер передаёт агенту данные лида; агент ведёт переписку с клиентом и сигнализирует менеджеру о важных намерениях.

Что вы получите

  1. WebSocket — поток событий в реальном времени
  2. Письма менеджера — извлечение email клиента и персонализированный аутрич
  3. Ответы клиента — ответы ИИ с учётом контекста
  4. Уведомления менеджеру — при сильных сигналах интереса / отказа

Схема:

Manager sends email with customer info
Agent extracts customer email
Agent generates AI sales pitch → Sends to customer
Agent confirms to manager
[Customer replies]
Agent detects intent + generates AI response
If interested → Notifies manager

WebSocket и вебхук

КритерийВебхукWebSocket
Настройкаngrok + публичный URLБез лишних инструментов
АрхитектураFlask + HTTPAsync Python
ЗадержкаHTTP round-tripПоток
ФайрволОткрытый портТолько исходящие

Подробнее: справочник WebSocket и документация SDK.

Требования

Нужно:

Проект

Шаг 1: каталог

$mkdir sales-agent-websocket
$cd sales-agent-websocket

Шаг 2: main.py

1"""
2Sales Agent using Agent Inbox WebSocket
3
4This is a simple example showing how to:
5- Connect to Agent Inbox via WebSocket for real-time email processing
6- Use OpenAI to handle sales conversations
7- Send emails to customers and respond to replies
8"""
9
10import asyncio
11import os
12import re
13from dotenv import load_dotenv
14from agentinbox import AsyncAgentinbox, Subscribe, Subscribed, MessageReceivedEvent
15from openai import AsyncOpenAI
16
17# Load environment variables
18load_dotenv()
19
20# Initialize clients
21agentinbox = AsyncAgentinbox(api_key=os.getenv("AGENTINBOX_API_KEY"))
22openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
23
24# Simple conversation history (thread_id -> messages)
25conversations = {}
26
27# Store manager email for notifications
28manager_email = None
29
30
31def extract_email(from_field):
32 """Extract email address from 'Name <email@example.com>' format"""
33 match = re.search(r'<(.+?)>', from_field)
34 return match.group(1) if match else from_field
35
36
37def is_from_manager(email_body):
38 """Simple check if email is from sales manager (contains customer info)"""
39 keywords = ['customer', 'lead', 'contact', 'reach out', 'email']
40 return any(keyword in email_body.lower() for keyword in keywords)
41
42
43def extract_customer_info(email_body):
44 """Extract customer email from manager's message"""
45 email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
46 emails = re.findall(email_pattern, email_body)
47
48 # Return the first email found (should be the customer's email in the message body)
49 if emails:
50 return emails[0]
51 return None
52
53
54async def get_ai_response(messages, system_prompt):
55 """Get response from OpenAI"""
56 try:
57 response = await openai.chat.completions.create(
58 model="gpt-4o-mini",
59 messages=[
60 {"role": "system", "content": system_prompt},
61 *messages
62 ],
63 temperature=0.7,
64 )
65 return response.choices[0].message.content
66 except Exception as e:
67 print(f"Error getting AI response: {e}")
68 return "I apologize, but I encountered an error. Please try again."
69
70
71async def send_email(inbox_id, to_email, subject, body):
72 """Send a new email"""
73 try:
74 await agentinbox.inboxes.messages.send(
75 inbox_id=inbox_id,
76 to=[to_email],
77 subject=subject,
78 text=body
79 )
80 print(f"✓ Sent email to {to_email}")
81 except Exception as e:
82 print(f"Error sending email: {e}")
83
84
85async def reply_to_email(inbox_id, message_id, to_email, body):
86 """Reply to an email"""
87 try:
88 await agentinbox.inboxes.messages.reply(
89 inbox_id=inbox_id,
90 message_id=message_id,
91 to=[to_email], # Required parameter for replies
92 text=body
93 )
94 print(f"✓ Sent reply to {to_email}")
95 except Exception as e:
96 print(f"Error replying: {e}")
97
98
99async def handle_manager_email(inbox_id, message_id, from_email, subject, body):
100 """Handle email from sales manager - extract customer and send sales pitch"""
101 global manager_email
102 manager_email = from_email # Remember manager for future notifications
103
104 print(f"\n📧 Email from MANAGER: {from_email}")
105
106 # Extract customer email
107 customer_email = extract_customer_info(body)
108 print(f"→ Extracted customer email: {customer_email}")
109
110 if not customer_email:
111 await reply_to_email(
112 inbox_id,
113 message_id,
114 from_email, # Reply back to the manager
115 "I couldn't find a customer email address. Please include it in your message."
116 )
117 return
118
119 # Generate sales pitch using AI
120 system_prompt = """You are a helpful sales agent. Generate a brief, professional sales email
121 based on the manager's request. Keep it under 150 words. Be friendly and professional."""
122
123 messages = [{"role": "user", "content": f"Create a sales email based on this: {body}"}]
124 sales_pitch = await get_ai_response(messages, system_prompt)
125
126 # Send email to customer
127 await send_email(
128 inbox_id,
129 customer_email,
130 f"Introduction: {subject}" if subject else "Quick Introduction",
131 sales_pitch
132 )
133
134 # Confirm to manager
135 await reply_to_email(
136 inbox_id,
137 message_id,
138 from_email, # Reply back to the manager
139 f"✓ I've sent an introduction email to {customer_email}.\n\nHere's what I sent:\n\n{sales_pitch}"
140 )
141
142
143async def handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body):
144 """Handle email from customer - track conversation, detect intent, and notify manager"""
145 print(f"\n📧 Email from CUSTOMER: {from_email}")
146
147 # Track conversation history
148 if thread_id not in conversations:
149 conversations[thread_id] = []
150 conversations[thread_id].append({"role": "user", "content": body})
151
152 # Detect customer intent
153 intent_keywords = {
154 'interested': ['interested', 'demo', 'meeting', 'tell me more', 'sounds good'],
155 'not_interested': ['not interested', 'no thank', 'not right now', 'maybe later'],
156 'question': ['?', 'how', 'what', 'when', 'why', 'can you']
157 }
158
159 body_lower = body.lower()
160 intent = 'question' # default
161 for key, keywords in intent_keywords.items():
162 if any(keyword in body_lower for keyword in keywords):
163 intent = key
164 break
165
166 # Generate AI response
167 system_prompt = """You are a helpful sales agent. Answer customer questions professionally
168 and helpfully. Keep responses brief (under 100 words). Be friendly but professional."""
169
170 response = await get_ai_response(conversations[thread_id], system_prompt)
171
172 # Reply to customer
173 await reply_to_email(inbox_id, message_id, from_email, response)
174
175 # Notify manager if strong intent signal
176 if manager_email and intent in ['interested', 'not_interested']:
177 status = "showing interest" if intent == 'interested' else "not interested at this time"
178 await send_email(
179 inbox_id,
180 manager_email,
181 f"Update: {from_email}",
182 f"Customer {from_email} is {status}.\n\nTheir message:\n{body}\n\nMy response:\n{response}"
183 )
184 print(f"→ Notified manager about customer's {intent}")
185
186 # Update conversation history
187 conversations[thread_id].append({"role": "assistant", "content": response})
188
189
190async def handle_new_email(message):
191 """Process incoming email from WebSocket"""
192 try:
193 # Extract message data using object attributes
194 inbox_id = message.inbox_id
195 message_id = message.message_id
196 thread_id = message.thread_id
197 from_field = message.from_ or "" # SDK uses from_
198 from_email = extract_email(from_field)
199 subject = message.subject or ""
200 body = message.text or "" # SDK uses text for the body
201
202 print(f"\n{'='*60}")
203 print(f"New email from: {from_email}")
204 print(f"Subject: {subject}")
205 print(f"{'='*60}")
206
207 # Determine if from manager or customer
208 if is_from_manager(body):
209 await handle_manager_email(inbox_id, message_id, from_email, subject, body)
210 else:
211 await handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body)
212
213 except Exception as e:
214 print(f"Error handling email: {e}")
215
216
217async def main():
218 """Main WebSocket loop"""
219 inbox_username = os.getenv("INBOX_USERNAME", "sales-agent")
220 inbox_id = f"{inbox_username}@agentinbox.space"
221
222 print(f"\nSales Agent starting...")
223 print(f"Inbox: {inbox_id}")
224 print(f"✓ Connecting to Agent Inbox WebSocket...")
225
226 # Connect to WebSocket
227 try:
228 async with agentinbox.websockets.connect() as socket:
229 print(f"✓ Connected! Listening for emails...\n")
230
231 # Subscribe to inbox
232 await socket.send_subscribe(Subscribe(inbox_ids=[inbox_id]))
233
234 # Listen for events
235 async for event in socket:
236 if isinstance(event, Subscribed):
237 print(f"✓ Subscribed to: {event.inbox_ids}\n")
238
239 elif isinstance(event, MessageReceivedEvent):
240 print(f"📨 New email received!")
241 await handle_new_email(event.message)
242
243 except (KeyboardInterrupt, asyncio.CancelledError):
244 print("\n\nShutting down gracefully...")
245 except Exception as e:
246 print(f"\nError: {e}")
247
248
249def run():
250 """Run the main function"""
251 try:
252 asyncio.run(main())
253 except KeyboardInterrupt:
254 print("\n✓ Shutdown complete")
255
256
257if __name__ == "__main__":
258 run()

Шаг 3: requirements.txt

agentinbox>=0.0.19
openai>=1.0.0
python-dotenv>=1.0.0

Шаг 4: установка

$pip install -r requirements.txt
$# or with pyproject.toml
$pip install .

Шаг 5: .env

1# Agent Inbox Configuration
2AGENTINBOX_API_KEY=your_agentinbox_api_key_here
3
4# OpenAI Configuration
5OPENAI_API_KEY=your_openai_api_key_here
6
7# Inbox Settings
8INBOX_USERNAME=sales-agent

Замечание: в отличие от вебхуков, ngrok и публичный URL не нужны. WebSocket — исходящее соединение, работает за файрволом без проброса портов.

Разбор кода

Схема

┌─────────────────────────────────────────────────────────────┐
│ Your Python Script │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌─────────────┐ │
│ │ AsyncAgent │────▶│ WebSocket │────▶│ Event │ │
│ │ Mail Client │ │ Connection │ │ Handler │ │
│ └──────────────┘ └──────────────┘ └─────────────┘ │
│ │ ▲ │ │
│ │ │ ▼ │
│ │ Real-time ┌─────────────┐ │
│ │ Events │ OpenAI │ │
│ │ │ Integration │ │
│ ▼ └─────────────┘ │
│ ┌──────────────┐ │
│ │ Send/Reply │◀──────────────────────────────────────────│
│ │ Emails │ │
│ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
┌──────────────────┐
│ Agent Inbox │
│ Cloud │
└──────────────────┘

1. Подключение WebSocket

Основа агента — постоянное соединение:

1from agentinbox import AsyncAgentinbox, Subscribe, Subscribed, MessageReceivedEvent
2
3# Initialize the async client
4agentinbox = AsyncAgentinbox(api_key=os.getenv("AGENTINBOX_API_KEY"))
5
6# Connect and subscribe
7async with agentinbox.websockets.connect() as socket:
8 await socket.send_subscribe(Subscribe(inbox_ids=[inbox_id]))

Кратко:

  • AsyncAgentinbox — асинхронный клиент
  • agentinbox.websockets.connect() — WebSocket
  • Subscribe — какие inbox слушать

2. Цикл событий

Асинхронный итератор по событиям:

1async for event in socket:
2 if isinstance(event, Subscribed):
3 print(f"✓ Subscribed to: {event.inbox_ids}")
4
5 elif isinstance(event, MessageReceivedEvent):
6 await handle_new_email(event.message)

Типы событий:

  • Subscribed — подписка подтверждена
  • MessageReceivedEvent — новое письмо

3. Маршрутизация писем

По содержимому письма:

1async def handle_new_email(message):
2 # Extract fields from the message object
3 inbox_id = message.inbox_id
4 message_id = message.message_id
5 thread_id = message.thread_id
6 from_field = message.from_ or "" # Note: SDK uses from_
7 from_email = extract_email(from_field)
8 subject = message.subject or ""
9 body = message.text or ""
10
11 # Route based on email content
12 if is_from_manager(body):
13 await handle_manager_email(inbox_id, message_id, from_email, subject, body)
14 else:
15 await handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body)

Разбор From:

1def extract_email(from_field):
2 """Extract email from 'Name <email@example.com>' format"""
3 match = re.search(r'<(.+?)>', from_field)
4 return match.group(1) if match else from_field

4. Письмо от менеджера

Если в теле есть данные лида:

1async def handle_manager_email(inbox_id, message_id, from_email, subject, body):
2 global manager_email
3 manager_email = from_email # Remember for notifications
4
5 # Extract customer email using regex
6 customer_email = extract_customer_info(body)
7
8 if not customer_email:
9 await reply_to_email(inbox_id, message_id, from_email,
10 "I couldn't find a customer email. Please include it.")
11 return
12
13 # Generate AI sales pitch
14 sales_pitch = await get_ai_response(
15 [{"role": "user", "content": f"Create a sales email based on: {body}"}],
16 "You are a helpful sales agent. Generate a brief, professional email..."
17 )
18
19 # Send to customer
20 await send_email(inbox_id, customer_email, f"Introduction: {subject}", sales_pitch)
21
22 # Confirm to manager
23 await reply_to_email(inbox_id, message_id, from_email,
24 f"✓ Sent email to {customer_email}.\n\nContent:\n{sales_pitch}")

5. Клиент и намерение

История треда и грубая классификация намерения по ключевым словам:

1async def handle_customer_email(inbox_id, message_id, thread_id, from_email, subject, body):
2 # Track conversation history per thread
3 if thread_id not in conversations:
4 conversations[thread_id] = []
5 conversations[thread_id].append({"role": "user", "content": body})
6
7 # Detect intent with keyword matching
8 intent_keywords = {
9 'interested': ['interested', 'demo', 'meeting', 'tell me more'],
10 'not_interested': ['not interested', 'no thank', 'maybe later'],
11 'question': ['?', 'how', 'what', 'when', 'why']
12 }
13
14 intent = 'question' # default
15 for key, keywords in intent_keywords.items():
16 if any(kw in body.lower() for kw in keywords):
17 intent = key
18 break
19
20 # Generate contextual AI response using conversation history
21 response = await get_ai_response(conversations[thread_id], system_prompt)
22
23 # Reply to customer
24 await reply_to_email(inbox_id, message_id, from_email, response)
25
26 # Notify manager of strong signals
27 if manager_email and intent in ['interested', 'not_interested']:
28 await send_email(inbox_id, manager_email, f"Update: {from_email}",
29 f"Customer is {intent}.\n\nTheir message:\n{body}")

6. Ответы OpenAI

Генерация текста:

1async def get_ai_response(messages, system_prompt):
2 try:
3 response = await openai.chat.completions.create(
4 model="gpt-4o-mini",
5 messages=[
6 {"role": "system", "content": system_prompt},
7 *messages # Include conversation history
8 ],
9 temperature=0.7,
10 )
11 return response.choices[0].message.content
12 except Exception as e:
13 print(f"Error: {e}")
14 return "I apologize, but I encountered an error."

Важно: в промпт попадает история переписки; при ошибке API — запасной текст.

Запуск

$python main.py

Ожидаемый вывод:

Sales Agent starting...
Inbox: sales-agent@agentinbox.space
✓ Connecting to Agent Inbox WebSocket...
✓ Connected! Listening for emails...
✓ Subscribed to: ['sales-agent@agentinbox.space']

Готово. Агент слушает почту в реальном времени. Не закрывайте терминал.

Проверка

Сценарий: запрос менеджера

Отправьте с личной почты:

To: sales-agent@agentinbox.space
Subject: New lead - AI startup
Body: Please reach out to this customer: customer-email@gmail.com
They're interested in our API platform.

Консоль:

============================================================
New email from: your-email@gmail.com
Subject: New lead - AI startup
============================================================
📧 Email from MANAGER: your-email@gmail.com
→ Extracted customer email: customer-email@gmail.com
✓ Sent email to customer-email@gmail.com
✓ Sent reply to your-email@gmail.com

В почте: подтверждение с текстом, отправленным клиенту.

Работает: события в реальном времени, ответы ИИ, уведомление менеджера при интересе.

Настройка

Ключевые слова намерения

В handle_customer_email() правьте intent_keywords:

1intent_keywords = {
2 'interested': ['interested', 'demo', 'meeting', 'pricing', 'sign up'],
3 'not_interested': ['not interested', 'unsubscribe', 'remove me'],
4 'question': ['?', 'how', 'what', 'when', 'can you'],
5 'urgent': ['urgent', 'asap', 'immediately'] # Add new intent
6}

Несколько inbox

Подписка на несколько ящиков:

1await socket.send_subscribe(Subscribe(inbox_ids=[
2 "sales-agent@agentinbox.space",
3 "support-agent@agentinbox.space",
4 "info@yourdomain.com"
5]))

Устранение неполадок

Частые случаи

Проблема: нет соединения с WebSocket Agent Inbox.

Что проверить:

  1. Проверьте, что API-ключ верный:
1client = AsyncAgentinbox(api_key="your-key")
2print(await client.inboxes.list()) # Должно выполниться без ошибки
  1. Проверьте интернет и настройки файрвола
  2. Убедитесь, что установлен agentinbox>=0.0.19 с поддержкой WebSocket:
$pip show agentinbox

Проблема: процесс запущен, событий нет.

Чеклист:

  1. Убедитесь, что inbox существует:
1client = AsyncAgentinbox()
2print(await client.inboxes.get("sales-agent@agentinbox.space"))
  1. Проверьте в консоли подтверждение подписки
  2. Отправьте тестовое письмо на правильный адрес inbox
  3. Убедитесь, что письмо не попадает в спам

Проблема: исключения в async-коде.

Что сделать:

  1. Убедитесь, что установлен Python 3.11+:
$python --version
  1. Не смешивайте синхронный и асинхронный код некорректно
  2. Используйте asyncio.run(main()) как точку входа

Идеи и проекты на Agent Inbox — info@patternautomation.com.