一、分块下载
特点:可以实现断点续下
import os
import requests
def download_file_with_resume(url, dest_path, chunk_size):# 获取文件的总大小response = requests.head(url)total_size = int(response.headers.get('content-length', 0))# 检查文件是否已部分下载downloaded_size = 0if os.path.exists(dest_path):downloaded_size = os.path.getsize(dest_path)with open(dest_path, 'ab') as f:# 从上次下载的位置继续下载for start in range(downloaded_size, total_size, chunk_size):end = min(start + chunk_size - 1, total_size - 1)headers = {'Range': f'bytes={start}-{end}'}part_response = requests.get(url, headers=headers, stream=True)total_size = int(part_response.headers.get('content-length', 0))print("total_size:",total_size)if part_response.status_code == 206 or part_response.status_code == 200:f.write(part_response.content)else:raise Exception(f"Failed to download chunk: {part_response.status_code}")print(f"Download completed: {dest_path}")# 示例用法
url = 'https://xxxx.com//video.mp4'
dest_path = 'abc.mp4'
chunk_size = 1024*1024*1 # 1MB
download_file_with_resume(url, dest_path,chunk_size)
二、线程池分块下载(大幅提高下载速度)
特点:下载速度快,1GB几十秒内下完
import os
import requests
import math
import threadpool
import time
import shutil
class BlockDownload:def __init__(self):self.error_try = 5 # 请求下载错误重试次数self.wait_time = 3 # 请求下载错误等待时间self.proxies = {'http':'127.0.0.1:7890','https':'127.0.0.1:7890'}self.timeout = 6def download_chunk(self,url, start_byte, end_byte,chunk_num, output_file_path):"""下载文件的一个块并保存到临时文件。"""headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/547.34 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/547.34','Range':f'bytes={start_byte}-{end_byte}'}for i in range(self.error_try):try:response = requests.get(url, headers=headers,timeout=self.timeout,proxies=self.proxies,stream=True)if response.status_code == 206 or response.status_code == 200:with open(f'{output_file_path}.part{chunk_num}', 'wb') as file:file.write(response.content)print(f"块 {chunk_num} 下载完成。")returnelse:print(f"块 {chunk_num} 下载失败,状态码: {response.status_code}")continueexcept Exception as e:print(f"块 {chunk_num} 下载错误,{i} 次重试!",e)time.sleep(self.wait_time)continueprint(f"块 {chunk_num} 下载失败,程序结束!")exit(0)def merge_chunks(self,output_file_path, num_chunks):"""合并所有下载的块到最终文件。"""with open(output_file_path, 'wb') as output_file:for i in range(num_chunks):temp_file_path = f'{output_file_path}.part{i}'with open(temp_file_path, 'rb') as temp_file:shutil.copyfileobj(temp_file,output_file)os.remove(temp_file_path)print(f"所有块已合并为: {output_file_path}")def get_optimal_chunk_size(slef,total_size):"""获取最佳分块大小"""if total_size < 64 * 1024 * 1024: # 小于64MBreturn 1 * 1024 * 1024 # 1MBelif total_size < 512 * 1024 * 1024: # 小于512MBreturn 2 * 1024 * 1024 # 2MBelif total_size < 1024 * 1024 * 1024: # 小于1024MBreturn 4 * 1024 * 1024 # 4MBelse:return 8 * 1024 * 1024 # 8MBdef thread_start(self,url,file_path,thread_num):headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/547.34 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/547.34'}response = requests.head(url,headers=headers,timeout=self.timeout,proxies=self.proxies)total_size = int(response.headers.get('content-length', 0))if not total_size:response = requests.head(response.headers.get('Location', url), headers=headers, timeout=self.timeout, proxies=self.proxies)total_size = int(response.headers.get('content-length', 0))if not total_size:print(f"视频大小为:{total_size}")returnprint(f"文件总大小:{total_size},开始下载...")chunk_size = self.get_optimal_chunk_size(total_size)block_num = math.ceil(total_size / chunk_size) # 总分块数arguments_list = [] # 创建存放任务参数列表for i in range(block_num):start_byte = i * chunk_sizeend_byte = start_byte + chunk_size - 1if i == block_num - 1:end_byte = total_size - 1 # 最后一个块可能会包含剩余的所有字节arguments_list.append(([url, start_byte, end_byte, i, file_path], None))pool = threadpool.ThreadPool(thread_num) # 创建线程tasks_list = threadpool.makeRequests(self.download_chunk, arguments_list) # 按照参数列表长度创建任务列表for task in tasks_list:pool.putRequest(task) # 将要执行的任务放入线程池pool.wait()self.merge_chunks(file_path, block_num)def run(self):download_url = 'https://xxxx.com/video.mp4'file_path = r'D:\Video\xxx.mp4'thread_num = 32 # 线程数,最好根据视频大小、带宽设置self.thread_start(download_url,file_path,thread_num)if __name__ == '__main__':bd = BlockDownload()bd.run()