中国建设银行员工网站,百度网站小程序怎么做,可以做营销任务的网站,wordpress建站详细教程视频
项目场景：
网络爬虫项目，主要实现多进程、多线程方式快速缓存网页资源到MongoDB，并解析网页数据，将信息写入到csv文件中。 问题描述：
在单独使用多线程的过程中，是没有问题的，比如这个爬虫示例是爬取豆瓣电…
项目场景：
网络爬虫项目，主要实现多进程、多线程方式快速缓存网页资源到MongoDB，并解析网页数据，将信息写入到csv文件中。
问题描述：
在单独使用多线程的过程中，是没有问题的，比如这个爬虫示例是爬取豆瓣电影排行榜TOP250，解析到csv中的数据还是250条；在实现多进程的方式中，主要是通过MongoDB来实现一个队列的效果，多条进程从数据库中取出待解析的链接进行解析。在实现的过程中发现，解析数据是没有问题的，打印到控制台的数据没有丢失的情况，但是在最终写出的csv文件中，数据只有一小部分。
在尝试了国内所有能用的AI之后无果，AI只能对逻辑问题进行判断，而对一些Runtime问题还是差点意思。好在CSDN有大佬，将问题发布到问答区后，大佬一句话就点醒了我，在此表示感谢。
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime,timedelta
from multiprocessing.dummy import Pool
import os
import random
import re
import threading
import time
import urllib.parse
import urllib.request
import urllib3
from urllib.parse import urlparse, urlsplit
from urllib.parse import urljoin
import urllib.robotparser
from lxml import html as lhtml
import csv
import pickle
import zlib
from bson.binary import Binary
from pymongo import MongoClient
from zipfile import ZipFile
from io import StringIO
# 多线程爬虫
# 封装MongoDB缓存类
class MongoCache:
    """Dictionary-style cache of download results stored in MongoDB.

    ``cache[url] = result`` pickles and zlib-compresses ``result`` and upserts
    it under ``_id == url``; ``cache[url]`` loads it back or raises
    ``KeyError``.  A TTL index on ``timestamp`` lets MongoDB expire old
    entries on its own.
    """

    def __init__(self, client=None, expires=timedelta(days=30)):
        """Connect to MongoDB and make sure the TTL index exists.

        Args:
            client: An existing ``MongoClient``; a client for
                ``localhost:27017`` is created when omitted.
            expires: How long a cached record is kept.
        """
        self.client = MongoClient('localhost', 27017) if client is None else client
        self.db = self.client['cache']
        self.webpage = self.db['webcrawler']
        self.expires = expires
        self.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds())

    def __getitem__(self, url):
        """Return the cached result for ``url``; raise ``KeyError`` if absent."""
        record = self.webpage.find_one({'_id': url})
        if not record:
            raise KeyError(url + '不存在')
        # NOTE(security): pickle.loads executes arbitrary code when fed hostile
        # bytes.  This is only acceptable while the collection is written
        # exclusively by this crawler.
        return pickle.loads(zlib.decompress(record['result']))

    def __setitem__(self, url, result):
        """Store ``result`` for ``url``, replacing any previous record."""
        from datetime import timezone

        record = {
            'result': Binary(zlib.compress(pickle.dumps(result))),
            # PyMongo stores naive datetimes as if they were UTC, and the TTL
            # monitor compares against UTC.  A naive local ``datetime.now()``
            # shifts expiry by the local UTC offset, so store an aware value.
            'timestamp': datetime.now(timezone.utc),
        }
        self.webpage.update_one({'_id': url}, {'$set': record}, upsert=True)
