
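The earlier crawlers in this series were all synchronous: send one request, wait for the response, then send the next. On large crawls, roughly 90% of a synchronous crawler's time goes to waiting on the network while the CPU sits idle. An asynchronous crawler built on aiohttp and asyncio keeps dozens of requests in flight at once, which can cut the total run time from several hours to a little over ten minutes.

I. Sync vs. async: the core difference

```python
# Synchronous: one at a time, 10 seconds in total
# request 1 → wait 1s → request 2 → wait 1s → ... → request 10 → wait 1s
# total ≈ 10s

# Asynchronous: all sent at once, about 1 second in total
# request 1  → wait 1s → response
# request 2  → wait 1s → response
# ...          (everything comes back after about 1s)
# request 10 → wait 1s → response
# total ≈ 1s
```

Async suits I/O-bound work (network requests, file reads and writes). It does not help CPU-bound work (image processing, number crunching).

II. aiohttp basics

1. Installation

```bash
pip install aiohttp
```

2. The simplest async request

```python
import aiohttp
import asyncio


async def fetch(url):
    """Fetch a single URL asynchronously."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # Return the response body as text
            return await resp.text()


# Run it
html = asyncio.run(fetch("https://example.com"))
print(html[:200])
```

3. Requesting multiple URLs concurrently

```python
import aiohttp
import asyncio
import time

TIMEOUT = aiohttp.ClientTimeout(total=10)


async def fetch_one(session, url):
    """A single request."""
    try:
        async with session.get(url, timeout=TIMEOUT) as resp:
            return await resp.text()
    except Exception as e:
        return f"Request failed: {e}"


async def fetch_all(urls):
    """Request all URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results


# Usage
urls = [f"https://example.com/page/{i}" for i in range(1, 21)]

start = time.time()
results = asyncio.run(fetch_all(urls))
print(f"Total time: {time.time() - start:.2f}s")
print(f"Fetched {len(results)} pages")
```

At about one second per page, the synchronous version needs 20 seconds or more for these 20 pages. The async version takes roughly 1-2 seconds, depending on the network.

III. Limiting concurrency: semaphores

Without a limit, firing off several hundred requests at once can easily get your IP banned by the site, or exhaust the connections available on your own machine.

```python
import aiohttp
import asyncio
import json
import time


class AsyncCrawler:
    """Async crawler with a concurrency limit."""

    def __init__(self, max_concurrency=10):
        self.max_concurrency = max_concurrency
        self.timeout = aiohttp.ClientTimeout(total=10)
        self.semaphore = None
        self.results = []

    async def fetch(self, session, url):
        """A request that respects the concurrency limit."""
        async with self.semaphore:  # waits once max_concurrency requests are in flight
            try:
                async with session.get(url, timeout=self.timeout) as resp:
                    text = await resp.text()
                    print(f"Done: {url} ({len(text)} chars)")
                    return (url, text)
            except Exception as e:
                print(f"Failed: {url} - {e}")
                return (url, None)

    async def crawl(self, urls):
        """Crawl a batch of URLs."""
        # Create the semaphore inside the running event loop; before
        # Python 3.10 a semaphore built outside asyncio.run() binds to
        # a different loop and fails under contention.
        self.semaphore = asyncio.Semaphore(self.max_concurrency)
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            self.results = await asyncio.gather(*tasks)
        return self.results

    def save_results(self, filename="results.json"):
        """Save a summary of the successful results."""
        data = []
        for url, content in self.results:
            if content:
                data.append({"url": url, "length": len(content)})

        with open(filename, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"Saved {len(data)} results to {filename}")


# Usage: at most 10 requests at a time
crawler = AsyncCrawler(max_concurrency=10)
urls = [f"https://example.com/page/{i}" for i in range(1, 101)]

start = time.time()
results = asyncio.run(crawler.crawl(urls))
print(f"Total time: {time.time() - start:.2f}s")
crawler.save_results()
```

IV. Async fetching + parsing

asyncio.Queue lets you build a producer-consumer pipeline:

```python
import aiohttp
import asyncio
from bs4 import BeautifulSoup


async def worker(name, queue, session, results):
    """Consumer: take URLs off the queue and crawl them."""
    timeout = aiohttp.ClientTimeout(total=10)
    while True:
        url = await queue.get()
        try:
            async with session.get(url, timeout=timeout) as resp:
                html = await resp.text()

            # Parse
            soup = BeautifulSoup(html, "html.parser")
            title = soup.title.string if soup.title else "No title"
            results.append({"url": url, "title": title})
            print(f"[{name}] Done: {url} → {title}")
        except Exception as e:
            print(f"[{name}] Failed: {url} - {e}")
        finally:
            queue.task_done()


async def main(urls, concurrency=10):
    """Entry point: producer-consumer pattern."""
    queue = asyncio.Queue()
    results = []

    # Producer: put the URLs on the queue
    for url in urls:
        await queue.put(url)

    async with aiohttp.ClientSession() as session:
        # Start N consumer coroutines
        workers = [
            asyncio.create_task(worker(f"worker-{i}", queue, session, results))
            for i in range(concurrency)
        ]

        # Wait until every queued URL has been processed
        await queue.join()

        # Cancel the workers and wait for them to exit
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

    return results


# Usage
urls = [f"https://example.com/page/{i}" for i in range(1, 51)]
results = asyncio.run(main(urls, concurrency=10))
print(f"\nCrawled {len(results)} pages")
for r in results[:5]:
    print(f"{r['url']} → {r['title']}")
```

V. Timeouts and retries

1. Setting a timeout

```python
async def fetch_with_timeout(session, url):
    """Request with a timeout."""
    try:
        # 30 seconds overall, 10 seconds to establish the connection
        timeout = aiohttp.ClientTimeout(total=30, connect=10)
        async with session.get(url, timeout=timeout) as resp:
            return await resp.text()
    except asyncio.TimeoutError:
        print(f"Timed out: {url}")
        return None
```

2. Automatic retries

```python
async def fetch_with_retry(session, url, max_retries=3):
    """Request with automatic retries."""
    timeout = aiohttp.ClientTimeout(total=10)
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=timeout) as resp:
                if resp.status == 200:
                    return await resp.text()
                print(f"Unexpected status {resp.status}: {url}")
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {url} - {e}")

        await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s

    return None
```

VI. Async + proxies

```python
async def fetch_with_proxy(session, url, proxy):
    """Request through a proxy."""
    try:
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, proxy=proxy, timeout=timeout) as resp:
            return await resp.text()
    except Exception as e:
        print(f"Request via proxy {proxy} failed: {e}")
        return None


async def crawl_with_proxies(urls, proxies):
    """Crawl concurrently, rotating through a proxy pool."""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, url in enumerate(urls):
            proxy = proxies[i % len(proxies)]  # round-robin
            tasks.append(fetch_with_proxy(session, url, proxy))
        return await asyncio.gather(*tasks)
```

VII. Best practices for async crawlers

Choosing a concurrency level

- 10 concurrent requests → large sites such as Alibaba Cloud barely notice
- 20 concurrent requests → most small sites can still cope
- 50 concurrent requests → may trigger anti-crawling measures
- 100 concurrent requests → very likely to get you banned, and you may run out of local connections

Start at 5-10 concurrent requests and raise the number gradually.

Complete template

```python
import aiohttp
import asyncio
import time
from typing import List


class BaseAsyncCrawler:
    """Base class for async crawlers."""

    def __init__(self, max_concurrency=10, delay=0):
        self.max_concurrency = max_concurrency
        self.delay = delay  # pause after each request, in seconds
        self.timeout = aiohttp.ClientTimeout(total=10)
        self.semaphore = None
        self.session = None

    async def __aenter__(self):
        # Both objects are created inside the running event loop
        self.semaphore = asyncio.Semaphore(self.max_concurrency)
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    async def fetch(self, url: str) -> str:
        """A single request; returns an empty string on failure."""
        async with self.semaphore:
            try:
                async with self.session.get(url, timeout=self.timeout) as resp:
                    text = await resp.text()
            except Exception as e:
                print(f"Request failed {url}: {e}")
                text = ""
            if self.delay:
                # Sleep while still holding the semaphore slot to throttle
                await asyncio.sleep(self.delay)
            return text

    async def crawl(self, urls: List[str]) -> List[str]:
        """Crawl a batch of URLs."""
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)


# Usage
async def main():
    urls = [f"https://example.com/page/{i}" for i in range(10)]
    async with BaseAsyncCrawler(max_concurrency=5) as crawler:
        start = time.time()
        results = await crawler.crawl(urls)
        print(f"Finished {len(results)} requests in {time.time() - start:.2f}s")


asyncio.run(main())
```

VIII. Async vs. multithreading: which to choose

| Aspect | Async (aiohttp) | Multithreading (requests + ThreadPool) |
|---|---|---|
| Performance | ✅ Very high; thousands of concurrent requests are no problem | ❌ Limited by thread overhead and context switching (the GIL is released during network waits, so it mainly slows the parsing step) |
| Code | ⭐⭐ Requires async/await syntax | ⭐ Simple, no new syntax to learn |
| Debugging | ⭐⭐ Somewhat harder | ⭐ Easy |
| Best for | Large crawls (tens of thousands of pages) | Small to medium crawls (a few thousand pages) |

Recommendation: for a few thousand pages, requests + ThreadPoolExecutor is enough. Once you are into the tens of thousands, move to aiohttp. Do not go async for its own sake; simple and good enough comes first.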
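For comparison, the thread-pool route recommended for smaller crawls might look roughly like the sketch below. It is only an illustration: `fetch_sync` and `crawl_threaded` are hypothetical names, and `fetch_sync` sleeps instead of touching the network so the example runs anywhere. In a real crawler its body would be something like `requests.get(url, timeout=10).text`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_sync(url):
    """Hypothetical stand-in for a blocking HTTP call.

    A real crawler would call requests.get(url, timeout=10).text here;
    the sleep only simulates network latency.
    """
    time.sleep(0.05)
    return f"<html>{url}</html>"


def crawl_threaded(urls, fetch=fetch_sync, max_workers=10):
    """Run a blocking fetch function over urls using a thread pool.

    max_workers plays the same role as the semaphore limit in the
    async version. pool.map returns results in input order.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch, urls))


if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(1, 21)]
    start = time.time()
    pages = crawl_threaded(urls, max_workers=10)
    print(f"Fetched {len(pages)} pages in {time.time() - start:.2f}s")
```

The shape mirrors `fetch_all` from the async version: a plain function per URL plus one call that fans the work out. That is why this route is easier to write and debug, at the cost of one OS thread per in-flight request.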