4483 字
22 分钟
B站视频转移
本文主要内容利用
BBDown和biliup-rs搬运某个B账号的视频到另外一个B站账号
因为一直有台VDS闲置着,六核心7900x服务器,想着闲着也是闲着,就去做录播了,想着薅一点叔叔的创作激励填补下服务器费用,但是可能因为录的太多了,导致创作激励被扬了,然后创建了个新账号继续做,想着把老账号视频搬过来,因此就有个这个文章。
组件说明
- Python:代码运行。
- Openlist:上传视频到Openlist对应的网盘。
- BBDown:用于B站视频下载和合并。
- biliup-rs:上传视频到B站账号。
部署流程
基本配置
- 创建对于文件夹
Terminal window mkdir -p docker_data/bilisync/data && cd docker_data/bilisync/data - 下载
BBDown、biliup-rs、ffmpeg二进制文件。Terminal window # 这里都以linux x86系统为例wget https://github.com/nilaoda/BBDown/releases/download/1.6.3/BBDown_1.6.3_20240814_linux-x64.zipunzip BBDown_1.6.3_20240814_linux-x64.zip && chmod +x BBDown && rm BBDown_1.6.3_20240814_linux-x64.zipwget https://github.com/biliup/biliup-rs/releases/download/v0.2.3/biliupR-v0.2.3-x86_64-linux.tar.xztar -xf biliupR-v0.2.3-x86_64-linux.tar.xz && mv biliupR-v0.2.3-x86_64-linux/biliup . && chmod +x biliup && rm biliupR-v0.2.3-x86_64-linux.tar.xz && rm -rf biliupR-v0.2.3-x86_64-linuxwget https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linux64-gpl.tar.xztar -xf ffmpeg-master-latest-linux64-gpl.tar.xz && mv ffmpeg-master-latest-linux64-gpl/bin/ffmpeg . && chmod +x ffmpeg && rm ffmpeg-master-latest-linux64-gpl.tar.xz && rm -rf ffmpeg-master-latest-linux64-gpl BBDown登录,设置配置文件Terminal window ./BBDown loginnano BBDown.config#本文件是BBDown程序的配置文件#以#开头的都会被程序忽略#然后剩余非空白内容程序逐行读取,对于一个选项,其参数应当在下一行出现# 单个视频非分P的情况的文件下载后存储规则--file-pattern<videoTitle>/<videoTitle># 单个视频多分P的情况的文件下载后存储规则--multi-file-pattern<videoTitle>/<pageTitle># 下面设置下载多个分P时,每个分P的下载间隔为2秒--delay-per-page2# ffmpeg二进制文件路径--ffmpeg-path/app/data/ffmpeg# 如果你的机器下载很慢,可以尝试更改这个host,地址可以参考https://github.com/the1812/Bilibili-Evolved/issues/3234#issuecomment-1504764774#--upos-host#upos-sz-mirrorcosov.bilivideo.com# 跳过封面下载--skip-cover# 跳过字幕下载--skip-subtitle# 使用aria2下载引擎,不需要请注释-aria2# 额外的aira2下载参数--aria2c-args-j48 # 48线程biliup-rs登录按照提示登录(这个账号是你希望把视频上传到哪个B站账号,和下载账号区分开来)./biliup login
安全警告运行登录命令后,会在
data目录下生成包含您B站账户凭证的敏感文件(如BBDown.data和cookies.json)。请妥善保管,切勿在任何地方(如GitHub、论坛)分享这些文件的内容!
代码配置
-
创建
bilivideosync.py文件Terminal window nano bilivideosync.pyimport jsonimport osimport subprocessimport sysimport reimport shutilimport timeimport threadingfrom alist import AlistUploader# --- 配置区 ---# 您的视频数据# ❗只要保证每个元素有对应的bvid和title即可。获取方法非常多,说两个我用的# 1. 去主页->投稿Tab,找到以`https://api.bilibili.com/x/space/wbi/arc/search?pn=2&ps=40&tid=0&special_type=`开头的请求,复制preview->data->list->vlist数据# 2. 打开某一个合集,找到以`https://api.bilibili.com/x/polymer/web-space/seasons_archives_list?mid=`开头的请求,复制preview->data->archives数据JSON_DATA = """[{"aid": xxxxx,"bvid": "xxxxx","ctime": xxxxx,"duration": xxxx,"enable_vt": false,"interactive_video": false,"pic": "http://xxxxx","playback_position": 0,"pubdate": xxxxx,"stat": {"view": xxxxx,"vt": 0},"state": 0,"title": "xxxxxx","ugc_pay": 0,"vt_display": "","is_lesson_video": 0}]"""# 日志文件名SUCCESS_LOG_FILE = "success.log"FAIL_LOG_FILE = "fail.log"# Alist配置ALIST_BASE_URL = "http://172.17.0.1:65007" # 修改为你的Alist服务器地址ALIST_USERNAME = "xxxxxx" # ❗修改为你的Alist用户名ALIST_PASSWORD = "xxxxxx" # ❗修改为你的Alist密码# --- 配置区结束 ---# 全局变量,用于存储一次性输入的用户信息upload_details = {}# 初始化Alist上传器alist_uploader = AlistUploader(ALIST_BASE_URL, ALIST_USERNAME, ALIST_PASSWORD)def check_executables():"""检查依赖的可执行文件是否存在且可执行"""if not (os.path.exists('./BBDown') and os.access('./BBDown', os.X_OK)):print("❌ 错误: './BBDown' 不存在或没有执行权限。")sys.exit(1)if not (os.path.exists('./biliup') and os.access('./biliup', os.X_OK)):print("❌ 错误: './biliup' 不存在或没有执行权限。")sys.exit(1)print("✅ 依赖检查通过 (BBDown, biliup)。")def sanitize_filename(filename):"""移除Windows和Linux文件名中的非法字符以匹配BBDown的行为"""return re.sub(r'[\\/:*?"<>|]', '_', filename)def find_video_files(directory):"""在指定目录中查找所有视频文件,并返回一个路径列表。"""video_paths = []if not os.path.isdir(directory):return video_paths # 返回空列表# 排序以保证P序稳定file_list = sorted(os.listdir(directory))for file in file_list:if file.lower().endswith(('.mp4', '.flv', '.mkv', '.webm')):video_paths.append(os.path.join(directory, file))return video_pathsdef read_log_file(filepath):"""读取日志文件内容到集合中"""if not os.path.exists(filepath):return set()with open(filepath, 'r', encoding='utf-8') as f:return set(line.strip() for line in f if line.strip())def update_log_files(bvid, status):"""更新成功或失败日志"""success_bvids = read_log_file(SUCCESS_LOG_FILE)fail_bvids = read_log_file(FAIL_LOG_FILE)if status == 'success':success_bvids.add(bvid)if bvid in fail_bvids:fail_bvids.remove(bvid)else: # status == 'fail'fail_bvids.add(bvid)if bvid in success_bvids:success_bvids.remove(bvid)with open(SUCCESS_LOG_FILE, 'w', encoding='utf-8') as f:f.write('\n'.join(sorted(list(success_bvids))) + '\n')with open(FAIL_LOG_FILE, 'w', encoding='utf-8') as f:f.write('\n'.join(sorted(list(fail_bvids))) + '\n')def get_upload_details_from_user():"""从用户处获取上传所需信息,仅执行一次"""if not upload_details:print("\n--- 首次上传,请输入投稿信息 (后续将自动复用) ---")upload_details['tid'] = input("请输入投稿分区TID (例如: 171代表单机游戏): ")upload_details['desc'] = input("请输入视频简介: ")upload_details['tags'] = input("请输入视频标签 (用英文逗号 ',' 分隔): ")def download_video(bvid, title):"""下载指定BVID的视频,并实时显示下载进度。"""print(f"\n[📥] 正在下载: {title} (BVID: {bvid})")print("-" * 40)try:command = ['./BBDown', bvid]subprocess.run(command, check=True, text=True)print("-" * 40)print(f"✅ 下载成功: {title}")return Trueexcept subprocess.CalledProcessError:print("-" * 40)print(f"❌ 下载失败: {title} (BVID: {bvid})")return Falseexcept FileNotFoundError:print("-" * 40)print(f"❌ 命令执行失败: 找不到 './BBDown' 可执行文件。")return Falsedef upload_to_alist(title, video_paths):"""上传视频文件到Alist指定路径参数:title: 视频标题,用于生成Alist目录路径video_paths: 视频文件路径列表返回:成功上传所有文件返回True,否则返回False"""if not video_paths:print("⚠️ 没有找到需要上传到Alist的视频文件")return False# 生成Alist目标路径remote_dir_path = alist_uploader.generate_folder_path(title)print(f"[🗂️] 创建Alist目录: {remote_dir_path}")# 创建目标文件夹if not alist_uploader.create_new_folder(remote_dir_path):print(f"❌ 创建Alist目录失败: {remote_dir_path}")return False# 开始上传文件print(f"[☁️] 开始上传 {len(video_paths)} 个文件到Alist...")upload_complete_event = alist_uploader.upload_files(video_paths, remote_dir_path)# 等待所有上传完成print("等待所有文件上传完成...")upload_complete_event.wait()# 检查是否所有文件都上传成功success = alist_uploader.all_uploads_successful()if success:print(f"✅ 所有文件都成功上传到Alist: {remote_dir_path}")else:print(f"❌ 部分文件上传Alist失败,请检查日志")return successdef upload_video(bvid, title):"""上传指定视频并返回成功与否"""sanitized_folder = sanitize_filename(title)video_paths = find_video_files(sanitized_folder)if not video_paths:print(f"❌ 上传预处理失败: 在 '{sanitized_folder}' 目录中找不到视频文件。")return Falseget_upload_details_from_user()print(f"\n[🚀] 正在上传: {title} (共 {len(video_paths)} 个分P)")command = ['./biliup', 'upload','--copyright', '1','--line', 'txa','--limit', '20','--title', title,'--tid', upload_details['tid'],'--desc', upload_details['desc'],'--tag', upload_details['tags'],]command.extend(video_paths)try:subprocess.run(command, check=True, text=True)print(f"✅ 上传成功: {title}")return Trueexcept subprocess.CalledProcessError:print(f"❌ 上传失败: {title}")return Falseexcept FileNotFoundError:print(f"❌ 命令执行失败: 找不到 './biliup' 可执行文件。")return Falsedef process_video(bvid, title, download_first=True):"""完整的处理流程:下载 -> 上传到Alist -> 上传到B站 -> 清理 -> 记录"""if download_first:if not download_video(bvid, title):update_log_files(bvid, 'fail')return# 获取下载的视频文件路径sanitized_folder = sanitize_filename(title)video_paths = find_video_files(sanitized_folder)# 上传到Alist(可选步骤,失败不影响后续B站上传)# ❗如果不需要上传到openlist,请把下面三行注释掉print("\n[☁️] 开始上传到Alist...")alist_success = upload_to_alist(title, video_paths)print(f"[☁️] Alist上传{'成功' if alist_success else '失败'}")# 上传到B站if upload_video(bvid, title):update_log_files(bvid, 'success')# 上传成功后,删除文件夹try:shutil.rmtree(sanitized_folder)print(f"🗑️ 已清理文件夹: {sanitized_folder}")except OSError as e:print(f"⚠️ 清理文件夹失败: {sanitized_folder}, 错误: {e}")else:update_log_files(bvid, 'fail')def main():"""主执行函数"""check_executables()try:videos_data = json.loads(JSON_DATA)video_map = {v['bvid']: v for v in videos_data}except json.JSONDecodeError:print("❌ 错误: JSON 数据格式不正确。")sys.exit(1)successful_bvids = read_log_file(SUCCESS_LOG_FILE)failed_bvids = read_log_file(FAIL_LOG_FILE)processed_bvids = set()if failed_bvids:print("\n--- 检查上次失败的任务 ---")for bvid in list(failed_bvids):if bvid not in video_map:print(f"ℹ️ 失败列表中的 {bvid} 在当前JSON数据中不存在,已跳过。")continuevideo_info = video_map[bvid]title = video_info['title']sanitized_folder = sanitize_filename(title)print(f"\n[🔄] 正在重试失败任务: {title} (BVID: {bvid})")if os.path.isdir(sanitized_folder):print(" └── 检测到已下载的文件夹,将直接尝试上传。")process_video(bvid, title, download_first=False)else:print(" └── 未检测到文件夹,将重新下载并上传。")process_video(bvid, title, download_first=True)processed_bvids.add(bvid)print("\n--- 开始处理新的任务列表 ---")for video in videos_data:bvid = video.get("bvid")title = video.get("title")if not bvid or not title:continueif bvid in processed_bvids:print(f"ℹ️ {bvid} 已在失败重试阶段处理过,跳过。")continueif bvid in successful_bvids:print(f"ℹ️ {title} (BVID: {bvid}) 已在之前成功上传,跳过。")continueprocess_video(bvid, title, download_first=True)print("\n\n🎉🎉🎉 所有任务处理完毕!🎉🎉🎉")final_fails = read_log_file(FAIL_LOG_FILE)if final_fails:print(f"\n⚠️ 注意: 仍有 {len(final_fails)} 个任务失败,请检查 '{FAIL_LOG_FILE}' 文件。")if __name__ == '__main__':main()注注意代码里的❗符号,需要修改或确认自己需要不需要
-
创建
alist.py文件Terminal window nano alist.pyimport requestsimport hashlibimport jsonimport urllib.parseimport osimport timeimport threadingimport queueimport refrom datetime import datetimefrom tqdm import tqdmimport subprocessclass AlistUploader:def __init__(self, base_url, username, password):self.base_url = base_urlself.username = usernameself.password = passwordself.token = Noneself.upload_queue = queue.Queue()self.upload_threads = []self.max_threads = 6 # 最大并发上传线程数self.upload_results = {} # 存储上传结果self.upload_complete_event = threading.Event()def get_token(self):"""获取 Alist 的 token"""if self.token:return self.tokenurl = f"{self.base_url}/api/auth/login/hash"password_with_suffix = f"{self.password}-https://github.com/alist-org/alist"hashed_password = hashlib.sha256(password_with_suffix.encode()).hexdigest()payload = {"username": self.username,"password": hashed_password}headers = {"Content-Type": "application/json"}response = requests.post(url, headers=headers, data=json.dumps(payload))if response.status_code == 200:result = response.json()if result["code"] == 200 and result["message"] == "success":self.token = result["data"]["token"]return self.token# print(f"获取 token 失败: {response.text}")tqdm.write(f"获取 token 失败: {response.text}")return Nonedef create_new_folder(self, folder_path):"""创建文件夹"""url = f"{self.base_url}/api/fs/mkdir"token = self.get_token()if not token:return Falseheaders = {"Authorization": token,"Content-Type": "application/json"}payload = {"path": folder_path}response = requests.post(url, headers=headers, json=payload)if response.status_code == 200:result = response.json()if result["code"] == 200 and result["message"] == "success":# print(f"新建文件夹成功: {folder_path}")tqdm.write(f"新建文件夹成功: {folder_path}")return Trueelse:# print(f"新建文件夹失败: {result['message']}")tqdm.write(f"新建文件夹失败: {result['message']}")return Falseelse:# print(f"新建文件夹请求失败,状态码: {response.status_code}, 响应内容: {response.text}")tqdm.write(f"新建文件夹请求失败,状态码: {response.status_code}, 响应内容: {response.text}")return Falsedef extract_date_user_from_title(self, title):"""从标题中提取日期和用户名"""# 提取用户名user_match = re.search(r'【弹幕】(.*?)直播回放', title)user = user_match.group(1) if user_match else "未知用户"# 提取日期date_match = re.search(r'(\d{4})年(\d{2})月(\d{2})日', title)if date_match:year = date_match.group(1)month = date_match.group(2)day = date_match.group(3)else:# 尝试其他日期格式date_match = re.search(r'(\d{4})[-.年](\d{1,2})[-.月](\d{1,2})', title)if date_match:year = date_match.group(1)month = date_match.group(2).zfill(2)day = date_match.group(3).zfill(2)else:# 使用当前日期now = datetime.now()year = str(now.year)month = str(now.month).zfill(2)day = str(now.day).zfill(2)return user, year, month, daydef generate_folder_path(self, title):"""生成文件夹路径: /115/录播/{{user}}/{{yyyy}}年/{{MM}}-{{dd}}"""user, year, month, day = self.extract_date_user_from_title(title)return f"/115/录播/{user}/{year}年/{month}-{day}"def upload_file(self, local_file_path, remote_dir_path, as_task=True, retry=0):"""上传单个文件到 Alist,通过调用系统 curl 命令实现。避免 Python 内存占用,并为并行上传做准备(并行性由调用方管理)。"""token = self.get_token()if not token:tqdm.write("无法获取 AList 访问令牌。")return Falsefile_name = os.path.basename(local_file_path)# 确保远程目录路径以 / 结尾,或者不以 / 结尾,然后拼接文件名为 /# Alist 的 File-Path 头部需要一个完整的远程文件路径,例如 /path/to/dir/file.txtremote_file_path = f"{remote_dir_path.rstrip('/')}/{file_name}"url = f"{self.base_url}/api/fs/form"# 构建 curl 命令cmd_parts = ["curl","-X", "PUT", # HTTP 方法为 PUT"--http1.1","--retry", "3", # 失败自动重试3次"--retry-delay", "5", # 重试间隔5秒"-H", f"Authorization: {token}",# File-Path 头部的值需要 URL 编码"-H", f"File-Path: {urllib.parse.quote(remote_file_path)}",]if as_task:cmd_parts.extend(["-H", "As-Task: true"])cmd_parts.extend(["-s", "--show-error", "--fail"])cmd_parts.extend(["-F", f"file=@{local_file_path}"])# 添加 URLcmd_parts.append(url)# 可选:为了调试,你可以添加 -v (verbose) 或 --progress-bar# cmd_parts.append("-v") # 输出更详细的请求/响应过程# cmd_parts.append("--progress-bar") # 显示进度条try:curl_response = subprocess.run(cmd_parts,capture_output=True, # 捕获 stdout 和 stderrtext=True,check=False, # 不在非零退出码时抛出 CalledProcessErrorencoding='utf-8',)# 优先解析 AList 的 JSON 响应result = Nonetry:if curl_response.stdout: # 只有当 stdout 不为空时才尝试解析result = json.loads(curl_response.stdout)except json.JSONDecodeError:# 如果 JSON 解析失败,则这是一个错误情况error_message = f"无法解析 AList 响应 JSON: {curl_response.stdout.strip() if curl_response.stdout else '无输出'}"if curl_response.stderr:error_message += f"\ncURL 错误输出: {curl_response.stderr.strip()}"if retry < 3:tqdm.write(f"上传 {file_name} 失败: {error_message},正在重试...")return self.upload_file(local_file_path, remote_dir_path, as_task, retry + 1)else:tqdm.write(f"上传 {file_name} 最终失败: {error_message}")return False# 根据 AList 的 JSON 响应判断成功或失败if result and result.get("code") == 200 and result.get("message") == "success":# AList 明确表示成功,即使 curl 退出码非零 (如 92),也视为成功if as_task and 'task' in result.get("data", {}):task_id = result["data"]["task"]["id"]# tqdm.write(f"文件 {file_name} 上传任务已创建,任务ID: {task_id}")return self.monitor_upload_task(task_id, local_file_path, remote_dir_path)# tqdm.write(f"文件 {file_name} 上传成功。")return Trueelse:# AList 返回了错误代码或消息,或者没有返回有效的 JSON (已在上层捕获)alist_error_message = result.get("message", "未知错误或无AList响应") if result else "无AList响应或解析失败"# 组合 AList 的错误信息和 curl 的退出码/错误输出,用于日志full_error_detail = f"AList错误: {alist_error_message}. "if curl_response.returncode != 0:full_error_detail += f"curl退出码: {curl_response.returncode}. "if curl_response.stderr:full_error_detail += f"cURL错误输出: {curl_response.stderr.strip()}."if retry < 3:tqdm.write(f"上传 {file_name} 失败: {full_error_detail},正在重试...")return self.upload_file(local_file_path, remote_dir_path, as_task, retry + 1)else:tqdm.write(f"上传 {file_name} 最终失败: {full_error_detail}")return Falseexcept FileNotFoundError:# curl 命令本身没有找到tqdm.write(f"错误: curl 命令未找到。请确保 curl 已安装并配置在系统 PATH 中。")return Falseexcept Exception as e:# 捕获其他任何意外的Python异常if retry < 3:tqdm.write(f"上传文件 {file_name} 时出错: {str(e)},正在重试...")return self.upload_file(local_file_path, remote_dir_path, as_task, retry + 1)else:tqdm.write(f"上传文件 {file_name} 时出错: {str(e)}")return Falsedef get_task_info(self, task_id):"""获取指定任务的信息"""token = self.get_token()if not token:return Noneurl = f"{self.base_url}/api/task/upload/info"params = {"tid": task_id}headers = {"Authorization": token}response = requests.post(url, headers=headers, params=params)if response.status_code == 200:result = response.json()if result["code"] == 200 and result["message"] == "success":return result["data"]else:# print(f"获取任务信息失败: {result['message']}")tqdm.write(f"获取任务信息失败: {result['message']}")return Noneelse:tqdm.write(f"获取任务信息失败,状态码: {response.status_code}, 响应内容: {response.text}")return Nonedef monitor_upload_task(self, task_id, local_file_path=None, remote_dir_path=None, poll_interval=2, timeout=7200, retry=0):"""监控任务进度,直到任务完成,失败时支持重试参数:task_id: 任务IDlocal_file_path: 本地文件路径,用于重试remote_dir_path: 远程目录路径,用于重试poll_interval: 轮询间隔(秒)timeout: 超时时间(秒)retry: 剩余重试次数返回:成功时返回任务信息,失败且重试次数用完时返回False"""from tqdm import tqdmstart_time = time.time()pbar = Nonetry:while time.time() - start_time < timeout:task_info = self.get_task_info(task_id)if not task_info:time.sleep(poll_interval)continue# 获取进度progress = task_info.get('progress', 0)state = task_info.get('state', '')# 初始化进度条if pbar is None:# pbar = tqdm(total=100, desc=f"上传任务 {task_id}", unit="%")pbar = tqdm(total=100, desc=f"{local_file_path.split('/')[-1]} 上传中", unit="%")# 更新进度条pbar.n = int(progress * 100)pbar.refresh()# 检查任务是否完成state = task_info.get('state', '')error = task_info.get('error', '')if state == 2 and progress >= 1.0:# tqdm.write(f"任务完成!总共传输: {task_info.get('total_bytes', 0)} 字节")return Trueelif state == 2 and progress == 0:# print("任务已完成,秒传成功")# tqdm.write("任务已完成,秒传成功")return Trueelif error:retry += 1# 如果有错误且有重试次数,尝试重新上传if retry < 4 and local_file_path and remote_dir_path and os.path.exists(local_file_path):pbar.close()pbar = None# print(f"任务失败: {error},剩余重试次数: {4 - retry}")tqdm.write(f"任务失败: {error},剩余重试次数: {4 - retry}")# 重新上传文件file_name = os.path.basename(local_file_path)remote_file_path = f"{remote_dir_path}/{file_name}"url = f"{self.base_url}/api/fs/form"headers = {"Authorization": self.get_token(),"File-Path": urllib.parse.quote(remote_file_path),"As-Task": "true","Content-Length": str(os.path.getsize(local_file_path))}with open(local_file_path, 'rb') as file:# 核心修改:重试时也使用 data 参数进行流式上传response = requests.put(url, headers=headers, data=file)if response.status_code == 200:result = response.json()if result["code"] == 200 and result["message"] == "success" and 'task' in result["data"]:new_task_id = result["data"]["task"]["id"]tqdm.write(f"重新创建上传任务,新任务ID: {new_task_id}")# 递归调用自身监控新任务,减少重试次数return self.monitor_upload_task(new_task_id,local_file_path,remote_dir_path,poll_interval,timeout - (time.time() - start_time), # 剩余超时时间retry - 1)elif retry >= 4:tqdm.write(f"任务失败: {error},所有重试次数已用完")return Falseelse:tqdm.write(f"任务失败: {error},无法重试")return Falsetime.sleep(poll_interval)tqdm.write(f"\n监控超时,已经等待 {timeout} 秒")return Falsefinally:# 确保进度条被关闭if pbar is not None:pbar.close()def _worker(self):"""工作线程函数,处理上传队列中的任务"""while True:try:file_info = self.upload_queue.get(block=False)if file_info is None: # 结束信号self.upload_queue.task_done()breaklocal_file_path, remote_dir_path = file_infofile_name = os.path.basename(local_file_path)# tqdm.write(f"开始上传文件: {file_name} 到 {remote_dir_path}")success = self.upload_file(local_file_path, remote_dir_path)# 记录上传结果self.upload_results[local_file_path] = success# tqdm.write(f"文件 {file_name} {'上传成功' if success else '上传失败'}")self.upload_queue.task_done()except queue.Empty:# 队列为空,检查是否所有任务已完成if self.upload_queue.unfinished_tasks == 0:breaktime.sleep(0.5)except Exception as e:tqdm.write(f"上传线程出错: {str(e)}")self.upload_queue.task_done()def upload_files(self, file_paths, remote_dir_path):"""异步上传多个文件到 Alist 的指定目录参数:file_paths: 本地文件路径列表remote_dir_path: 远程目录路径返回:一个事件对象,可以用于等待上传完成"""# 清空之前的结果和事件状态self.upload_results = {}self.upload_complete_event.clear()# 将所有文件加入上传队列for file_path in file_paths:self.upload_queue.put((file_path, remote_dir_path))# 启动工作线程self.upload_threads = []for _ in range(min(self.max_threads, len(file_paths))):thread = threading.Thread(target=self._worker)thread.daemon = Truethread.start()self.upload_threads.append(thread)# 启动监控线程,等待所有上传完成monitor_thread = threading.Thread(target=self._monitor_upload_completion)monitor_thread.daemon = Truemonitor_thread.start()return self.upload_complete_eventdef _monitor_upload_completion(self):"""监控线程,等待所有上传任务完成"""for thread in self.upload_threads:thread.join()self.upload_complete_event.set()# 汇总上传结果total_files = len(self.upload_results)success_count = sum(1 for success in self.upload_results.values() if success)tqdm.write(f"\n上传完成: {success_count}/{total_files} 文件上传成功")def all_uploads_successful(self):"""检查所有文件是否都上传成功"""if not self.upload_results:return Falsereturn all(self.upload_results.values())# 测试代码if __name__ == "__main__":uploader = AlistUploader(base_url="http://xxxxx",username="xxxxx",password="xxxxx")# 测试创建文件夹test_folder = "/115/录播/测试用户/2025年/05-31"uploader.create_new_folder(test_folder)# 测试上传文件test_files = ["/config/Desktop/bilibili.tar.gz"]event = uploader.upload_files(test_files, test_folder)# 等待上传完成event.wait()tqdm.write(f"所有文件上传完成,全部成功: {uploader.all_uploads_successful()}")
配置Docker容器
- 创建
Dockerfile和docker-compose.yamlcd .. && nano Dockerfile && nano docker-compose.yaml`Dockerfile FROM python:3.10-bookwormWORKDIR /app# 安装aria2和对应python包RUN apt update && \apt install -y \aria2 && \apt clean && \rm -rf /var/lib/apt/lists/* && \pip install --no-cache-dir tqdm requestsCMD ["python3", "bilivideosync.py"]docker-compose.yaml services:bilivideosync:build: .volumes:- ./data:/apptty: true # 必须保留,否则第一个视频下载并同步到网盘结束后,无法输入所需的上传到B站的分区ID,视频简介和视频标签stdin_open: true # 必须保留,否则第一个视频下载并同步到网盘结束后,无法输入所需的上传到B站的分区ID,视频简介和视频标签network_mode: bridge - 启动容器
docker compose up -d - 启动容器后,输入
docker compose attach --sig-proxy=false bilivideosync进入容器内交互式终端,等待输入分区ID的提示出现,分别填写完分区ID、视频简介和视频Tags后,即可退出,后续会复用第一次填写的信息
最后更新于 2025-07-13,距今已过 126 天
部分内容可能已过时
Lim's Blog