OpenClaw + LLM 电商平台集成指南

First Post:

Last Update:

OpenClaw + LLM 电商平台集成指南

📦 依赖安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 基础依赖
pip install openclaw-python-sdk
pip install langchain
pip install openai # 或 anthropic, cohere

# 向量数据库(选一个)
pip install pinecone-client
# 或
pip install qdrant-client
# 或
pip install chromadb

# 数据处理
pip install pandas numpy
pip install faiss-cpu # 或 faiss-gpu

🗄️ 数据准备

1. 商品数据向量化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Pinecone

# 准备商品数据
products = [
{
"id": "p001",
"name": "Apple iPhone 15 Pro",
"description": "A17 Pro芯片,钛金属设计,48MP相机",
"price": 8999,
"category": "手机"
},
# ... 更多商品
]

# 生成向量 embeddings
embeddings = OpenAIEmbeddings()
vectorstore = Pinecone.from_documents(
documents=[text_to_doc(p) for p in products],
embedding=embeddings,
index_name="ecommerce-products"
)

2. 构建知识库

1
2
3
4
5
6
7
8
9
10
11
knowledge_base = {
"faq": [
"Q: 如何退换货?\nA: 支持7天无理由退换货...",
"Q: 物流多久能到?\nA: 大部分地区2-3天..."
],
"policies": [
"退款政策: ...",
"隐私政策: ..."
],
"product_details": products
}

🤖 OpenClaw Agent 配置

基础配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from openclaw import Agent

# 创建电商智能客服 Agent
ecommerce_agent = Agent(
name="电商智能助手",
instructions="""
你是一个专业的电商智能客服助手,负责:
1. 回答用户关于商品、订单、物流的问题
2. 根据用户需求推荐合适商品
3. 处理售后问题和投诉
4. 引导用户完成购买流程

重要规则:
- 始终基于真实商品数据回答
- 推荐时要考虑用户历史偏好
- 遇到无法确定的问题,引导用户联系人工客服
- 保持友好、专业的语气
""",
tools=[search_products, get_order_status, recommend_products],
model="gpt-4", # 或其他 LLM
)

🔌 工具函数实现

工具1:商品搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from typing import List, Dict

@ecommerce_agent.tool
def search_products(query: str, category: str = None, price_range: tuple = None) -> List[Dict]:
"""
搜索商品

Args:
query: 搜索关键词
category: 商品类别(可选)
price_range: 价格范围 (min, max)(可选)

Returns:
匹配的商品列表
"""
# 从向量数据库搜索
results = vectorstore.similarity_search(query, k=10)

# 过滤条件
filtered = []
for item in results:
product = item.metadata
if category and product.get('category') != category:
continue
if price_range:
min_p, max_p = price_range
if not (min_p <= product['price'] <= max_p):
continue
filtered.append(product)

return filtered

工具2:获取订单状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@ecommerce_agent.tool
def get_order_status(order_id: str, user_id: str) -> Dict:
"""
查询订单状态

Args:
order_id: 订单号
user_id: 用户ID

Returns:
订单详细信息
"""
# 连接电商平台数据库
order = ecommerce_db.query(
"SELECT * FROM orders WHERE id = ? AND user_id = ?",
(order_id, user_id)
)

if not order:
return {"error": "订单不存在"}

return {
"order_id": order.id,
"status": order.status,
"products": order.items,
"total": order.total_amount,
"shipping_address": order.address,
"estimated_delivery": order.estimated_date
}

工具3:智能推荐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@ecommerce_agent.tool
def recommend_products(user_id: str, limit: int = 5) -> List[Dict]:
"""
基于用户历史和对话上下文的商品推荐

Args:
user_id: 用户ID
limit: 推荐数量

Returns:
推荐商品列表
"""
# 获取用户历史行为
user_history = get_user_history(user_id)

# 提取偏好特征
preferences = extract_preferences(user_history)

# 协同过滤 + 内容过滤
recommendations = []

# 方法1: 基于相似用户(协同过滤)
similar_users = find_similar_users(user_id, top_k=20)
for similar_user in similar_users:
products = get_user_purchases(similar_user['user_id'])
recommendations.extend(products)

