
爬虫爬到一半网络断了、电脑重启了、IP 被封了——如果从头再来前面的进度就白费了。断点续爬就是让爬虫记住爬到了哪里中断后自动从断点继续。一、为什么需要断点续爬你没有断点续爬 爬了200页 → 网络中断 → 重新从第1页开始 → 浪费大量时间 你有断点续爬 爬了200页 → 网络中断 → 重新启动 → 从第201页继续 → 不浪费不能续爬的爬虫只适合小规模几百页大规模采集几万页以上必须有断点续爬能力。二、实现方式一文件记录进度最简单的方式爬到哪一页就写到文件里。importjsonimportosimporttimeimportrequestsfrombs4importBeautifulSoupclassResumeCrawler:支持断点续爬的爬虫def__init__(self,start_page1,end_page100):self.state_filecrawl_state.json# 状态文件self.data_filecrawled_data.jsonl# 数据文件self.end_pageend_page self.current_pageself._load_state()orstart_page self.sessionrequests.Session()self.session.headers.update({User-Agent:Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0})def_load_state(self):加载上次的进度ifos.path.exists(self.state_file):withopen(self.state_file,r)asf:statejson.load(f)print(f 发现上次进度第{state[current_page]}页)returnstate[current_page]returnNonedef_save_state(self):保存当前进度withopen(self.state_file,w)asf:json.dump({current_page:self.current_page},f)def_save_data(self,items):保存爬到的数据withopen(self.data_file,a,encodingutf-8)asf:foriteminitems:f.write(json.dumps(item,ensure_asciiFalse)\n)defcrawl_page(self,page):爬取单页urlfhttps://example.com/products?page{page}respself.session.get(url)soupBeautifulSoup(resp.text,html.parser)items[]forproductinsoup.select(.product):items.append({title:product.select_one(.title).text.strip(),price:product.select_one(.price).text.strip(),page:page,crawl_time:time.time(),})returnitemsdefrun(self):主运行逻辑print(f开始爬取从第{self.current_page}页到第{self.end_page}页)whileself.current_pageself.end_page:try:print(f正在爬取第{self.current_page}页...)itemsself.crawl_page(self.current_page)ifitems:self._save_data(items)print(f 第{self.current_page}页完成获取{len(items)}条)# 保存进度self._save_state()self.current_page1time.sleep(1)# 礼貌延时exceptExceptionase:print(f❌ 第{self.current_page}页出错:{e})print(f已保存进度到第{self.current_page}页等待 30 秒后重试...)self._save_state()time.sleep(30)# 等一会再重试print(f✅ 全部完成共爬取到第{self.end_page}页)# 使用crawlerResumeCrawler(start_page1,end_page500)crawler.run()# 如果中途中断了重新运行会自动从断点继续三、实现方式二Redis 记录进度分布式环境多台机器同时爬时文件方式就不行了——文件在 A 机器上B 机器看不到。用 Redis 存进度所有机器共享importredisimportjsonclassRedisResumeCrawler:基于 Redis 的断点续爬def__init__(self,namecrawler,start_page1):self.namename self.redis_clientredis.Redis(hostlocalhost,port6379,db0)self.progress_keyf{name}:progressself.data_keyf{name}:dataself.current_pageself._load_progress()orstart_pagedef_load_progress(self):从 Redis 加载进度progressself.redis_client.get(self.progress_key)ifprogress:pageint(progress)print(f 上次爬到第{page}页继续...)returnpagereturnNonedef_save_progress(self,page):保存进度到 Redisself.redis_client.set(self.progress_key,page)def_save_data(self,items):数据存到 Redis 列表foriteminitems:self.redis_client.rpush(self.data_key,json.dumps(item,ensure_asciiFalse))defrun(self,end_page1000):运行爬虫whileself.current_pageend_page:# ... 爬取逻辑同上 ...self._save_progress(self.current_page)self.current_page1四、增量采集——只爬新数据断点续爬解决的是中断后重爬增量采集解决的是已经爬过的不再爬。1. 基于 URL 去重importhashlibimportredisclassIncrementalCrawler:增量爬虫只爬新 URLdef__init__(self):self.redisredis.Redis(hostlocalhost,port6379,db0)self.seen_keycrawler:seen_urlsdef_url_hash(self,url):URL 指纹returnhashlib.md5(url.encode()).hexdigest()defis_crawled(self,url):检查 URL 是否已经爬过returnself.redis.sismember(self.seen_key,self._url_hash(url))defmark_crawled(self,url):标记 URL 已爬self.redis.sadd(self.seen_key,self._url_hash(url))defextract_new_urls(self,page_url,new_urls):从新页面中筛选出未爬过的 URLfresh_urls[]forurlinnew_urls:ifnotself.is_crawled(url):fresh_urls.append(url)returnfresh_urls2. 基于内容哈希去重相同内容不再爬importhashlibclassContentDedupCrawler:基于内容去重的爬虫def__init__(self):self.content_hashesset()def_content_hash(self,text):计算内容的指纹# 去掉空白字符后再算哈希避免微小的格式差异导致重复clean.join(text.split())returnhashlib.md5(clean.encode()).hexdigest()defis_duplicate(self,text):判断内容是否重复hself._content_hash(text)ifhinself.content_hashes:returnTrueself.content_hashes.add(h)returnFalse3. 基于时间戳只爬新闻/文章类fromdatetimeimportdatetime,timedeltaclassTimeBasedCrawler:基于时间的增量爬虫def__init__(self,max_days7):self.max_daysmax_days self.cutoff_datedatetime.now()-timedelta(daysmax_days)defshould_crawl(self,article_date_str):判断是否应该爬取只爬最近 N 天的try:article_datedatetime.strptime(article_date_str,%Y-%m-%d)returnarticle_dateself.cutoff_dateexcept:returnTrue# 日期解析失败就默认爬defrun_daily(self):每天定时执行todaydatetime.now().strftime(%Y-%m-%d)print(f开始增量采集{today})# 只爬当天的数据self.crawl_by_date(today)五、完整示例增量爬取新闻importrequestsfrombs4importBeautifulSoupimportjsonimporthashlibimportosimporttimeclassNewsIncrementalCrawler:新闻增量爬虫def__init__(self,state_filenews_state.json):self.state_filestate_file self.stateself._load_state()self.sessionrequests.Session()def_load_state(self):加载状态ifos.path.exists(self.state_file):withopen(self.state_file,r)asf:returnjson.load(f)return{crawled_urls:[],latest_crawl_time:}def_save_state(self):保存状态withopen(self.state_file,w)asf:json.dump(self.state,f,ensure_asciiFalse,indent2)def_is_crawled(self,url):是否已经爬过url_hashhashlib.md5(url.encode()).hexdigest()returnurl_hashinself.state[crawled_urls]def_mark_crawled(self,url):标记已爬url_hashhashlib.md5(url.encode()).hexdigest()self.state[crawled_urls].append(url_hash)# 只保留最近 10000 条防止状态文件越来越大iflen(self.state[crawled_urls])10000:self.state[crawled_urls]self.state[crawled_urls][-10000:]defcrawl_news_list(self):爬取新闻列表页urlhttps://example.com/newsrespself.session.get(url)soupBeautifulSoup(resp.text,html.parser)new_articles[]foriteminsoup.select(.news-item):titleitem.select_one(.title).text.strip()linkitem.select_one(a).get(href)dateitem.select_one(.date).text.strip()# 只处理新文章ifnotself._is_crawled(link):new_articles.append({title:title,url:link,date:date,})returnnew_articlesdefcrawl_detail(self,url):爬取新闻详情respself.session.get(url)soupBeautifulSoup(resp.text,html.parser)return{content:soup.select_one(.article-content).text.strip(),author:soup.select_one(.author).text.strip(),}defrun(self):执行增量采集print(f开始增量采集上次共爬取{len(self.state[crawled_urls])}条)new_articlesself.crawl_news_list()print(f本次发现{len(new_articles)}条新文章)forarticleinnew_articles:try:detailself.crawl_detail(article[url])article.update(detail)# 保存到文件withopen(news_data.jsonl,a,encodingutf-8)asf:f.write(json.dumps(article,ensure_asciiFalse)\n)# 标记已爬self._mark_crawled(article[url])self._save_state()print(f ✅{article[title]})time.sleep(1)exceptExceptionase:print(f ❌{article[title]}:{e})print(f增量采集完成新增{len(new_articles)}篇)# 每天定时执行crawlerNewsIncrementalCrawler()crawler.run()六、数据去重的三种粒度去重粒度方法适用场景URL 去重记 URL 是否爬过通用最常用内容去重算内容的 MD5同一篇文章有不同 URL转载字段去重按标题/ID去重数据库已有唯一约束推荐组合URL 去重为主 内容去重为辅。七、断点续爬的最佳实践① 每次爬完一页/一条就保存进度不要等全部爬完才存 ② 状态文件和数据文件分开存 ③ 定期清理 URL 去重集合防止无限增长 ④ 爬虫启动时先恢复进度而不是从头开始 ⑤ 每页爬完加 1-3 秒延时给后续重试留时间最重要的原则宁可重复保存进度也不要丢失进度。每爬一页存一次是安全的每 10 页存一次就有丢失风险。 觉得有用的话点赞 关注【张老师技术栈】吧每周更新 Java/Python/爬虫 实战干货不让你白来。