XHS-Downloader小红书内容采集与结构化数据提取的技术实现【免费下载链接】XHS-Downloader小红书XiaoHongShu、RedNote链接提取/作品采集工具提取账号发布、收藏、点赞、专辑作品链接提取搜索结果作品、用户链接采集小红书作品信息提取小红书作品下载地址下载小红书作品文件项目地址: https://gitcode.com/gh_mirrors/xh/XHS-DownloaderXHS-Downloader是一个基于Python的开源工具专为小红书XiaoHongShu平台的内容采集与数据提取而设计。该项目通过模块化架构实现了从链接解析、数据采集到文件下载的完整工作流为开发者、研究人员和内容创作者提供了高效的技术解决方案。项目定位与技术价值在现代内容平台生态中小红书作为重要的社交媒体平台其内容具有独特的商业价值和研究意义。然而平台API的封闭性和动态加载技术为数据采集带来了技术挑战。XHS-Downloader通过逆向工程分析网络请求模式实现了对小红书平台数据的结构化提取。项目的技术价值体现在三个层面一是提供了标准化的数据采集接口降低了技术门槛二是支持多种部署模式满足不同场景需求三是实现了完整的数据处理流水线从原始数据到结构化存储的全流程自动化。核心架构设计原理模块化分层架构XHS-Downloader采用清晰的分层架构设计将不同功能解耦为独立的处理单元source/ ├── application/ # 应用层核心业务逻辑 │ ├── app.py # 主应用接口 │ ├── explore.py # 数据提取引擎 │ ├── request.py # 网络请求管理 │ ├── download.py # 文件下载处理 │ ├── image.py # 图片处理模块 │ └── video.py # 视频处理模块 ├── module/ # 业务模块层 │ ├── manager.py # 配置与资源管理 │ ├── recorder.py # 数据记录管理 │ └── tools.py # 工具函数集合 └── expansion/ # 扩展功能层 ├── converter.py # 数据格式转换 ├── cleaner.py # 内容清洗处理 └── namespace.py # 命名空间管理这种分层设计使得每个模块都可以独立测试和优化同时也为二次开发提供了清晰的接口定义。异步处理与并发控制项目采用异步编程模型基于Python的asyncio库实现高效的并发处理。核心下载模块支持断点续传和大文件分块下载通过连接池管理优化网络资源利用率。# 异步下载核心实现示例 async def download_with_retry(self, url: str, file_path: Path, max_retries: int 3): 带重试机制的异步下载函数 for attempt in range(max_retries): try: async with aiohttp.ClientSession() as session: async with session.get(url, timeoutself.timeout) as response: if response.status 200: total_size int(response.headers.get(content-length, 0)) downloaded 0 with open(file_path, wb) as f: async for chunk in response.content.iter_chunked(self.chunk_size): f.write(chunk) downloaded len(chunk) # 更新进度显示 self._update_progress(downloaded, total_size) return True except (aiohttp.ClientError, asyncio.TimeoutError) as e: if attempt max_retries - 1: await asyncio.sleep(2 ** attempt) # 指数退避策略 else: raise DownloadError(f下载失败: {e}) return False关键技术实现详解数据提取引擎小红书平台采用动态加载技术传统HTML解析方法难以获取完整数据。XHS-Downloader通过分析网络请求模式实现了对平台API的模拟调用class DataExtractor: def __init__(self, cookie: str None, proxy: str None): self.request Html(cookiecookie, proxyproxy) self.converter Converter() async def extract_note_data(self, url: str) - dict: 提取作品数据 # 1. 解析URL获取作品ID note_id self._extract_note_id(url) # 2. 构建API请求参数 api_params self._build_api_params(note_id) # 3. 发送请求获取原始数据 raw_html await self.request.request_url(api_params) # 4. 提取JSON数据 json_data self.converter._extract_object(raw_html) # 5. 结构化处理 structured_data self._parse_data(json_data) return structured_data def _extract_note_id(self, url: str) - str: 从多种链接格式中提取作品ID patterns [ rexplore/([a-zA-Z0-9]), rdiscovery/item/([a-zA-Z0-9]), ruser/profile/[^/]/([a-zA-Z0-9]) ] for pattern in patterns: match re.search(pattern, url) if match: return match.group(1) raise ValueError(无效的小红书链接格式)文件下载管理系统下载模块实现了智能的文件管理机制包括格式转换、断点续传和完整性验证class DownloadManager: def __init__(self, manager: Manager): self.manager manager self.download_record DownloadRecorder(manager) async def download_files(self, urls: List[str], save_path: Path, file_type: str auto) - List[Path]: 批量下载文件 downloaded_files [] for i, url in enumerate(urls): # 检查下载记录避免重复 if self.download_record.is_downloaded(url): continue # 确定文件格式 file_format self._determine_format(url, file_type) # 生成文件名 filename self._generate_filename(i, file_format) file_path save_path / filename # 执行下载 success await self._download_single_file(url, file_path) if success: downloaded_files.append(file_path) self.download_record.add_record(url) return downloaded_files def _determine_format(self, url: str, preferred_format: str) - str: 智能确定文件格式 if preferred_format AUTO: # 根据URL和内容类型自动判断 if webp in url.lower(): return WEBP elif heic in url.lower(): return HEIC else: return self._detect_from_content(url) return preferred_format图XHS-Downloader的图形用户界面提供直观的操作体验多格式内容支持项目支持多种内容格式的下载和处理内容类型支持格式特点图片作品PNG, WEBP, JPEG, HEIC支持格式转换自动选择最优质量视频作品MP4, MOV支持分辨率优先、码率优先、文件大小优先策略LivePhoto动态图片支持动态图片的完整下载图文混合多种格式组合自动识别并下载所有媒体资源部署与集成方案命令行模式对于自动化脚本和批量处理场景命令行模式提供了最大的灵活性# 基础下载命令 python main.py --url https://www.xiaohongshu.com/explore/67b3a8d9000000001e03abcd # 批量处理配置 python main.py \ --url 链接1 链接2 链接3 \ --work-path ./data/downloads \ --index 1,3,5 \ --image-format WEBP \ --proxy http://127.0.0.1:10808 \ --timeout 30 \ --max-retry 3图命令行模式提供丰富的参数配置选项API服务器模式基于FastAPI构建的RESTful API服务支持与其他系统集成from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import Optional, List app FastAPI(titleXHS-Downloader API) class DownloadRequest(BaseModel): url: str download: bool True index: Optional[List[int]] None cookie: Optional[str] None proxy: Optional[str] None app.post(/xhs/detail) async def download_content(request: DownloadRequest): 处理小红书内容下载请求 try: async with XHS( cookierequest.cookie, proxyrequest.proxy, download_recordTrue ) as xhs: result await xhs.extract( request.url, downloadrequest.download, indexrequest.index ) return { status: success, data: result, message: 下载任务已提交 } except Exception as e: raise HTTPException(status_code500, detailstr(e)) # 启动服务器 if __name__ __main__: import uvicorn uvicorn.run(app, host0.0.0.0, port5556)MCP集成模式Model Context Protocol支持使得XHS-Downloader能够与AI助手深度集成# MCP服务器配置 mcp_servers: xhs-downloader: command: python args: [main.py, mcp] env: XHS_WORK_PATH: ./downloads XHS_COOKIE: ${XHS_COOKIE} # Claude Desktop配置 { mcpServers: { xhs-downloader: { type: streamable-http, url: http://127.0.0.1:5556/mcp/ } } }图MCP配置界面支持与AI助手的无缝集成Docker容器化部署项目提供完整的Docker支持便于生产环境部署# Dockerfile示例 FROM python:3.12-slim WORKDIR /app # 安装系统依赖 RUN apt-get update apt-get install -y \ curl \ rm -rf /var/lib/apt/lists/* # 复制项目文件 COPY . . # 安装Python依赖 RUN pip install --no-cache-dir uv \ uv pip install --system --no-cache-dir -r requirements.txt # 创建数据目录 RUN mkdir -p /app/Volume/Download # 暴露端口 EXPOSE 5556 5558 # 设置启动命令 CMD [python, main.py, api]实际应用场景示例内容创作者备份系统内容创作者可以使用XHS-Downloader构建自动化备份系统import asyncio from datetime import datetime from pathlib import Path class ContentBackupSystem: def __init__(self, user_id: str, backup_root: Path Path(./backup)): self.user_id user_id self.backup_root backup_root self.backup_root.mkdir(exist_okTrue) async def backup_user_content(self, max_works: int 100): 备份用户所有作品 # 构建用户作品列表URL profile_url fhttps://www.xiaohongshu.com/user/profile/{self.user_id} # 创建按月份组织的目录结构 month_folder self.backup_root / datetime.now().strftime(%Y%m) month_folder.mkdir(exist_okTrue) async with XHS( work_pathstr(month_folder), folder_modeTrue, author_archiveTrue, record_dataTrue, image_formatWEBP, download_recordTrue ) as xhs: # 获取作品链接 work_links await self._fetch_user_works(profile_url, max_works) backup_results [] for link in work_links: try: result await xhs.extract(link, downloadTrue) if result: backup_results.append({ id: result.get(作品ID), title: result.get(作品标题), timestamp: datetime.now().isoformat(), status: success }) print(f✓ 已备份: {result.get(作品标题, 未知作品)}) else: backup_results.append({ url: link, status: failed, error: 提取失败 }) except Exception as e: backup_results.append({ url: link, status: error, error: str(e) }) # 保存备份元数据 self._save_metadata(backup_results) return backup_results def _save_metadata(self, results: list): 保存备份元数据到JSON文件 metadata { user_id: self.user_id, backup_time: datetime.now().isoformat(), total_works: len(results), successful: sum(1 for r in results if r[status] success), works: results } metadata_file self.backup_root / fbackup_{datetime.now().strftime(%Y%m%d_%H%M%S)}.json import json with open(metadata_file, w, encodingutf-8) as f: json.dump(metadata, f, ensure_asciiFalse, indent2)市场研究与竞品分析市场研究人员可以利用工具进行内容趋势分析import pandas as pd from typing import List, Dict from dataclasses import dataclass dataclass class ContentMetrics: likes: int comments: int shares: int collect: int publish_time: str tags: List[str] author_id: str class MarketResearchAnalyzer: def __init__(self, data_dir: Path Path(./research_data)): self.data_dir data_dir self.data_dir.mkdir(exist_okTrue) async def collect_competitor_data(self, competitor_ids: List[str], max_posts_per_user: int 50): 收集竞品账号数据 all_data [] for user_id in competitor_ids: user_folder self.data_dir / user_id user_folder.mkdir(exist_okTrue) async with XHS( work_pathstr(user_folder), record_dataTrue, download_recordFalse, # 仅收集数据不下载文件 author_archiveTrue ) as xhs: # 获取用户主页作品 user_url fhttps://www.xiaohongshu.com/user/profile/{user_id} work_links await self._extract_user_works(user_url, max_posts_per_user) for link in work_links: try: data await xhs.extract(link, downloadFalse) if data: metrics ContentMetrics( likesdata.get(点赞数量, 0), commentsdata.get(评论数量, 0), sharesdata.get(分享数量, 0), collectdata.get(收藏数量, 0), publish_timedata.get(发布时间, ), tagsdata.get(作品标签, ).split(), author_iduser_id ) all_data.append(metrics) # 避免请求过于频繁 await asyncio.sleep(1) except Exception as e: print(f收集数据失败 {link}: {e}) return self._analyze_metrics(all_data) def _analyze_metrics(self, data: List[ContentMetrics]) - Dict: 分析内容指标 df pd.DataFrame([vars(d) for d in data]) analysis { total_posts: len(df), avg_engagement: { likes: df[likes].mean(), comments: df[comments].mean(), shares: df[shares].mean(), collect: df[collect].mean() }, top_tags: self._extract_top_tags(df), posting_frequency: self._calculate_frequency(df), performance_by_author: df.groupby(author_id).agg({ likes: mean, comments: mean, shares: mean }).to_dict() } # 保存分析结果 self._save_analysis_report(analysis) return analysis图浏览器用户脚本提供网页端集成下载功能性能优化与安全实践请求频率控制与反爬虫策略为了避免对平台服务器造成过大压力项目实现了智能的请求频率控制class RateLimiter: def __init__(self, max_requests_per_minute: int 30): self.max_requests max_requests_per_minute self.request_times [] self.base_delay 2.0 # 基础延迟 self.jitter_factor 0.3 # 随机抖动因子 async def acquire(self): 获取请求许可 current_time time.time() # 清理过期记录 self.request_times [ t for t in self.request_times if current_time - t 60 ] # 检查频率限制 if len(self.request_times) self.max_requests: wait_time 60 - (current_time - self.request_times[0]) await asyncio.sleep(wait_time) self.request_times.pop(0) # 添加随机延迟避免模式识别 jitter random.uniform(-self.jitter_factor, self.jitter_factor) delay self.base_delay * (1 jitter) await asyncio.sleep(delay) self.request_times.append(time.time()) def adjust_delay_based_on_response(self, response_status: int): 根据响应状态调整延迟策略 if response_status 429: # Too Many Requests self.base_delay * 1.5 elif response_status 200: # 成功响应适当降低延迟但保持最小间隔 self.base_delay max(1.0, self.base_delay * 0.9)数据安全与隐私保护项目在设计时充分考虑了数据安全和隐私保护本地化存储所有下载内容存储在用户本地不经过任何第三方服务器Cookie管理Cookie信息仅在本地使用支持手动配置和自动清理请求加密使用HTTPS协议传输数据确保通信安全数据脱敏在日志和输出中自动脱敏敏感信息错误处理与容错机制完善的错误处理机制确保系统在异常情况下的稳定性class RobustDownloader: def __init__(self, max_retries: int 3, timeout: int 30): self.max_retries max_retries self.timeout timeout self.error_log [] async def robust_download(self, url: str, save_path: Path) - bool: 健壮的下载函数包含多种容错机制 for attempt in range(self.max_retries): try: # 1. 预检查 if not await self._pre_check(url): self.error_log.append(f预检查失败: {url}) return False # 2. 执行下载 success await self._download_with_timeout(url, save_path) if success: # 3. 完整性验证 if self._verify_integrity(save_path): return True else: self.error_log.append(f完整性验证失败: {url}) save_path.unlink(missing_okTrue) # 删除损坏文件 except (asyncio.TimeoutError, aiohttp.ClientError) as e: self.error_log.append(f网络错误 (尝试 {attempt1}/{self.max_retries}): {e}) if attempt self.max_retries - 1: await asyncio.sleep(2 ** attempt) # 指数退避 continue except Exception as e: self.error_log.append(f未知错误: {e}) break return False def _verify_integrity(self, file_path: Path) - bool: 验证文件完整性 if not file_path.exists(): return False # 检查文件大小 min_size 1024 # 最小1KB if file_path.stat().st_size min_size: return False # 检查文件头根据文件类型 try: with open(file_path, rb) as f: header f.read(100) if file_path.suffix.lower() in [.jpg, .jpeg]: return header.startswith(b\xff\xd8\xff) elif file_path.suffix.lower() .png: return header.startswith(b\x89PNG\r\n\x1a\n) elif file_path.suffix.lower() .mp4: return bftyp in header elif file_path.suffix.lower() .webp: return header.startswith(bRIFF) and bWEBP in header except: return False return True扩展开发指南自定义数据处理插件开发者可以通过继承基础类实现自定义的数据处理逻辑from typing import Optional, Dict, Any from pathlib import Path class CustomDataProcessor: 自定义数据处理插件示例 def __init__(self, config: Optional[Dict] None): self.config config or {} self.processors [] def register_processor(self, processor_func): 注册自定义处理器 self.processors.append(processor_func) async def process_content(self, content_data: Dict[str, Any], file_paths: List[Path]) - Dict[str, Any]: 处理下载的内容数据 processed_data content_data.copy() # 应用所有注册的处理器 for processor in self.processors: try: processed_data await processor(processed_data, file_paths) except Exception as e: print(f处理器错误: {e}) continue # 添加自定义元数据 processed_data[processed_time] datetime.now().isoformat() processed_data[processor_version] 1.0.0 return processed_data # 使用示例 async def add_watermark_processor(data: Dict, files: List[Path]) - Dict: 添加水印处理器 for file_path in files: if file_path.suffix.lower() in [.jpg, .jpeg, .png, .webp]: await add_watermark_to_image(file_path, XHS-Downloader) data[watermark_added] True return data async def compress_images_processor(data: Dict, files: List[Path]) - Dict: 图片压缩处理器 for file_path in files: if file_path.suffix.lower() in [.jpg, .jpeg, .png, .webp]: await compress_image(file_path, quality85) data[compression_applied] True return data # 集成到主流程 async def main(): processor CustomDataProcessor() processor.register_processor(add_watermark_processor) processor.register_processor(compress_images_processor) async with XHS() as xhs: result await xhs.extract(作品链接, downloadTrue) if result and 文件路径 in result: processed_data await processor.process_content( result, [Path(p) for p in result[文件路径]] ) return processed_data数据导出与格式转换项目支持多种数据导出格式便于后续分析import json import csv import sqlite3 from datetime import datetime from typing import List, Dict class DataExporter: 多格式数据导出器 def __init__(self, output_dir: Path Path(./exports)): self.output_dir output_dir self.output_dir.mkdir(exist_okTrue) def export_json(self, data: List[Dict], filename: str xhs_data.json): 导出为JSON格式 output_path self.output_dir / filename with open(output_path, w, encodingutf-8) as f: json.dump(data, f, ensure_asciiFalse, indent2) return output_path def export_csv(self, data: List[Dict], filename: str xhs_data.csv): 导出为CSV格式 if not data: return None # 提取所有可能的字段 fieldnames set() for item in data: fieldnames.update(item.keys()) output_path self.output_dir / filename with open(output_path, w, newline, encodingutf-8) as f: writer csv.DictWriter(f, fieldnamessorted(fieldnames)) writer.writeheader() writer.writerows(data) return output_path def export_sqlite(self, data: List[Dict], db_name: str xhs_data.db): 导出到SQLite数据库 db_path self.output_dir / db_name conn sqlite3.connect(db_path) cursor conn.cursor() # 创建主表 cursor.execute( CREATE TABLE IF NOT EXISTS xhs_content ( id TEXT PRIMARY KEY, title TEXT, description TEXT, author_id TEXT, author_name TEXT, publish_time TEXT, likes INTEGER, comments INTEGER, shares INTEGER, collect_count INTEGER, tags TEXT, content_type TEXT, download_time TEXT, file_paths TEXT ) ) # 创建作者表 cursor.execute( CREATE TABLE IF NOT EXISTS xhs_authors ( author_id TEXT PRIMARY KEY, author_name TEXT, total_posts INTEGER, avg_likes REAL, avg_comments REAL, avg_shares REAL, last_updated TEXT ) ) # 批量插入数据 for item in data: cursor.execute( INSERT OR REPLACE INTO xhs_content VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) , ( item.get(作品ID, ), item.get(作品标题, ), item.get(作品描述, ), item.get(作者ID, ), item.get(作者昵称, ), item.get(发布时间, ), item.get(点赞数量, 0), item.get(评论数量, 0), item.get(分享数量, 0), item.get(收藏数量, 0), item.get(作品标签, ), item.get(作品类型, ), datetime.now().isoformat(), ,.join(item.get(文件路径, [])) )) conn.commit() conn.close() return db_path总结与最佳实践XHS-Downloader作为一个成熟的开源工具为小红书内容采集提供了完整的技术解决方案。在实际使用中我们建议遵循以下最佳实践配置优化建议合理设置请求间隔避免频繁请求导致IP被封禁建议设置2-3秒的基础延迟使用Cookie配置配置有效的Cookie可以获取更高分辨率的视频内容启用下载记录避免重复下载相同内容节省网络资源和存储空间按作者归档对于长期跟踪的创作者启用作者归档功能便于内容管理性能调优策略# 推荐的性能优化配置 optimized_config { timeout: 30, # 适当增加超时时间 chunk: 1024 * 1024, # 1MB分块下载 max_retry: 3, # 合理设置重试次数 image_format: WEBP, # 平衡质量和大小 folder_mode: True, # 每个作品独立文件夹 author_archive: True, # 按作者分类存储 download_record: True, # 启用下载记录 }安全合规使用遵守平台规则仅下载自己创作或已获得授权的内容尊重知识产权不得将下载内容用于商业侵权控制采集频率避免对平台服务器造成过大压力数据使用规范仅将数据用于合法合规的研究和分析目的技术发展趋势随着小红书平台技术的不断更新XHS-Downloader也在持续演进API适配优化持续跟踪平台API变化保持数据提取的稳定性性能提升优化并发处理和内存管理提高大规模采集效率格式支持扩展增加对新媒体格式的支持集成生态完善加强与各类开发工具和平台的集成能力通过合理配置和优化XHS-Downloader能够成为小红书内容处理的高效工具。无论是个人使用还是企业级部署其模块化设计和丰富的功能选项都能满足多样化的需求。项目的开源特性也使得开发者可以根据具体需求进行定制化开发构建符合自身业务场景的内容采集解决方案。【免费下载链接】XHS-Downloader小红书XiaoHongShu、RedNote链接提取/作品采集工具提取账号发布、收藏、点赞、专辑作品链接提取搜索结果作品、用户链接采集小红书作品信息提取小红书作品下载地址下载小红书作品文件项目地址: https://gitcode.com/gh_mirrors/xh/XHS-Downloader创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考