# 方法2: 基于内容相似度(内容过滤)
for purchase in user_history:
similar_products = find_similar_products(
purchase['product_id'],
top_k=3
)
recommendations.extend(similar_products)

# 去重和排序
recommendations = dedup_and_rank(recommendations, preferences)

return recommendations[:limit]

🔄 对话管理

会话状态管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class ConversationManager:
def __init__(self):
self.sessions = {}

def create_session(self, user_id: str) -> str:
"""创建新会话"""
session_id = generate_uuid()
self.sessions[session_id] = {
"user_id": user_id,
"messages": [],
"context": {},
"start_time": datetime.now()
}
return session_id

def add_message(self, session_id: str, role: str, content: str):
"""添加消息到会话"""
if session_id in self.sessions:
self.sessions[session_id]["messages"].append({
"role": role,
"content": content,
"timestamp": datetime.now()
})

def get_context(self, session_id: str) -> Dict:
"""获取会话上下文"""
return self.sessions.get(session_id, {}).get("context", {})

def update_context(self, session_id: str, key: str, value: any):
"""更新会话上下文"""
if session_id in self.sessions:
self.sessions[session_id]["context"][key] = value

处理用户请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
async def handle_user_message(session_id: str, user_message: str):
"""处理用户消息"""

# 1. 添加用户消息到会话
conv_manager.add_message(session_id, "user", user_message)

# 2. 获取会话历史和上下文
messages = conv_manager.sessions[session_id]["messages"]
context = conv_manager.get_context(session_id)

# 3. 调用 OpenClaw Agent
response = await ecommerce_agent.arun(
message=user_message,
context=context,
conversation_history=messages
)

# 4. 添加助手回复到会话
conv_manager.add_message(session_id, "assistant", response)

# 5. 提取并更新上下文
extracted_info = extract_entities(user_message, response)
for key, value in extracted_info.items():
conv_manager.update_context(session_id, key, value)

return response

🌐 API 集成

FastAPI 后端接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
user_id: str
message: str
session_id: str = None

class ChatResponse(BaseModel):
response: str
session_id: str
suggestions: List[str] = []

