返回博客

如何使用LlamaIndex和网页抓取构建生产就绪的RAG(2025指南)

当生产RAG依赖过时的静态知识时就会失败。本指南向您展示如何构建抓取实时网络数据、与LlamaIndex集成并在生产中实际存活的RAG系统。您将学习架构弹性抓取管道、为数百万文档优化向量存储,以及部署大规模提供实时智能的系统。

Zilvinas Tamulis

12月 10日, 2025年

16 分钟阅读

为什么传统RAG系统在生产中挣扎

让我们直击要害。RAG不是魔法。

RAG系统结合了两件事: 查找相关信息的搜索系统和使用该信息回答问题的LLM。当有人提问时,您的系统会搜索您的知识库,提取最相关的块,并将它们馈送给LLM。LLM根据该上下文生成答案。

听起来很简单,对吧?问题是: 您的答案只能和您的数据一样好。换句话说,过时的数据意味着过时的答案。在冻结数据集上训练的LLM无法告诉您上周的产品发布、今天早上的突发新闻或现在正在发生的市场变化。

这就是网页抓取发挥重大作用的地方。

网页抓取将RAG从静态研究助手转变为实时智能引擎。通过持续将抓取的内容馈送到您的管道中,您可以确保答案保持准确、相关和可信。

这种方法优于单独使用LLM,因为:

  • 您获得基于真实数据的准确答案
  • 您可以为每个声明引用来源
  • 您可以在不重新训练模型的情况下更新知识
  • 您控制LLM可以访问的信息

在构建用于生产的RAG系统时,使用的最佳工具之一是LlamaIndex。LlamaIndex为您处理所有烦人的基础设施工作。它管理文档加载、文本分块、嵌入生成、向量存储和查询处理。您无需从头开始构建这些组件或弄清楚它们如何组合在一起。

LlamaIndex支持主要的向量数据库,如PineconeWeaviateChromaQdrant。它与OpenAI、Anthropic和本地模型集成,同时包括真正理解文档结构的智能分块工具。

该框架旨在从原型扩展到生产,无需重大重写。您可以在开发期间从本地存储开始,然后使用最少的代码更改切换到生产向量数据库。

规划您的生产RAG架构

想知道为什么大多数RAG系统在大规模崩溃吗?它们从一开始就没有为此架构。将生产需求改造到原型上的成本是从一开始正确设计的10倍。为了避免这个陷阱,让我们正确规划我们的系统。

系统设计要点

您的RAG架构需要四个核心组件协同工作。

  • 数据摄取管道. 设计处理实时流和批处理的管道。每个阶段都应该独立失败,因此一个损坏的抓取器不应该关闭整个系统
  • 向量存储. 规划索引大小为您初始估计的10倍,因为它们总是比预期增长得更快。在托管服务(更易于维护)或自托管(更多控制,更多工作)之间进行选择
  • 查询处理. 映射您的整个工作流程: 嵌入生成、相似性搜索、重新排序、上下文组装、LLM调用、响应流式传输。每个步骤都会增加延迟,因此单独分析它们,这样您就知道在哪里优化
  • 监控. 从一开始就使用管道健康、数据新鲜度、检索质量、查询延迟和每次查询成本的指标进行检测。构建在用户注意到问题之前捕获问题的仪表板

网页抓取基础设施

您的抓取系统需要比检索层更具弹性,因为如果抓取中断,您的整个知识库就会过时。

为每个目标选择适当的代理类型并主动实施轮换逻辑。尊重速率限制、robots.txt,并在遇到429错误时指数退避。耐心的抓取无限期运行,而激进的抓取永久被阻止。

在抓取时验证内容并实施带有死信队列的断路器以增强可靠性。

可扩展性规划

从一开始就设计水平扩展,使用无状态工作器。通过域对抓取目标进行分区,使用负载均衡分配查询处理,并通过内容类型或时间戳对索引进行分片

跟踪每个组件的成本并首先优化最昂贵的操作,通常是嵌入生成或LLM调用。从第一天起就为规模架构,因为将生产需求改造到原型上的成本是从一开始正确设计它们的10倍。

既然您已经规划了架构,让我们设置一个支持它的开发环境。

设置您的环境

让我们构建这个东西。从干净、可重现的设置开始。

首先,确保您有Python 3.9或更高版本。接下来,创建一个虚拟环境并安装核心包:

python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install llama-index llama-index-vector-stores-chroma llama-index-embeddings-openai \
llama-index-embeddings-huggingface llama-index-llms-openai \
fastapi uvicorn chromadb sentence-transformers \
python-dotenv requests beautifulsoup4

安装Ollama(用于本地LLM):

ollama pull llama3.2:3b

组织您的项目,使代码、配置和数据清晰分离:

your-project/
├─ rag_system.py # your FastAPI app
├─ scraper.py # your scraping app
├─ .env
└─ requirements.txt

使用以下设置创建您的.env文件:

# API
RAG_API_KEY=your-secret-api-key
HOST=0.0.0.0
PORT=8000
# Vector store
CHROMA_PERSIST_DIR=./chroma_db
# Decodo API (used by scraper.py)
DECODO_USERNAME=your-decodo-username
DECODO_PASSWORD=your-decodo-password
# LLM (optional, set ENABLE_LLM=true to use)
ENABLE_LLM=false
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_MODEL=gpt-3.5-turbo

创建一个如下所示的requirements.txt:

# Web Framework
fastapi==0.115.*
uvicorn[standard]==0.30.*
# Vector Database
chromadb==0.5.*
# Embeddings
sentence-transformers==2.7.*
# Web Scraping
requests==2.32.*
beautifulsoup4==4.12.*
# Environment Configuration
python-dotenv==1.0.*
# Data Validation (FastAPI dependency)
pydantic==2.5.*
# Optional for full RAG
openai==1.40.*

构建网页抓取数据管道

您的RAG系统只能和馈送它的数据一样好。让我们构建一个不会崩溃的抓取器。

生产抓取器需要三样东西: 代理轮换以避免被禁止、适当的错误处理以应对出错情况,以及验证以在坏数据进入系统之前捕获它。

这是一个使用Decodo网页抓取API的生产级抓取器:

import os
import requests
from bs4 import BeautifulSoup
import logging
from typing import Optional
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
logger = logging.getLogger(__name__)
class DecodoScraper:
"""Web scraper using Decodo's Web Scraping API"""
def __init__(self):
self.username = os.getenv("DECODO_USERNAME")
self.password = os.getenv("DECODO_PASSWORD")
self.base_url = "https://scraper-api.decodo.com/v2/scrape"
self.session = requests.Session()
if self.username and self.password:
self.session.auth = (self.username, self.password)
self.session.headers.update({'Content-Type': 'application/json'})
else:
logger.warning("Decodo credentials not found in environment variables")
def scrape_url(self, url: str) -> str:
"""Scrape a webpage using Decodo's Web Scraping API"""
try:
logger.info(f"Scraping URL: {url}")
task_params = {
'url': url,
'parse': True # Enable automatic parsing
}
response = self.session.post(
self.base_url,
json=task_params,
timeout=30
)
logger.info(f"Response status: {response.status_code}")
if response.status_code == 200:
data = response.json()
logger.info(f"Response data keys: {list(data.keys())}")
logger.info(f"Full response structure: {data}")
# Check if we have results
if 'results' in data and len(data['results']) > 0:
result = data['results'][0]
logger.info(f"Result keys: {list(result.keys())}")
# Try to extract the actual HTML content
if 'content' in result:
content = result['content']
logger.info(f"Content keys: {list(content.keys()) if isinstance(content, dict) else 'Not a dict'}")
# If we have HTML content directly
if 'html' in content:
soup = BeautifulSoup(content['html'], 'html.parser')
text = soup.get_text()
return text
# If we have text content directly
elif 'text' in content:
return content['text']
# If we have description
elif 'description' in content:
return content['description']
# If content is a string
elif isinstance(content, str):
return content
# If we have parsed content
elif 'results' in content:
return str(content['results'])
else:
return str(content)
# If result has HTML directly
elif 'html' in result:
soup = BeautifulSoup(result['html'], 'html.parser')
text = soup.get_text()
return text
# If result has text directly
elif 'text' in result:
return result['text']
# If result has description
elif 'description' in result:
return result['description']
else:
# Return the entire result as string for debugging
logger.warning(f"Unexpected result structure: {result}")
return str(result)
else:
raise Exception(f"No results found in response: {data}")
else:
raise Exception(f"HTTP error {response.status_code}: {response.text}")
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
raise Exception(f"Request failed: {e}")
except Exception as e:
logger.error(f"Scraping failed: {e}")
raise Exception(f"Scraping failed: {e}")
def is_ready(self) -> bool:
"""Check if scraper is ready (has credentials)"""
return bool(self.username and self.password)
def test_connection(self) -> bool:
"""Test the connection to Decodo API"""
try:
# Test with a simple URL
test_url = "https://httpbin.org/html"
content = self.scrape_url(test_url)
return len(content) > 0
except Exception as e:
logger.error(f"Connection test failed: {e}")
return False
# Global scraper instance
_scraper_instance = None
def get_scraper() -> DecodoScraper:
"""Get the global scraper instance"""
global _scraper_instance
if _scraper_instance is None:
_scraper_instance = DecodoScraper()
return _scraper_instance
# Test function
if __name__ == "__main__":
import asyncio
import logging
# Set up logging to see debug information
logging.basicConfig(level=logging.INFO)
async def test_scraper():
scraper = get_scraper()
if not scraper.is_ready():
print("Scraper not ready - missing credentials")
print("Make sure to set DECODO_USERNAME and DECODO_PASSWORD in .env file")
return
print("Scraper ready")
try:
# Test with a simple URL
test_url = "https://httpbin.org/html"
print(f"Testing with URL: {test_url}")
content = scraper.scrape_url(test_url)
print(f"Successfully scraped {len(content)} characters")
print(f"Content preview: {content[:200]}...")
# Also test with a more complex URL
print("\n" + "="*50)
test_url2 = "https://example.com"
print(f"Testing with URL: {test_url2}")
content2 = scraper.scrape_url(test_url2)
print(f"Successfully scraped {len(content2)} characters")
print(f"Content preview: {content2[:200]}...")
except Exception as e:
print(f"Scraping failed: {e}")
asyncio.run(test_scraper())

这个抓取器自动从.env文件加载您的Decodo凭据并将它们用于所有请求。它通过Decodo网页抓取API处理JavaScript渲染、代理轮换和反机器人措施。您只需传入URL并获得干净的文本。

抓取器包括适当的错误处理、日志记录和test_connection()方法,以在开始抓取真实数据之前验证一切是否正常。get_scraper()函数提供全局实例,因此您无需为每个请求创建新连接。