# 将下载功能封装成一个类
class Downloader:
    """Callable page fetcher with throttling, optional cache, proxy and retries.

    Calling ``downloader(url)`` returns the page body as ``bytes``, or ``None``
    when the page could not be fetched with status 200.
    """

    def __init__(self, delay=5, user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0', proxies=None, request_max=3, cache=None):
        """Store the configuration and create the urllib3 connection pool.

        Args:
            delay: Minimum number of seconds between hits on one domain.
            user_agent: Value sent in the ``User-agent`` header.
            proxies: Optional sequence of proxies; one is picked at random
                for every download.
            request_max: How many times a failed download is retried.
            cache: Optional mapping-like cache (``cache[url]`` raising
                ``KeyError`` on a miss).
        """
        self.throttle = Throttle(delay=delay)
        self.user_agent = user_agent
        self.proxies = proxies
        self.request_max = request_max
        self.cache = cache
        # Shared connection pool for direct (non-proxy) requests.
        self.http = urllib3.PoolManager()

    def __call__(self, url):
        """Return the body for ``url``, from the cache when possible."""
        result = None
        # ``is not None`` rather than truthiness: an empty dict-like cache is
        # falsy and would otherwise never be filled.
        if self.cache is not None:
            try:
                result = self.cache[url]
            except KeyError:
                pass  # cache miss: download below
            else:
                if not isinstance(result, dict):
                    # Older runs could cache ``None`` after a failed download.
                    result = None
                else:
                    code = result.get('code')
                    if self.request_max > 0 and isinstance(code, int) and 500 <= code < 600:
                        # A cached server error is worth another attempt.
                        result = None
        if result is None:
            self.throttle.wait(url)
            proxy = random.choice(self.proxies) if self.proxies else None
            headers = {'User-agent': self.user_agent}
            result = self.download(url, headers, self.request_max, proxy)
            # Do not cache transport failures (no status code at all).
            if self.cache is not None and result['code'] is not None:
                self.cache[url] = result
        return result['html']

    def download(self, url, headers, request_max, proxy=None):
        """Fetch ``url`` and return ``{'html': bytes | None, 'code': int | None}``.

        Any failure (exception or non-200 status) is retried while
        ``request_max`` is positive, and the result of the retry is what gets
        returned.  ``code`` is ``None`` when no HTTP response was received.
        """
        print('正在下载, URL:{}'.format(url))
        html, code = None, None
        try:
            if proxy:
                html, code = self._fetch_via_proxy(url, headers, proxy)
            else:
                html, code = self._fetch_direct(url, headers)
        except Exception as e:  # boundary: log and fall through to the retry
            print('下载遇到了错误,错误代码是{}'.format(e))
        else:
            if code != 200:
                print('遇到了错误，状态码是{}'.format(code))
        if html is None and request_max > 0:
            return self.download(url, headers, request_max - 1, proxy)
        return {'html': html, 'code': code}

    def _fetch_direct(self, url, headers):
        """GET ``url`` through the urllib3 pool; return ``(body_or_None, status)``."""
        response = self.http.request('GET', url, headers=headers)
        try:
            status = response.status
            return (response.data if status == 200 else None), status
        finally:
            response.release_conn()

    def _fetch_via_proxy(self, url, headers, proxy):
        """GET ``url`` through ``proxy`` with urllib; return ``(body_or_None, status)``."""
        request = urllib.request.Request(url, headers=headers)
        # ``urlparse`` here is the function imported from urllib.parse.
        handler = urllib.request.ProxyHandler({urlparse(url).scheme: proxy})
        opener = urllib.request.build_opener(handler)
        try:
            with opener.open(request) as response:
                status = response.status
                body = response.read()
        except urllib.error.HTTPError as err:
            # urllib raises for 4xx/5xx; report the status instead.
            return None, err.code
        return (body if status == 200 else None), status
# 定义一个scrape_callback类用于存储解析到的数据
class Scrape_callback:
    """Callable that parses a Douban movie page and appends one CSV row.

    The output file is opened once in ``__init__``.  Every row is flushed as
    soon as it is written and ``close_writer`` really closes the file, so
    rows are not left in an unflushed buffer when the crawler exits.
    """

    def __init__(self):
        """Open the CSV file and write the header row."""
        # Keep the file object: csv.writer offers no way to flush or close it.
        self._file = open('D:/Crawl_Results/downloaded_data.csv', 'w', encoding='utf-8', newline='', errors='replace')
        self.writer = csv.writer(self._file)
        self.fields = ('中文名', '外文名', '评分', '上映时间', '国家', '导演', '时长', '类型')
        # The callback is shared by several crawler threads.
        self.lock = threading.Lock()
        self.writer.writerow(self.fields)
        self._file.flush()

    def __call__(self, html):
        """Parse ``html`` (UTF-8 bytes) and write the extracted row.

        Pages without the expected elements (e.g. list pages) are reported
        and skipped.

        Raises:
            RuntimeError: if the writer has already been closed.
        """
        if not self.writer:
            raise RuntimeError('CSV writer is not initialized. Call open_writer() first.')
        html_string = html.decode('utf-8')
        root = lhtml.fromstring(html_string)
        result_list = []
        try:
            # Title: "<Chinese name> <original name>"; the second part is optional.
            title_content = root.cssselect('div#content')[0]
            span_title = title_content.cssselect('span[property="v:itemreviewed"]')[0]
            title_text = span_title.text_content().split(' ', 1)
            result_list.extend(title_text)
            if len(title_text) == 1:
                result_list.append('--')
            # Rating.
            rate_text = root.cssselect('strong[property="v:average"]')[0].text_content()
            result_list.append(rate_text)
            # Release date, formatted as "date(country)".
            date_text = root.cssselect('span[property="v:initialReleaseDate"]')[0].text_content()
            parenthesis_index = date_text.find('(')
            if parenthesis_index != -1:
                date = date_text[:parenthesis_index]
                # Text between the opening parenthesis and the final character.
                country = date_text[parenthesis_index + 1:-1]
            else:
                date = date_text
                country = '--'
            result_list.append(date)
            result_list.append(country)
            # Director.
            direct_by_text = root.cssselect('a[rel="v:directedBy"]')[0].text_content()
            result_list.append(direct_by_text)
            # Runtime.
            runtime_text = root.cssselect('span[property="v:runtime"]')[0].text_content()
            result_list.append(runtime_text)
            # Genres, joined with '|'.
            gener_spans = root.cssselect('span[property="v:genre"]')
            gener_text = '|'.join(span.text_content() for span in gener_spans).rstrip('|')
            result_list.append(gener_text)
            # Six placeholders for six values (the genre used to be dropped).
            print('{}|{}|{}|{}|{}|{}'.format(title_text, rate_text, date, country, direct_by_text, gener_text))
            self.do_write(result_list)
        except IndexError:
            print('未找到指定的元素')
        except Exception as e:
            print(f'处理过程中发生错误: {e}')

    def do_write(self, result_list):
        """Write one row and flush it; report if the file is already closed."""
        with self.lock:
            if self.writer is None:
                print('打开文件失败')
                return
            self.writer.writerow(result_list)
            # Flush per row so a daemon thread or process that is torn down
            # cannot take buffered rows with it.
            self._file.flush()

    def close_writer(self):
        """Close the CSV file; further calls to the callback raise RuntimeError."""
        with self.lock:
            if self.writer:
                self.writer = None
                self._file.close()
# 定义一个类用于控制延时
class Throttle:
    """Enforce a minimum delay between successive requests to one domain."""

    def __init__(self, delay):
        """Remember ``delay`` (seconds) and start with no domains seen."""
        self.delay = delay
        # domain -> datetime of the most recent access
        self.domains = {}

    def wait(self, url):
        """Sleep if ``url``'s domain was accessed less than ``delay`` seconds ago."""
        domain = urlparse(url).netloc
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            # total_seconds(), not .seconds: .seconds is only the seconds
            # component of the timedelta, so it drops whole days and
            # microseconds and would request a sleep for "one day and one
            # second ago".
            elapsed = (datetime.now() - last_accessed).total_seconds()
            sleep_secs = self.delay - elapsed
            if sleep_secs > 0:
                print('正在休眠，将等待{}秒后再次连接'.format(round(sleep_secs, 1)))
                time.sleep(sleep_secs)
        # Record this access.
        self.domains[domain] = datetime.now()
# 爬取网页的函数
# Sentinel meaning "argument not supplied"; ``None`` keeps its old meaning of
# "no callback" / "no cache".
_UNSET = object()


def threaded_crawler(delay, request_max, seed_url, link_regex, max_deepth=5, max_threads=6, scrape_callback=_UNSET, cache=_UNSET, proxies=None):
    """Crawl from ``seed_url`` with a pool of worker threads.

    Args:
        delay: Seconds between requests to the same domain.
        request_max: Retry budget handed to the ``Downloader``.
        seed_url: First page; its ``?start=25 .. 225`` pages are queued too.
        link_regex: Only links matching this pattern are followed.
        max_deepth: Pages deeper than this are not downloaded.
        max_threads: Upper bound on concurrently running workers.
        scrape_callback: Called with each page's bytes.  Defaults to a new
            ``Scrape_callback``; pass ``None`` to disable scraping.
        cache: Download cache.  Defaults to a new ``MongoCache``; pass
            ``None`` to disable caching.
        proxies: Optional proxies for the ``Downloader``.
    """
    # Build the defaults per call: as default-argument expressions they ran at
    # definition time, truncating the CSV and touching MongoDB on import.
    owns_callback = scrape_callback is _UNSET
    if owns_callback:
        scrape_callback = Scrape_callback()
    if cache is _UNSET:
        cache = MongoCache()

    user_agent_list = ['BadCrawler', 'GoodCrawler']
    rp = urllib.robotparser.RobotFileParser()
    # robots.txt lives at the site root, not underneath the seed page path.
    rp.set_url(urljoin(seed_url, '/robots.txt'))
    rp.read()
    current_user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0'
    # Only look for an alternative agent when the default one is disallowed.
    if not rp.can_fetch(current_user_agent, seed_url):
        for user_agent in user_agent_list:
            if rp.can_fetch(user_agent, seed_url):
                current_user_agent = user_agent
                break
        else:
            # As before, the crawl only warns here and carries on.
            print('该网站的robots.txt禁止我们访问')

    crawl_url_queue = [seed_url]
    # url -> depth; doubles as the "already queued" set.
    have_crawl_url_queue = {seed_url: 0}
    downloader = Downloader(delay=delay, user_agent=current_user_agent, cache=cache, request_max=request_max, proxies=proxies)
    for item_count in range(1, 10):
        current_url = '{}?start={}'.format(seed_url, item_count * 25)
        crawl_url_queue.append(current_url)
        have_crawl_url_queue[current_url] = 0

    def process_queue():
        """Worker: drain the shared URL list until it is empty."""
        thread_name = threading.current_thread().name
        while crawl_url_queue:
            try:
                url = crawl_url_queue.pop()
            except IndexError:
                # Another worker took the last URL between the check and pop.
                break
            print('线程{}正在处理{}'.format(thread_name, url))
            deepth = have_crawl_url_queue[url]
            if deepth > max_deepth:
                continue
            html = downloader(url)
            if html is None:
                continue
            if scrape_callback:
                scrape_callback(html)
            for link in get_links(html) or ():
                link = urljoin(seed_url, link)
                if re.match(link_regex, link) and link not in have_crawl_url_queue:
                    have_crawl_url_queue[link] = deepth + 1
                    crawl_url_queue.append(link)

    threads = []
    try:
        while threads or crawl_url_queue:
            # Build a new list instead of removing while iterating, which
            # skips the element after every removed one.
            threads = [thread for thread in threads if thread.is_alive()]
            while len(threads) < max_threads and crawl_url_queue:
                thread = threading.Thread(target=process_queue, daemon=True)
                thread.start()
                threads.append(thread)
            # Yield instead of spinning a core while the workers are busy.
            time.sleep(0.1)
    finally:
        # Close only a callback created here; a caller-supplied one stays open.
        if owns_callback:
            scrape_callback.close_writer()
# Extract further links from downloaded HTML
def get_links(html):
    """Return the ``href`` values of all ``<a>`` tags found in ``html``.

    Args:
        html: Page content as UTF-8 ``bytes``, or ``None``.

    Returns:
        A list of the raw ``href`` strings (not resolved against any base
        URL), or ``None`` when ``html`` is ``None``.
    """
    if html is None:
        return None
    webpage_regex = re.compile(r'<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)
    # errors='replace': one invalid byte must not abort link extraction for
    # the whole page.
    html_string = html.decode('utf-8', errors='replace')
    return webpage_regex.findall(html_string)
# Test run
# Crawl the Douban Top 250 list with the threaded crawler.  The guard keeps
# an import of this module from starting a crawl.
if __name__ == '__main__':
    seed_url = 'https://movie.douban.com/top250'
    # Follow only movie detail pages: https://movie.douban.com/subject/<id>/
    link_regex = r'^https://(?!music\.douban\.com/subject/)movie\.douban\.com/subject/(\d+)/$'
    threaded_crawler(5, 5, seed_url, link_regex, 5)

原因分析
当多个进程或线程试图同时写入同一个CSV文件时，因为文件I/O操作不是线程安全的，特别是在没有适当锁定机制的情况下，写入的数据就可能丢失。在这个脚本中虽然使用了锁，但是锁只是锁定了线程间的竞争，多个进程在写入的时候实际上是存在文件覆盖的情况的。为了解决这个问题，我们可以采用“分而治之”的策略：让每个进程将其结果写入一个独立的CSV文件，然后再合并这些文件。
解决方案
在调用Scrape_callback()类时，为其传入进程的ID，让每一条进程单独处理一个csv文件，这样就不存在文件覆盖的问题；在解析完所有的文件后，再将这些csv文件合并为一个文件输出。
from datetime import datetime,timedelta
import multiprocessing
import os
import random
import re
import threading
import time
import urllib.parse
import urllib.request
import urllib3
from urllib.parse import urlparse, urlsplit
from urllib.parse import urljoin
import urllib.robotparser
from lxml import html as lhtml
import csv
import pickle
import zlib
from bson.binary import Binary
from pymongo import MongoClient,errors
from zipfile import ZipFile
from io import StringIO
# 多进程
# 封装MongoDB进程队列
class MongoQueue:
    """URL work queue stored in MongoDB and shared by all crawler processes.

    Every URL is one document keyed by ``_id`` whose ``status`` moves
    OUTSTANDING -> PROCESSING -> COMPLETE.  Finished URLs stay in the
    collection, so pushing the same URL again is rejected by the unique
    ``_id`` and it is not crawled a second time.
    """

    OUTSTANDING, PROCESSING, COMPLETE = range(3)

    def __init__(self, client=None, timeout=300):
        """Connect to the queue collection.

        Args:
            client: An existing ``MongoClient``; ``localhost:27017`` is used
                when omitted.
            timeout: Seconds after which a PROCESSING job counts as
                abandoned and is handed out again by ``repair``.
        """
        self.client = MongoClient('localhost', 27017) if client is None else client
        self.db = self.client['cache']
        self.webpage = self.db['crawler_queue']
        self.timeout = timeout
        self.lock = threading.Lock()

    def __bool__(self):
        """Return True while any URL is not yet COMPLETE."""
        return self.webpage.find_one({'status': {'$ne': self.COMPLETE}}) is not None

    def push(self, url):
        """Queue ``url`` unless it is already known."""
        with self.lock:
            try:
                self.webpage.insert_one({'_id': url, 'status': self.OUTSTANDING, 'timestamp': datetime.now()})
            except errors.DuplicateKeyError:
                # Already queued or done; use the occasion to release
                # abandoned jobs.
                self.repair()

    def pop(self):
        """Claim one OUTSTANDING URL and return it.

        Raises:
            KeyError: if nothing is OUTSTANDING right now.
        """
        with self.lock:
            record = self.webpage.find_one_and_update(
                filter={'status': self.OUTSTANDING},
                update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}},
            )
            if record:
                return record['_id']
            self.repair()
            raise KeyError()

    def complete(self, url):
        """Mark ``url`` as finished.

        The record is kept rather than deleted: deleting it forgets that the
        URL was ever seen, so another process pushing the same link would
        have it crawled and written again.
        """
        self.webpage.update_one({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        """Release one PROCESSING job whose worker exceeded ``timeout``."""
        record = self.webpage.find_one_and_update(
            filter={
                'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                # Only stuck jobs: completed records must never be re-opened.
                'status': self.PROCESSING,
            },
            update={'$set': {'status': self.OUTSTANDING}},
        )
        if record:
            print('Released:{}'.format(record['_id']))

    def clear(self):
        """Delete every record that is not OUTSTANDING (finished or in progress)."""
        self.webpage.delete_many({'status': {'$ne': self.OUTSTANDING}})
# Disk cache wrapper class
class DiskCache:
    """Dictionary-style cache that stores raw page bytes as files on disk."""

    def __init__(self, max_length, cache_dir='D:\\Crawl_Results\\cache', expires=timedelta(days=30)):
        """Store the settings; no directory is created until the first write.

        Args:
            max_length: Stored on the instance; not used by the methods here.
            cache_dir: Root directory of the cache.
            expires: Age limit used by ``has_expired``.
        """
        self.cache_dir = cache_dir
        self.max_length = max_length
        self.expires = expires

    def url_to_path(self, url):
        """Map ``url`` to a file path below ``cache_dir``."""
        components = urlsplit(url)
        path = components.path
        if not path:
            path = '/index.html'
        elif path.endswith('/'):
            path += 'index.html'
        filename = components.netloc + path + components.query
        # Replace everything outside the allowed set with '_'.
        filename = re.sub(r'[^/0-9a-zA-Z\-.,;]', '_', filename)
        # Cap every path segment at 255 characters.
        filename = '/'.join(segment[:255] for segment in filename.split('/'))
        return os.path.join(self.cache_dir, filename)

    def __getitem__(self, url):
        """Return the cached bytes for ``url``; raise ``KeyError`` if absent."""
        path = self.url_to_path(url)
        if not os.path.exists(path):
            raise KeyError(url + '不存在')
        with open(path, 'rb') as fp:
            # The return used to be commented out, so every hit yielded None.
            return fp.read()

    def __setitem__(self, url, result):
        """Write ``result`` (bytes) to the cache file for ``url``."""
        path = self.url_to_path(url)
        folder = os.path.dirname(path)
        os.makedirs(folder, exist_ok=True)
        print('保存到了{}'.format(folder))
        with open(path, 'wb') as fp:
            fp.write(result)

    def has_expired(self, timestamp):
        """Return True when ``timestamp`` is older than ``expires``."""
        return datetime.now() > timestamp + self.expires
# MongoDB cache wrapper class
class MongoCache:
    """Dictionary-style cache of download results stored in MongoDB.

    ``cache[url] = result`` pickles and zlib-compresses ``result`` and upserts
    it under ``_id == url``; ``cache[url]`` loads it back or raises
    ``KeyError``.  A TTL index on ``timestamp`` lets MongoDB expire old
    entries on its own.
    """

    def __init__(self, client=None, expires=timedelta(days=30)):
        """Connect to MongoDB and make sure the TTL index exists.

        Args:
            client: An existing ``MongoClient``; a client for
                ``localhost:27017`` is created when omitted.
            expires: How long a cached record is kept.
        """
        self.client = MongoClient('localhost', 27017) if client is None else client
        self.db = self.client['cache']
        self.webpage = self.db['webcrawler']
        self.expires = expires
        self.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds())

    def __getitem__(self, url):
        """Return the cached result for ``url``; raise ``KeyError`` if absent."""
        record = self.webpage.find_one({'_id': url})
        if not record:
            raise KeyError(url + '不存在')
        # NOTE(security): pickle.loads executes arbitrary code when fed hostile
        # bytes.  This is only acceptable while the collection is written
        # exclusively by this crawler.
        return pickle.loads(zlib.decompress(record['result']))

    def __setitem__(self, url, result):
        """Store ``result`` for ``url``, replacing any previous record."""
        from datetime import timezone

        record = {
            'result': Binary(zlib.compress(pickle.dumps(result))),
            # PyMongo stores naive datetimes as if they were UTC, and the TTL
            # monitor compares against UTC.  A naive local ``datetime.now()``
            # shifts expiry by the local UTC offset, so store an aware value.
            'timestamp': datetime.now(timezone.utc),
        }
        self.webpage.update_one({'_id': url}, {'$set': record}, upsert=True)
# Downloader: wraps the download logic in a class
class Downloader:
    """Callable page fetcher with throttling, optional cache, proxy and retries.

    Calling ``downloader(url)`` returns the page body as ``bytes``, or ``None``
    when the page could not be fetched with status 200.
    """

    def __init__(self, delay=5, user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0', proxies=None, request_max=3, cache=None):
        """Store the configuration and create the urllib3 connection pool.

        Args:
            delay: Minimum number of seconds between hits on one domain.
            user_agent: Value sent in the ``User-agent`` header.
            proxies: Optional sequence of proxies; one is picked at random
                for every download.
            request_max: How many times a failed download is retried.
            cache: Optional mapping-like cache (``cache[url]`` raising
                ``KeyError`` on a miss).
        """
        self.throttle = Throttle(delay=delay)
        self.user_agent = user_agent
        self.proxies = proxies
        self.request_max = request_max
        self.cache = cache
        # Shared connection pool for direct (non-proxy) requests.
        self.http = urllib3.PoolManager()

    def __call__(self, url):
        """Return the body for ``url``, from the cache when possible."""
        result = None
        # ``is not None`` rather than truthiness: an empty dict-like cache is
        # falsy and would otherwise never be filled.
        if self.cache is not None:
            try:
                result = self.cache[url]
            except KeyError:
                pass  # cache miss: download below
            else:
                if not isinstance(result, dict):
                    # Older runs could cache ``None`` after a failed download.
                    result = None
                else:
                    code = result.get('code')
                    if self.request_max > 0 and isinstance(code, int) and 500 <= code < 600:
                        # A cached server error is worth another attempt.
                        result = None
        if result is None:
            self.throttle.wait(url)
            proxy = random.choice(self.proxies) if self.proxies else None
            headers = {'User-agent': self.user_agent}
            result = self.download(url, headers, self.request_max, proxy)
            # Do not cache transport failures (no status code at all).
            if self.cache is not None and result['code'] is not None:
                self.cache[url] = result
        return result['html']

    def download(self, url, headers, request_max, proxy=None):
        """Fetch ``url`` and return ``{'html': bytes | None, 'code': int | None}``.

        Any failure (exception or non-200 status) is retried while
        ``request_max`` is positive, and the result of the retry is what gets
        returned.  ``code`` is ``None`` when no HTTP response was received.
        """
        print('正在下载, URL:{}'.format(url))
        html, code = None, None
        try:
            if proxy:
                html, code = self._fetch_via_proxy(url, headers, proxy)
            else:
                html, code = self._fetch_direct(url, headers)
        except Exception as e:  # boundary: log and fall through to the retry
            print('下载遇到了错误,错误代码是{}'.format(e))
        else:
            if code != 200:
                print('遇到了错误，状态码是{}'.format(code))
        if html is None and request_max > 0:
            return self.download(url, headers, request_max - 1, proxy)
        return {'html': html, 'code': code}

    def _fetch_direct(self, url, headers):
        """GET ``url`` through the urllib3 pool; return ``(body_or_None, status)``."""
        response = self.http.request('GET', url, headers=headers)
        try:
            status = response.status
            return (response.data if status == 200 else None), status
        finally:
            response.release_conn()

    def _fetch_via_proxy(self, url, headers, proxy):
        """GET ``url`` through ``proxy`` with urllib; return ``(body_or_None, status)``."""
        request = urllib.request.Request(url, headers=headers)
        # ``urlparse`` here is the function imported from urllib.parse.
        handler = urllib.request.ProxyHandler({urlparse(url).scheme: proxy})
        opener = urllib.request.build_opener(handler)
        try:
            with opener.open(request) as response:
                status = response.status
                body = response.read()
        except urllib.error.HTTPError as err:
            # urllib raises for 4xx/5xx; report the status instead.
            return None, err.code
        return (body if status == 200 else None), status
# Scrape_callback: stores the data parsed from each page
class Scrape_callback:
    """Callable that parses a Douban movie page into this process's own CSV.

    Each crawler process gets a separate file named after ``process_id``.
    Every row is flushed as soon as it is written and ``close_writer`` really
    closes the file, so rows are not left in an unflushed buffer when the
    process ends.
    """

    def __init__(self, process_id):
        """Open ``downloaded_data_<process_id>.csv`` and write the header row."""
        # Keep the file object: csv.writer offers no way to flush or close it.
        self._file = open(f'D:/Crawl_Results/downloaded_data_{process_id}.csv', 'w', encoding='utf-8', newline='', errors='replace')
        self.writer = csv.writer(self._file)
        self.fields = ('中文名', '外文名', '评分', '上映时间', '国家', '导演', '时长', '类型')
        self.process_id = process_id
        # The callback is shared by the worker threads of one process.
        self.lock = threading.Lock()
        self.writer.writerow(self.fields)
        self._file.flush()

    def __call__(self, html):
        """Parse ``html`` (UTF-8 bytes) and write the extracted row.

        Parsing runs without the lock; only the file write is serialised.
        Pages without the expected elements (e.g. list pages) are reported
        and skipped.

        Raises:
            RuntimeError: if the writer has already been closed.
        """
        if not self.writer:
            raise RuntimeError('CSV writer is not initialized. Call open_writer() first.')
        html_string = html.decode('utf-8')
        root = lhtml.fromstring(html_string)
        result_list = []
        try:
            # Title: "<Chinese name> <original name>"; the second part is optional.
            title_content = root.cssselect('div#content')[0]
            span_title = title_content.cssselect('span[property="v:itemreviewed"]')[0]
            title_text = span_title.text_content().split(' ', 1)
            result_list.extend(title_text)
            if len(title_text) == 1:
                result_list.append('--')
            # Rating.
            rate_text = root.cssselect('strong[property="v:average"]')[0].text_content()
            result_list.append(rate_text)
            # Release date, formatted as "date(country)".
            date_text = root.cssselect('span[property="v:initialReleaseDate"]')[0].text_content()
            parenthesis_index = date_text.find('(')
            if parenthesis_index != -1:
                date = date_text[:parenthesis_index]
                # Text between the opening parenthesis and the final character.
                country = date_text[parenthesis_index + 1:-1]
            else:
                date = date_text
                country = '--'
            result_list.append(date)
            result_list.append(country)
            # Director.
            direct_by_text = root.cssselect('a[rel="v:directedBy"]')[0].text_content()
            result_list.append(direct_by_text)
            # Runtime.
            runtime_text = root.cssselect('span[property="v:runtime"]')[0].text_content()
            result_list.append(runtime_text)
            # Genres, joined with '|'.
            gener_spans = root.cssselect('span[property="v:genre"]')
            gener_text = '|'.join(span.text_content() for span in gener_spans).rstrip('|')
            result_list.append(gener_text)
            # Six placeholders for six values (the genre used to be dropped).
            print('{}|{}|{}|{}|{}|{}'.format(title_text, rate_text, date, country, direct_by_text, gener_text))
            self.do_write(result_list)
        except IndexError:
            print('未找到指定的元素')
        except Exception as e:
            print(f'处理过程中发生错误: {e}')

    def do_write(self, result_list):
        """Write one row and flush it; report if the file is already closed."""
        with self.lock:
            if self.writer is None:
                print('打开文件失败')
                return
            self.writer.writerow(result_list)
            # Flush per row so a daemon thread or process that is torn down
            # cannot take buffered rows with it.
            self._file.flush()

    def close_writer(self):
        """Close the CSV file; further calls to the callback raise RuntimeError."""
        with self.lock:
            if self.writer:
                self.writer = None
                self._file.close()
# Throttle: controls the delay between requests
class Throttle:
    """Enforce a minimum delay between successive requests to one domain."""

    def __init__(self, delay):
        """Remember ``delay`` (seconds) and start with no domains seen."""
        self.delay = delay
        # domain -> datetime of the most recent access
        self.domains = {}

    def wait(self, url):
        """Sleep if ``url``'s domain was accessed less than ``delay`` seconds ago."""
        domain = urlparse(url).netloc
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            # total_seconds(), not .seconds: .seconds is only the seconds
            # component of the timedelta, so it drops whole days and
            # microseconds and would request a sleep for "one day and one
            # second ago".
            elapsed = (datetime.now() - last_accessed).total_seconds()
            sleep_secs = self.delay - elapsed
            if sleep_secs > 0:
                print('正在休眠，将等待{}秒后再次连接'.format(round(sleep_secs, 1)))
                time.sleep(sleep_secs)
        # Record this access.
        self.domains[domain] = datetime.now()
# Page-crawling function
def threaded_crawler(seed_url, link_regex, process_id, max_threads=3, crawl_queue=None):
    """Run one crawler process: worker threads fed from the shared MongoDB queue.

    Args:
        seed_url: First page; its ``?start=25 .. 225`` pages are queued too.
        link_regex: Only links matching this pattern are queued.
        process_id: Label used for this process's own CSV file.
        max_threads: Upper bound on concurrently running workers.
        crawl_queue: Queue with ``push``/``pop``/``complete``/``clear``;
            a new ``MongoQueue`` is created when omitted.
    """
    # Create the queue here rather than as a default-argument expression,
    # which connected at definition time and was shared by every call.
    if crawl_queue is None:
        crawl_queue = MongoQueue()
    scrape_callback = Scrape_callback(process_id)

    user_agent_list = ['BadCrawler', 'GoodCrawler']
    rp = urllib.robotparser.RobotFileParser()
    # robots.txt lives at the site root, not underneath the seed page path.
    rp.set_url(urljoin(seed_url, '/robots.txt'))
    rp.read()
    current_user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0'
    # Only look for an alternative agent when the default one is disallowed.
    if not rp.can_fetch(current_user_agent, seed_url):
        for user_agent in user_agent_list:
            if rp.can_fetch(user_agent, seed_url):
                current_user_agent = user_agent
                break
        else:
            # As before, the crawl only warns here and carries on.
            print('该网站的robots.txt禁止我们访问')

    crawl_queue.push(seed_url)
    downloader = Downloader(delay=5, user_agent=current_user_agent, cache=MongoCache(), request_max=5, proxies=None)
    for item_count in range(1, 10):
        crawl_queue.push('{}?start={}'.format(seed_url, item_count * 25))

    def process_queue():
        """Worker: claim URLs from the queue until nothing is pending."""
        while crawl_queue:
            try:
                url = crawl_queue.pop()
            except IndexError:
                print('出错了')
                break
            except KeyError:
                # Nothing claimable right now (others are still processing).
                # Back off instead of hammering MongoDB in a hot loop.
                time.sleep(0.2)
                continue
            html = downloader(url)
            if html is not None:
                scrape_callback(html)
                for link in get_links(html) or ():
                    link = urljoin(seed_url, link)
                    if re.match(link_regex, link):
                        crawl_queue.push(link)
            # Update the URL's status once it has been handled.
            crawl_queue.complete(url)

    threads = []
    try:
        while threads or crawl_queue:
            # Build a new list instead of removing while iterating, which
            # skips the element after every removed one.
            threads = [thread for thread in threads if thread.is_alive()]
            while len(threads) < max_threads and crawl_queue:
                thread = threading.Thread(target=process_queue, daemon=True)
                thread.start()
                threads.append(thread)
            # Yield instead of spinning a core while the workers are busy.
            time.sleep(0.1)
    finally:
        scrape_callback.close_writer()
    crawl_queue.clear()
# Multi-process entry function
def process_link_crawler(args, **kwargs):
    """Run the crawler in several processes and merge their CSV files.

    Args:
        args: ``(seed_url, link_regex)`` tuple.
        **kwargs: Accepted for compatibility; currently not used.
    """
    seed_url, link_regex = args
    # Use a quarter of the CPUs, but always at least one worker: on machines
    # with fewer than four CPUs the plain integer division gave zero workers.
    use_cpu = max(1, multiprocessing.cpu_count() // 4)
    processes = []
    csv_files = []
    for i in range(use_cpu):
        # Parent pid plus index gives a distinct label per worker.
        process_id = f'pid_{os.getpid()}_{i}'
        csv_files.append(f'D:/Crawl_Results/downloaded_data_{process_id}.csv')
        p = multiprocessing.Process(target=threaded_crawler, args=(seed_url, link_regex, process_id))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    # All workers are done: merge the per-process CSV files.
    merge_csv_files(csv_files, 'D:/Crawl_Results/merged_data.csv')
# Helper that merges the CSV files
def merge_csv_files(csv_files, output_file):
    """Concatenate ``csv_files`` into ``output_file`` and delete the inputs.

    The header row of the first readable, non-empty input is written once;
    the header rows of the remaining inputs are skipped.  Missing or empty
    inputs are skipped instead of aborting the merge.

    Args:
        csv_files: Paths of the per-process CSV files, each starting with the
            same header row.
        output_file: Path of the merged CSV file (overwritten).
    """
    header_written = False
    with open(output_file, 'w', encoding='utf-8', newline='') as outfile:
        writer = csv.writer(outfile)
        for csv_file in csv_files:
            try:
                infile = open(csv_file, 'r', encoding='utf-8', errors='replace', newline='')
            except FileNotFoundError:
                # A worker that died early may never have created its file.
                continue
            with infile:
                reader = csv.reader(infile)
                header = next(reader, None)  # None for an empty file
                if header is None:
                    continue
                if not header_written:
                    writer.writerow(header)
                    header_written = True
                writer.writerows(reader)
    print('合并成功!')
    # Remove the per-process files now that their rows are merged.
    for csv_file in csv_files:
        try:
            os.remove(csv_file)
        except FileNotFoundError:
            pass  # nothing to clean up for a file that never existed
# Extract further links from downloaded HTML
def get_links(html):
    """Return the ``href`` values of all ``<a>`` tags found in ``html``.

    Args:
        html: Page content as UTF-8 ``bytes``, or ``None``.

    Returns:
        A list of the raw ``href`` strings (not resolved against any base
        URL), or ``None`` when ``html`` is ``None``.
    """
    if html is None:
        return None
    webpage_regex = re.compile(r'<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)
    # errors='replace': one invalid byte must not abort link extraction for
    # the whole page.
    html_string = html.decode('utf-8', errors='replace')
    return webpage_regex.findall(html_string)


def main():
    """Crawl the Douban Top 250 list with the multi-process crawler."""
    # Alternative seed kept from the original script (unused):
    # https://movie.douban.com/annual/2023/
    seed_url = 'https://movie.douban.com/top250'
    # Follow only movie detail pages: https://movie.douban.com/subject/<id>/
    link_regex = r'^https://(?!music\.douban\.com/subject/)movie\.douban\.com/subject/(\d+)/$'
    process_link_crawler((seed_url, link_regex))


if __name__ == '__main__':
    main()