@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""智能客服对话接口"""
try:
# 创建或获取会话
if not request.session_id:
session_id = conv_manager.create_session(request.user_id)
else:
session_id = request.session_id

# 处理消息
response = await handle_user_message(session_id, request.message)

# 生成建议回复(快速回复按钮)
suggestions = generate_suggestions(response)

return ChatResponse(
response=response,
session_id=session_id,
suggestions=suggestions
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/recommendations")
async def get_recommendations(user_id: str, limit: int = 5):
"""获取商品推荐"""
try:
recommendations = recommend_products(user_id, limit)
return {"recommendations": recommendations}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/search")
async def search_products_api(
query: str,
category: str = None,
min_price: float = None,
max_price: float = None
):
"""商品搜索接口"""
try:
results = search_products(
query=query,
category=category,
price_range=(min_price, max_price) if min_price and max_price else None
)
return {"results": results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

🎨 前端集成示例

React 组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import React, { useState, useEffect } from 'react';

function EcommerceChat() {
const [messages, setMessages] = useState([]);
const [input, setInput] = useState('');
const [sessionId, setSessionId] = useState('');
const [loading, setLoading] = useState(false);

useEffect(() => {
// 初始化会话
initializeChat();
}, []);

const initializeChat = async () => {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
user_id: getUserId(),
message: "你好,我想购物"
})
});
const data = await response.json();
setSessionId(data.session_id);
setMessages([
{ role: 'assistant', content: data.response }
]);
};

const sendMessage = async () => {
if (!input.trim()) return;

setLoading(true);

// 添加用户消息
const newMessages = [...messages, { role: 'user', content: input }];
setMessages(newMessages);

try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
user_id: getUserId(),
message: input,
session_id: sessionId
})
});
const data = await response.json();

// 添加助手回复
setMessages([
...newMessages,
{ role: 'assistant', content: data.response }
]);
setInput('');
} catch (error) {
console.error('Error:', error);
} finally {
setLoading(false);
}
};

return (
<div className="chat-container">
<div className="messages">
{messages.map((msg, idx) => (
<div key={idx} className={`message ${msg.role}`}>
{msg.content}
</div>
))}
</div>
<div className="input-area">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyPress={(e) => e.key === 'Enter' && sendMessage()}
placeholder="输入您的问题..."
/>
<button onClick={sendMessage} disabled={loading}>
{loading ? '发送中...' : '发送'}
</button>
</div>
</div>
);
}

export default EcommerceChat;

📊 监控和分析

性能监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from prometheus_client import Counter, Histogram

# 指标定义
chat_requests_total = Counter(
'chat_requests_total',
'Total chat requests',
['user_id', 'status']
)

chat_duration = Histogram(
'chat_duration_seconds',
'Chat request duration'
)

recommendation_clicks = Counter(
'recommendation_clicks_total',
'Total recommendation clicks',
['product_id', 'user_id']
)

# 使用示例
@chat_duration.time()
def handle_chat_request(user_id: str, message: str):
try:
response = process_message(message)
chat_requests_total.labels(
user_id=user_id,
status='success'
).inc()
return response
except Exception:
chat_requests_total.labels(
user_id=user_id,
status='error'
).inc()
raise

数据分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def analyze_user_conversations(user_id: str):
"""分析用户对话数据"""
conversations = get_user_conversations(user_id)

# 意图分析
intents = []
for conv in conversations:
intent = classify_intent(conv['messages'])
intents.append(intent)

# 主题分析
topics = extract_topics(conversations)

# 情感分析
sentiments = analyze_sentiments(conversations)

return {
"intents": intents,
"topics": topics,
"sentiments": sentiments,
"engagement_rate": calculate_engagement(conversations)
}

🚀 部署建议

容器化部署

1
2
3
4
5
6
7
8
9
10
11
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
version: '3.8'

services:
api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- DATABASE_URL=${DATABASE_URL}
depends_on:
- redis
- postgres

redis:
image: redis:alpine

postgres:
image: postgres:15
environment:
- POSTGRES_DB=ecommerce
- POSTGRES_USER=admin
- POSTGRES_PASSWORD=secret
volumes:
- postgres_data:/var/lib/postgresql/data

volumes:
postgres_data:

📈 优化策略

1. 缓存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from functools import lru_cache
from redis import Redis

redis_client = Redis()

@lru_cache(maxsize=1000)
def get_product_info(product_id: str):
"""商品信息缓存"""
# 先查 Redis
cached = redis_client.get(f"product:{product_id}")
if cached:
return json.loads(cached)

# 查数据库
product = db.query_product(product_id)

# 写入缓存
redis_client.setex(
f"product:{product_id}",
3600, # 1小时过期
json.dumps(product)
)

return product

2. 异步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)

async def batch_recommend(users: List[str]):
"""批量异步推荐"""
tasks = [
asyncio.get_event_loop().run_in_executor(
executor,
recommend_products,
user_id
)
for user_id in users
]
results = await asyncio.gather(*tasks)
return results

3. 成本优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 使用较小的模型处理简单请求
def route_request(message: str):
"""智能路由"""
complexity = estimate_complexity(message)

if complexity < 0.3:
# 简单请求,使用小模型
return gpt_3_5_turbo_model
else:
# 复杂请求,使用大模型
return gpt_4_model

# 缓存常见问题的答案
COMMON_QUESTIONS_CACHE = {
"退货流程": "支持7天无理由退换货...",
"物流时间": "大部分地区2-3天...",
}

def get_answer(message: str):
"""优先使用缓存"""
for question, answer in COMMON_QUESTIONS_CACHE.items():
if question in message:
return answer
return call_llm(message)

⚠️ 注意事项

1. 数据安全

  • 加密存储用户对话
  • 遵守 GDPR/隐私法规
  • 定期审计数据访问日志

2. 性能优化

  • 目标响应时间 < 2秒
  • 使用 CDN 加速静态资源
  • 实施负载均衡

3. 成本控制

  • 监控 API 调用量
  • 使用缓存减少重复请求
  • 考虑使用本地模型降低成本

4. 持续优化

  • A/B 测试不同 prompt
  • 收集用户反馈改进
  • 定期重新训练推荐模型