原始抓取的数据在进入向量数据库之前需要清理。首先使用Beautiful Soup或正则表达式剥离HTML标签、脚本、广告和不相关的部分,然后修复间距和编码问题。

清理后,将大文本分成500到1000个令牌的块,大小足够小以适应LLM上下文窗口,但又足够大以保留含义。使用源URL、时间戳和节标题等元数据丰富每个块,以便稍后过滤结果并提供引用。最后,通过保留最新版本或维护按源标记的多个版本来删除重复内容。

此预处理步骤至关重要,因为摄取期间的干净数据可以节省数周的调试时间,这些时间原本会花在由索引中的垃圾引起的检索质量差上。

厌倦了管理代理池和应对验证码?

Decodo的动态住宅代理在195+个地区覆盖1.15亿+IP地址,成功率达99.95%,响应时间低于0.6秒。当您专注于RAG架构时,您的抓取管道仍能保持弹性运行。

将网页抓取数据与LlamaIndex集成

现在到了好的部分。将您干净的抓取数据馈送到LlamaIndex并查询它。

生产RAG循环有三个部分: 摄取新鲜的网络数据、高效索引它以及精确查询。此实现使用FastAPI作为端点、ChromaDB作为向量存储、LlamaIndex用于所有AI繁重工作。

数据摄取工作流程

您更新数据的频率应该与源更改的频率相匹配。实时摄取减少了信息发布和用户在系统中找到它之间的延迟。这对于新闻源和市场数据等时间敏感的内容最重要,分钟可以产生差异。

首先,您的RAG系统需要能够通过后台任务处理抓取,因此API请求立即返回,而抓取异步进行。这可以防止抓取大型站点时出现超时问题:

# rag_system.py
import os
import asyncio
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime, timezone
import uuid
# Core dependencies
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl, Field
import uvicorn
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# LlamaIndex imports
try:
from llama_index.core import Document, VectorStoreIndex, StorageContext
from llama_index.core import Settings as LISettings
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.llms.openai import OpenAI as LIOpenAI
import chromadb
from chromadb.config import Settings as ChromaSettings
LLAMAINDEX_AVAILABLE = True
except ImportError as e:
LLAMAINDEX_AVAILABLE = False
logger.warning(f"LlamaIndex not available: {e}")
class Settings:
"""Application settings from environment variables"""
# API Configuration
API_KEY = os.getenv("RAG_API_KEY", "your-secret-api-key")
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", 8000))
# Database Configuration
CHROMA_PERSIST_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")
# LLM Configuration
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
ENABLE_LLM = os.getenv("ENABLE_LLM", "true").lower() == "true"
# Embedding Configuration
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "openai")
HUGGINGFACE_MODEL = os.getenv("HUGGINGFACE_MODEL", "BAAI/bge-small-en-v1.5")
# Chunking Configuration
CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", 1024))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", 200))

配置从环境变量加载,使其易于在不同环境中部署而无需更改代码。您可以将API密钥添加到.env中。

向量存储和索引

LlamaIndex与ChromaDB集成以处理具有自动持久性的向量存储。系统让LlamaIndex管理分块、嵌入和索引。

class LlamaIndexVectorDB:
"""Vector DB via LlamaIndex on top of Chroma"""
def __init__(self, persist_directory: str):
self.persist_directory = persist_directory
self.client = None
self.is_initialized = False
async def initialize(self, settings: Settings):
if not LLAMAINDEX_AVAILABLE:
raise Exception("LlamaIndex not available. Install with: pip install llama-index")
os.makedirs(self.persist_directory, exist_ok=True)
self.client = chromadb.PersistentClient(
path=self.persist_directory,
settings=ChromaSettings(anonymized_telemetry=False, allow_reset=True),
)
# Configure embedding model through LlamaIndex
if settings.EMBEDDING_MODEL.lower() == "openai" and settings.OPENAI_API_KEY:
LISettings.embed_model = OpenAIEmbedding(
model="text-embedding-3-small",
api_key=settings.OPENAI_API_KEY
)
else:
LISettings.embed_model = HuggingFaceEmbedding(
model_name=settings.HUGGINGFACE_MODEL
)
# Configure chunking through LlamaIndex
LISettings.node_parser = SentenceSplitter(
chunk_size=settings.CHUNK_SIZE,
chunk_overlap=settings.CHUNK_OVERLAP
)
self.is_initialized = True
logger.info("LlamaIndexVectorDB initialized")
async def add_document(self, collection_name: str, content: str, metadata: Dict[str, Any]):
collection = self.client.get_or_create_collection(collection_name)
vector_store = ChromaVectorStore(chroma_collection=collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
# Let LlamaIndex handle chunking automatically
docs = [Document(text=content, metadata=metadata)]
VectorStoreIndex.from_documents(docs, storage_context=storage_context)
logger.info(f"Indexed document into '{collection_name}'")
async def query(self, collection_name: str, query_text: str, n_results: int = 5) -> Dict[str, Any]:
collection = self.client.get_or_create_collection(collection_name)
vector_store = ChromaVectorStore(chroma_collection=collection)
index = VectorStoreIndex.from_vector_store(vector_store)
qe = index.as_query_engine(similarity_top_k=n_results)
resp = await asyncio.to_thread(qe.query, query_text)
docs, metas, dists = [], [], []
for sn in getattr(resp, "source_nodes", []):
docs.append(sn.get_text())
metas.append(sn.node.metadata or {})
# sn.score is similarity (higher is better)
relevance_score = max(0.0, min(1.0, sn.score or 0.0))
dists.append(relevance_score)
return {"documents": docs, "metadatas": metas, "distances": dists}

使用这种方法有几个原因:

  • 它有灵活的嵌入模型. 您可以在OpenAI嵌入(高质量,付费)或HuggingFace嵌入(免费,本地)之间进行选择,并通过环境变量配置它们而无需更改代码
  • 有智能分块. LlamaIndex的SentenceSplitter在达到目标块大小的同时尊重句子边界。默认值是1024个令牌,重叠200个令牌以实现最佳上下文保留
  • 它有持久存储. ChromaDB自动保存到磁盘,因此您的索引在重启后仍然存在,无需手动保存
  • 有基于集合的组织. 每个抓取的站点或项目都有自己的集合,查询文档独立于博客帖子,产品数据独立于支持票据

现在我们将所有内容联系在一起。在一个系统中抓取、索引和查询:

class LlamaIndexRAGSystem:
"""Production-ready RAG system using LlamaIndex"""
def __init__(self, settings: Settings):
self.settings = settings
self.vector_db = LlamaIndexVectorDB(settings.CHROMA_PERSIST_DIR)
self.is_initialized = False
async def initialize(self):
"""Initialize LlamaIndex components"""
try:
await self.vector_db.initialize(self.settings)
# Configure LlamaIndex LLM
if self.settings.ENABLE_LLM and self.settings.OPENAI_API_KEY:
LISettings.llm = LIOpenAI(
model=self.settings.OPENAI_MODEL,
api_key=self.settings.OPENAI_API_KEY
)
logger.info(f"LlamaIndex LLM configured: {self.settings.OPENAI_MODEL}")
self.is_initialized = True
logger.info("LlamaIndex RAG system initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize LlamaIndex RAG system: {e}")
raise
async def add_document(self, content: str, metadata: Dict[str, Any], collection_name: str = None):
"""Add a document to LlamaIndex"""
try:
if not self.is_initialized:
raise Exception("RAG system not initialized")
await self.vector_db.add_document(
collection_name or "default",
content,
metadata
)
logger.info(f"Added document to LlamaIndex with {len(content)} characters")
except Exception as e:
logger.error(f"Failed to add document: {e}")
raise
async def query(self, query_text: str, collection_name: str = None) -> Dict[str, Any]:
"""Query the LlamaIndex RAG system"""
try:
if not self.is_initialized:
raise Exception("RAG system not initialized")
# Query with LlamaIndex
results = await self.vector_db.query(
collection_name or "default",
query_text,
n_results=5
) if results["documents"]:
context = "\n\n".join(results["documents"])
# Try to use LlamaIndex LLM for response generation
if hasattr(LISettings, 'llm') and LISettings.llm:
try:
prompt = f"""Based on the following context, please answer the question.
Context:
{context}
Question: {query_text}
Answer:"""
response = await asyncio.to_thread(LISettings.llm.complete, prompt)
answer = str(response)
except Exception as e:
logger.warning(f"LlamaIndex LLM generation failed, using simple assembly: {e}")
answer = f"Based on the context: {context[:500]}..."
else:
# Simple assembly (R+A mode)
answer = f"Based on the context: {context[:500]}..."
sources = []
for i, (doc, metadata) in enumerate(zip(results["documents"], results["metadatas"])):
sources.append({
"content": doc[:200] + "..." if len(doc) > 200 else doc,
"metadata": metadata,
"relevance_score": results["distances"][i] if i < len(results["distances"]) else 0
})
mode = "LlamaIndex RAG" if getattr(LISettings, "llm", None) else "LlamaIndex R+A"
return {
"answer": answer,
"sources": sources,
"mode": mode
}
else:
mode = "LlamaIndex RAG" if getattr(LISettings, "llm", None) else "LlamaIndex R+A"
return {
"answer": "No relevant information found.",
"sources": [],
"mode": mode
}
except Exception as e:
logger.error(f"Query failed: {e}")
raise
async def scrape_and_store(self, url: str, collection_name: str = None, max_content_length: int = 1_000_000) -> str:
"""Scrape a URL and store the content in LlamaIndex"""
try:
from scraper import get_scraper
scraper = get_scraper()
content = scraper.scrape_url(url)
# Guard against empty pages
if not content or len(content.strip()) < 50:
raise Exception(f"Scraped content too short or empty: {len(content)} characters")
# Size limit check
if len(content) > max_content_length:
logger.warning(f"Content truncated from {len(content)} to {max_content_length} characters")
content = content[:max_content_length]
# Create metadata
metadata = {
"url": url,
"scraped_at": datetime.now(timezone.utc).isoformat(),
"task_id": uuid.uuid4().hex,
"content_length": len(content)
}
await self.add_document(content, metadata, collection_name)
logger.info(f"Successfully scraped and stored content from {url}")
return collection_name or "default"
except Exception as e:
logger.error(f"Failed to scrape and store: {e}")
raise

LlamaIndexRAGSystem编排完整的RAG管道,初始化向量数据库并可选地配置OpenAI LLM以生成答案。它以两种模式运行: 完整RAG模式(当LLM可用时)从检索的上下文生成自然语言答案,而R+A模式(后备)只是组装检索的上下文而不生成。

每个搜索结果包括0到1之间的相关性分数,源文档被截断为200个字符以供显示。系统包括scrape_and_store方法,该方法获取网络内容,验证它是否满足最小长度要求,并使用URL和时间戳等元数据进行索引。大多数错误通过异常向上传播,当LLM生成失败时发生主要的优雅后备。

现在,是时候创建FastAPI端点来公开您的RAG系统了:

# 初始化FastAPI应用
app = FastAPI(
title="LlamaIndex RAG系统",
description="具有LlamaIndex和网页抓取的生产就绪RAG系统",
version="1.0.0"
)
# 添加CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 安全
security = HTTPBearer()
# 初始化设置和RAG系统
settings = Settings()
rag_system = LlamaIndexRAGSystem(settings)
# API模型
class ScrapeRequest(BaseModel):
url: HttpUrl
collection_name: Optional[str] = Field(default=None, description="自定义集合名称")
max_content_length: Optional[int] = Field(default=1000000, ge=1000, le=5000000, description="最大内容长度")
class QueryRequest(BaseModel):
query: str = Field(..., min_length=1, max_length=1000)
collection_name: Optional[str] = Field(default=None)
class QueryResponse(BaseModel):
answer: str
sources: List[Dict[str, Any]]
timestamp: datetime
mode: str
# 身份验证
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""验证API密钥"""
if credentials.credentials != settings.API_KEY:
raise HTTPException(status_code=401, detail="无效的API密钥")
return True
# API端点
@app.get("/")
async def root():
"""根端点"""
return {"message": "具有网页抓取的LlamaIndex RAG系统"}
@app.post("/scrape", status_code=status.HTTP_202_ACCEPTED)
async def scrape_url(
request: ScrapeRequest,
background_tasks: BackgroundTasks,
_: bool = Depends(verify_api_key)
):
"""抓取URL并存储在LlamaIndex中"""
try:
task_id = uuid.uuid4().hex
background_tasks.add_task(
process_scraping_task,
task_id,
str(request.url),
request.collection_name,
request.max_content_length
)
return {
"task_id": task_id,
"status": "accepted",
"message": "后台抓取已启动"
}
except Exception as e:
logger.error(f"抓取请求失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/query", response_model=QueryResponse)
async def query_rag(
request: QueryRequest,
_: bool = Depends(verify_api_key)
):
"""查询LlamaIndex RAG系统"""
try:
result = await rag_system.query(
query_text=request.query,
collection_name=request.collection_name
)
return QueryResponse(
answer=result["answer"],
sources=result["sources"],
timestamp=datetime.now(timezone.utc),
mode=result["mode"]
)
except Exception as e:
logger.error(f"查询失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
"""健康检查端点"""
from scraper import get_scraper
scraper = get_scraper()
return {
"status": "healthy",
"timestamp": datetime.now(timezone.utc),
"mode": "LlamaIndex RAG",
"services": {
"rag_system": "healthy" if rag_system.is_initialized else "unhealthy",
"scraper": "healthy" if scraper.is_ready() else "unhealthy",
"llamaindex": "enabled" if LLAMAINDEX_AVAILABLE else "disabled"
}
}

通过此设置,抓取在后台运行,因此您的API请求立即返回任务ID,而不是让您等待大型站点完成。Bearer令牌身份验证还通过阻止未经授权的用户来保护您的API安全。


系统包括健康检查端点,用于监控RAG系统和抓取器状态,使您可以轻松查看一切是否正常运行。所有响应通过Pydantic模型使用结构化格式,确保一致性和自动数据验证。

运行您的RAG API

最后,您可以启动服务器:

python rag_system.py

API在http://localhost:8000上运行。您将看到LlamaIndex初始化、ChromaDB连接以及您的嵌入模型加载。一旦您看到"LlamaIndex RAG系统启动成功",您就可以开始了。

直接从终端访问API:

# 抓取URL
curl -X POST "http://localhost:8000/scrape" \
-H "Authorization: Bearer your-secret-api-key" \
-H "Content-Type: application/json" \
-d '{"url": "https://example.com/docs", "collection_name": "my_docs"}'
# 查询抓取的内容
curl -X POST "http://localhost:8000/query" \
-H "Authorization: Bearer your-secret-api-key" \
-H "Content-Type: application/json" \
-d '{"query": "主要功能是什么?", "collection_name": "my_docs"}'
# 检查健康
curl "http://localhost:8000/health"

/scrape端点立即返回任务ID。您的抓取在后台运行,而您的API保持响应。没有超时,没有阻塞的请求。

与您的Python应用程序集成:

import requests
API_URL = "http://localhost:8000"
API_KEY = "your-secret-api-key"
headers = {"Authorization": f"Bearer {API_KEY}"}
# 抓取URL
scrape_response = requests.post(
f"{API_URL}/scrape",
json={"url": "https://example.com/docs", "collection_name": "docs"},
headers=headers
)
print(scrape_response.json())
# 抓取完成后查询
query_response = requests.post(
f"{API_URL}/query",
json={
"query": "如何开始?",
"collection_name": "docs"
},
headers=headers
)
print(query_response.json())

查询响应包含您所需的一切:

{
"answer": "To get started, install the required packages...",
"sources": [
{
"content": "Installation instructions: First, create a virtual environment...",
"metadata": {
"url": "https://example.com/docs/getting-started",
"scraped_at": "2025-10-17T10:30:00Z",
"content_length": 5234
},
"relevance_score": 0.89
}
],
"timestamp": "2025-10-17T10:35:00Z",
"mode": "LlamaIndex RAG"
}

“mode”字段用于指示当前运行模式:是采用LLM生成答案的完整RAG模式,还是使用组合上下文的R+A模式。“relevance_score”则显示每个信息源与查询的匹配程度。

目前系统全程采用异步操作,可同时处理多个请求且互不阻塞。长时运行的抓取任务在后台执行,用户无需等待即可获得即时响应。

ChromaDB自动将所有数据保存至磁盘,确保即使服务器重启数据依然安全。每个项目拥有独立数据集,实现数据隔离与精准检索。该生产级系统融合了可靠的抓取功能与简洁的API接口,FastAPI自动生成文档(http://localhost:8000/docs),便于您轻松测试并集成至其他应用。

API运行后,您可轻松抓取并整理多源数据至独立数据集:

import requests
API_URL = "http://localhost:8000"
API_KEY = "your-secret-api-key"
headers = {"Authorization": f"Bearer {API_KEY}"}
# Scrape multiple sources into organized collections
sources = {
"documentation": [
"https://example.com/docs/getting-started",
"https://example.com/docs/api-reference"
],
"blog": [
"https://example.com/blog/latest",
"https://example.com/blog/tutorials"
]
}
for collection, urls in sources.items():
for url in urls:
response = requests.post(
f"{API_URL}/scrape",
json={"url": url, "collection_name": collection},
headers=headers
)
print(f"Scraping {url} into {collection}: {response.json()}")
# Query specific collections
docs_query = requests.post(
f"{API_URL}/query",
json={
"query": "How do I authenticate?",
"collection_name": "documentation"
},
headers=headers
)
print(docs_query.json())

要启用基于LLM生成的完整RAG模式答案,请更新您的.env文件:

# Enable LLM mode
ENABLE_LLM=true
OPENAI_API_KEY=sk-your-actual-api-key-here
OPENAI_MODEL=gpt-3.5-turbo # or gpt-4

重启您的RAG API。健康检查接口现应显示“mode”: “LlamaIndex RAG”,服务状态将确认LLM已启用。在RAG模式下,LLM会读取上下文并生成自然语言回答。若LLM生成因任何原因失败(API错误、超时、速率限制),系统将自动回退至R+A模式。

为RAG系统选择大型语言模型时,您有三种主要方案:

  • 最简方案是在自有硬件本地运行Ollama,既免除API费用又能保障数据隐私,适用于开发及中等生产负载场景。
  • 若需更快速响应与更高质量答案,OpenAI或Anthropic等云端API能满足高吞吐量生产需求,但需按查询次数付费。
  • 追求最大控制权时,可选择vLLM或TGI等自托管方案,在自有GPU服务器上运行大型模型——前期投入较高,但后续查询次数不受限制。

请根据预算、查询量及质量需求选择方案。需注意:不同服务商的代码结构保持一致,后续切换将十分便捷。

生产部署注意事项

部署揭示了您的规划是否扎实或充满捷径。在生产中没有第二次机会。

基础设施需求

处理每天100到1000次查询的生产RAG系统需要8到16个CPU核心、32到64GB RAM和SSD存储。如果您大量生成嵌入,请添加GPU。当您需要更多容量时,通过添加工作器而不是升级单机进行水平扩展,以获得更好的容错性。

您的数据库设置同样关键。使用连接池高效重用连接,设置读取副本以分配查询负载,并配置具有时间点恢复的自动备份。当查询变慢时,在投入硬件之前重建索引。

接下来是安全性。使用TLS加密、API身份验证、速率限制、CORS策略和负载均衡器保护您的网络。对于抓取基础设施,部署具有凭据轮换和后备策略的动态住宅代理。Decodo的动态住宅代理在195多个地区的1.15亿多个IP上实现99.95%的成功率,平均响应时间不到一秒,为您处理复杂性。

最后,为失败做准备。自动化向量数据库备份,在版本控制中保留配置,保存用于回滚的生产快照,并在需要之前测试恢复程序。

监控和可观察性

您看不到的东西无法修复。首先跟踪第50、95和99百分位的查询延迟,以及嵌入生成时间和吞吐量。这些指标帮助您在用户注意到之前捕获瓶颈。

通过检查内容陈旧度、检索准确性和答案相关性持续监控数据质量。设置警报,在抓取的数据过时或质量下降时警告您。

除了系统指标之外,观察用户如何与您的RAG系统交互。记录错误、抓取失败、API速率限制和性能问题;还跟踪用户满意度分数、查询模式、响应时间和反馈以了解实际性能。

安全和合规

从基础开始,加密静态和传输中的数据。实施基于角色的访问控制以限制谁可以访问数据、API端点和管理功能。保留系统访问、数据修改和配置更改的详细审计日志,用于安全审查和事件调查。

对于监管合规性,实施数据保留策略、用户数据删除工作流程和同意管理。记录您的数据源和处理活动以满足GDPR、CCPA或其他相关法规。

常见问题排查

即使是构建良好的系统也会失败。将生产就绪的RAG与原型区分开来的是您的系统如何优雅地处理失败以及您可以多快地调试问题。

管道失败

您的抓取基础设施面临持续挑战,因为目标网站更改结构、速率限制意外触发、代理失败以及网络问题中断请求。通过以下方式处理这些问题:

  • 对错误进行分类以区分瞬时故障和永久故障
  • 实施具有指数退避的重试逻辑
  • 部署在错误率飙升时暂停抓取的断路器
  • 维护需要手动审查的URL的死信队列

当抓取持续失败时,检查目标是否更改了其HTML结构、更新了反机器人措施、实施了新的速率限制或阻止了您的IP。监控验证失败率以在编码问题和损坏的内容提取破坏索引之前捕获它们。

除了抓取之外,向量数据库变得不可用,LLM API速率限制请求,身份验证令牌过期。为所有依赖项实施健康检查、具有退避的自动重试、回退到缓存数据以及在失败持续时发出警报。

性能问题

常见瓶颈包括随索引大小退化的向量数据库查询延迟、限制吞吐量的嵌入生成、大文档的内存耗尽以及网络带宽约束。在部署前在现实负载下进行分析。

通过流式传输大文档、配置带有LRU驱逐的缓存限制以及定期重启工作器来解决内存问题。通过减少检索的块、缓存结果、使用更快的嵌入模型和流式响应来优化慢查询。

监控和维护

实施验证功能的综合查询、检查数据库响应能力并监控数据新鲜度。首先部署到暂存环境,使用蓝绿部署实现零停机更新,并维护回滚程序。随时间跟踪检索相关性分数、查询成功率和用户反馈,在开发期间而不是生产期间捕获问题。

真实世界的RAG实现示例

客户支持知识库

AI助手可以抓取您的产品文档、FAQ页面和票据日志以按需提供答案。例如,如果客户问支持机器人"如何重置我的设备?"系统会检索相关的手册部分(从您的文档站点抓取)并准确回答。对文档的任何更改(新固件)都会立即抓取和索引。

ServiceNow的Now Assist in AI Search通过使用RAG检索相关知识文章并为客户支持查询生成可操作的问答卡来展示这种方法。系统从客户的知识库中检索文章,使用来自排名靠前的内容的上下文增强查询,并生成引用来源的答案,为上周推出的功能提供"答案而不是链接"。

市场研究自动化

您还可以构建一个RAG系统,不断抓取新闻网站、竞争对手博客和社交媒体源以获取您行业的提及。分析师可以查询它以获取趋势或竞争对手动向。例如,"X公司本周宣布了什么?"触发对最新抓取的新闻稿和新闻文章的搜索,产生摘要。

AlphaSense使用RAG技术处理超过5亿份高级商业文档,大规模提供竞争情报。其生成搜索功能像分析师一样解释自然语言查询,并提供引用的响应以最小化幻觉。

该平台自动化竞争基准测试、跟踪价格和产品变化,并实时监控市场趋势,将数小时的研究浓缩为几秒钟。每周添加数百个新的专家访谈,它使公司能够发现需求变化、供应链中断和新兴机会。

内容和洞察管道

RAG系统让您聚合和分析公共数据(例如社交媒体帖子、评论网站、论坛)。RAG模型可以通过检索抓取的用户评论并总结情绪来回答"产品Y的常见投诉是什么?"等问题。实时监控(抓取Twitter或Reddit)让系统在公众意见发生变化时发出警报。

Microsoft Copilot Studio使用RAG监控公共网站并通过从指定域检索相关内容并提供引用摘要来生成对话响应。系统对检索的内容执行基础检查、来源验证和语义相似性分析,同时应用内容审核来过滤不当内容。

知识源可以包括公共网站和内部SharePoint站点,使组织能够聚合新闻、监控情绪并综合来自多个来源的洞察。该平台通过从内部和外部来源查找和呈现信息作为主要响应或在预定义主题无法处理查询时的后备来减少手动工作。

最佳实践与经验教训

  • 在生产环境部署前全面测试。对抓取逻辑运行单元测试,对管道阶段执行集成测试,对查询进行端到端测试,并使用真实数据量进行负载测试。能处理1000份文档的系统,面对100万文档可能崩溃。
  • 开发阶段模拟故障场景。构建适用于不同网站结构、内容边界情况、数据库故障及API失效的测试框架。问题应在早期发现,而非生产环境爆发。
  • 通过持续集成自动化质量检查。配置管道自动检测每次代码变更、排查安全漏洞并部署至测试环境。这能即时发现缺陷,而非数日后才察觉。
  • 彻底审查代码。核查抓取逻辑、错误处理、日志记录及文档。新视角能发现原始开发者遗漏的问题。
  • 清晰记录所有内容。详细记录抓取目标细节、管道转换流程、部署步骤及故障排除指南。将个人知识转化为团队知识。
  • 制定应急预案。定义严重性等级与响应时限,制定常见问题处理手册,复盘每起事件以防重蹈覆辙。从失败中学习是优秀团队与卓越团队的分水岭。
  • 主动监控增长与扩展。追踪数据量、查询负载及资源使用趋势,在资源耗尽前预先扩容而非高峰期补救。
  • 定期评估性能。安排周期性检查系统速度、成本、数据质量及用户反馈,将优化重点聚焦于关键领域。
  • 在团队内共享知识。记录架构决策,分享事件教训,轮值值班任务,并举办知识交接会议。系统的可靠性取决于团队的可靠性。

结论和后续步骤

构建生产就绪的RAG应用程序是关于从第一天起就设计弹性、规模和持续新鲜度。通过集成网页抓取,您将应用程序转变为以互联网速度适应的实时智能引擎。

LlamaIndex为扩展向量存储、检索和优化提供了骨干,但您的数据管道只有其最弱的抓取器那么强。通过Decodo的网页抓取API自动处理代理轮换、动态渲染和反机器人挑战,您可以专注于构建和优化RAG架构,而不是扑灭抓取器的火灾。

如果您的下一步是从原型转向生产,道路很清晰:

  • 从一开始就为规模架构
  • 保持管道干净和弹性
  • 优化检索以提高性能和成本
  • 利用Decodo以全球规模可靠地抓取

生产RAG不再是"它能工作吗",而是"它能存活吗?“通过正确的设计选择和Decodo为您的数据管道提供动力,答案是"是的”。

准备好构建可投入生产使用的RAG了吗?

别再为抓取工具故障和过期索引烦恼。Decodo为您提供开箱即用的可靠、可扩展的网页抓取基础设施。

关于作者

Zilvinas Tamulis

技术文案

作为一名拥有 4 年以上工作经验的技术作家,Žilvinas 将自己在多媒体和计算机设计方面的学习与创建用户手册、指南和技术文档方面的实际专业知识相结合。他的工作包括利用 JavaScript、PHP 和 Python 的实践经验,开发每天有数百人使用的网络项目。


通过 LinkedIn 与 Žilvinas 联系。

Decodo 博客上的所有信息均按原样提供,仅供参考。对于您使用 Decodo 博客上的任何信息或其中可能链接的任何第三方网站,我们不作任何陈述,也不承担任何责任。

常见问题

什么是RAG,网页抓取如何改进它?

检索增强生成将LLM与外部知识库相结合,通过在回答问题之前检索相关上下文来生成准确、可验证的响应。网页抓取通过使知识库与新鲜数据保持最新,将RAG从静态转变为动态,使您的系统能够回答有关现在正在发生的事情的问题,而不是依赖过时的信息。

如何使用LlamaIndex构建RAG管道?

安装LlamaIndex,选择向量数据库,加载文档,将它们分成512个令牌块,重叠50个令牌,生成嵌入,索引块,并创建查询引擎,检索相关内容并将其传递给LLM。通过调度获取新内容并增量更新索引的作业来添加网页抓取。

在生产中实施RAG的最佳实践是什么?

从第一天起就为水平可扩展性架构,为查询延迟和数据新鲜度实施全面监控,构建强大的错误处理,使用任务队列而不是cron作业,在索引之前验证数据质量,缓存常见查询,使用生产规模数据进行测试,维护文档,并主动规划容量增长。

如何优化生产中RAG的块大小?

从512个令牌和50个令牌重叠作为默认值开始,然后使用您的特定内容进行测试以找到上下文保留和检索精度之间的正确平衡。技术文档通常需要更大的块(1024个令牌),而新闻文章与较小的块(256个令牌)配合良好,因此测量检索质量并根据答案质量进行调整。

生产RAG系统需要什么基础设施?

从8到16个CPU核心、32到64GB RAM和SSD存储开始,具有足够的网络带宽以维持数据摄取。在托管服务(如Pinecone)之间选择简单性或自托管选项(如Weaviate)以获得更多控制,如果嵌入生成成为瓶颈则添加GPU实例,并设计水平扩展。

如何处理RAG应用程序中的数据新鲜度?

实施与内容更新模式匹配的计划抓取(新闻每小时,文档每天),使用增量更新仅处理更改的内容,跟踪文档修改时间,验证新鲜度,实施版本控制以实现回滚能力,并根据需求和成本平衡实时与批处理。

在实际应用中实施RAG时的主要挑战是什么?

数据质量是隐藏的挑战,因为抓取的内容包含降低检索质量的噪声。可扩展性比预期更难,当文档数从数千跃升至数十万时系统会失败。性能需要持续调整嵌入和块大小,成本在规模上迅速上升,运营复杂性显著增加。

如何将网页抓取数据集成到RAG系统中?

构建按计划抓取内容、验证质量、清理和规范化文本、适当分块、生成嵌入并使用Celery等任务调度程序增量更新索引的管道。为时间敏感的来源实施实时摄取或为大容量来源实施批处理,添加QA检查点,监控管道健康,并考虑使用Decodo的API处理代理轮换和反机器人措施。

RAG应用程序中网页抓取的最佳实践是什么?

尊重请求之间有延迟的速率限制,使用具有自动轮换的动态住宅代理,为失败实施指数退避,在摄取前验证抓取的内容,监控robots.txt,添加带有死信队列的错误处理,并记录活动以进行调试。考虑使用Decodo的动态住宅代理,成功率为99.95%,以消除基础设施复杂性,同时保持可靠的访问。

相关文章

Cloudflare 服务中断:事件始末、故障原因及应对措施

Cloudflare正遭遇全球性服务中断,影响全球数亿用户。包括X、OpenAI和Downdetector在内的主要平台均受波及。本文将解析当前Cloudflare的运行状况、导致大规模500错误的根源,并为受Cloudflare停机影响的用户及网站所有者提供可操作的应对方案。

Zilvinas Tamulis

11月 18日, 2025年

5 分钟阅读

使用 Python 中的Beautiful Soup进行网络数据解析的完整指南

Beautiful Soup 是一个广泛使用的 Python 库,在数据提取方面发挥着重要作用。它为解析 HTML 和 XML 文档提供了强大的工具,使从网页中轻松提取有价值的数据成为可能。该库简化了处理互联网上非结构化内容的复杂过程,使您可以将原始网页数据转换为结构化的可用格式。


HTML 文档解析在信息世界中起着举足轻重的作用。HTML 数据可进一步用于数据整合、分析和自动化,涵盖从商业智能到研究等各个方面。网络是一个庞大的地方,充满了有价值的信息;因此,在本指南中,我们将使用各种工具和脚本来探索浩瀚的海洋,并教它们带回所有数据。

James Keenan

1月 27日, 2025年

14 分钟阅读

© 2018-2025 decodo.cn(原名 smartproxy.com)。版权所有 津ICP备2022004334号-2