Mobile wallpaper
4483 字
22 分钟

B站视频转移

2025-07-13
浏览量 加载中...
本文主要内容

利用BBDownbiliup-rs搬运某个B账号的视频到另外一个B站账号

因为一直有台VDS闲置着,六核心7900x服务器,想着闲着也是闲着,就去做录播了,想着薅一点叔叔的创作激励填补下服务器费用,但是可能因为录的太多了,导致创作激励被扬了,然后创建了个新账号继续做,想着把老账号视频搬过来,因此就有个这个文章。

组件说明#

  • Python:代码运行。
  • Openlist:上传视频到Openlist对应的网盘。
  • BBDown:用于B站视频下载和合并。
  • biliup-rs:上传视频到B站账号。

部署流程#

基本配置#

  1. 创建对于文件夹
    Terminal window
    mkdir -p docker_data/bilisync/data && cd docker_data/bilisync/data
  2. 下载BBDownbiliup-rsffmpeg二进制文件。
    Terminal window
    # 这里都以linux x86系统为例
    wget https://github.com/nilaoda/BBDown/releases/download/1.6.3/BBDown_1.6.3_20240814_linux-x64.zip
    unzip BBDown_1.6.3_20240814_linux-x64.zip && chmod +x BBDown && rm BBDown_1.6.3_20240814_linux-x64.zip
    wget https://github.com/biliup/biliup-rs/releases/download/v0.2.3/biliupR-v0.2.3-x86_64-linux.tar.xz
    tar -xf biliupR-v0.2.3-x86_64-linux.tar.xz && mv biliupR-v0.2.3-x86_64-linux/biliup . && chmod +x biliup && rm biliupR-v0.2.3-x86_64-linux.tar.xz && rm -rf biliupR-v0.2.3-x86_64-linux
    wget https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-linux64-gpl.tar.xz
    tar -xf ffmpeg-master-latest-linux64-gpl.tar.xz && mv ffmpeg-master-latest-linux64-gpl/bin/ffmpeg . && chmod +x ffmpeg && rm ffmpeg-master-latest-linux64-gpl.tar.xz && rm -rf ffmpeg-master-latest-linux64-gpl
  3. BBDown登录,设置配置文件
    Terminal window
    ./BBDown login
    nano BBDown.config
    #本文件是BBDown程序的配置文件
    #以#开头的都会被程序忽略
    #然后剩余非空白内容程序逐行读取,对于一个选项,其参数应当在下一行出现
    # 单个视频非分P的情况的文件下载后存储规则
    --file-pattern
    <videoTitle>/<videoTitle>
    # 单个视频多分P的情况的文件下载后存储规则
    --multi-file-pattern
    <videoTitle>/<pageTitle>
    # 下面设置下载多个分P时,每个分P的下载间隔为2秒
    --delay-per-page
    2
    # ffmpeg二进制文件路径
    --ffmpeg-path
    /app/data/ffmpeg
    # 如果你的机器下载很慢,可以尝试更改这个host,地址可以参考https://github.com/the1812/Bilibili-Evolved/issues/3234#issuecomment-1504764774
    #--upos-host
    #upos-sz-mirrorcosov.bilivideo.com
    # 跳过封面下载
    --skip-cover
    # 跳过字幕下载
    --skip-subtitle
    # 使用aria2下载引擎,不需要请注释
    -aria2
    # 额外的aira2下载参数
    --aria2c-args
    -j48 # 48线程
  4. biliup-rs登录
    ./biliup login
    按照提示登录(这个账号是你希望把视频上传到哪个B站账号,和下载账号区分开来)
安全警告

运行登录命令后,会在 data 目录下生成包含您B站账户凭证的敏感文件(如 BBDown.datacookies.json)。请妥善保管,切勿在任何地方(如GitHub、论坛)分享这些文件的内容!

代码配置#

  1. 创建bilivideosync.py文件

    Terminal window
    nano bilivideosync.py
    import json
    import os
    import subprocess
    import sys
    import re
    import shutil
    import time
    import threading
    from alist import AlistUploader
    # --- 配置区 ---
    # 您的视频数据
    # ❗只要保证每个元素有对应的bvid和title即可。获取方法非常多,说两个我用的
    # 1. 去主页->投稿Tab,找到以`https://api.bilibili.com/x/space/wbi/arc/search?pn=2&ps=40&tid=0&special_type=`开头的请求,复制preview->data->list->vlist数据
    # 2. 打开某一个合集,找到以`https://api.bilibili.com/x/polymer/web-space/seasons_archives_list?mid=`开头的请求,复制preview->data->archives数据
    JSON_DATA = """
    [
    {
    "aid": xxxxx,
    "bvid": "xxxxx",
    "ctime": xxxxx,
    "duration": xxxx,
    "enable_vt": false,
    "interactive_video": false,
    "pic": "http://xxxxx",
    "playback_position": 0,
    "pubdate": xxxxx,
    "stat": {
    "view": xxxxx,
    "vt": 0
    },
    "state": 0,
    "title": "xxxxxx",
    "ugc_pay": 0,
    "vt_display": "",
    "is_lesson_video": 0
    }
    ]
    """
    # 日志文件名
    SUCCESS_LOG_FILE = "success.log"
    FAIL_LOG_FILE = "fail.log"
    # Alist配置
    ALIST_BASE_URL = "http://172.17.0.1:65007" # 修改为你的Alist服务器地址
    ALIST_USERNAME = "xxxxxx" # ❗修改为你的Alist用户名
    ALIST_PASSWORD = "xxxxxx" # ❗修改为你的Alist密码
    # --- 配置区结束 ---
    # 全局变量,用于存储一次性输入的用户信息
    upload_details = {}
    # 初始化Alist上传器
    alist_uploader = AlistUploader(ALIST_BASE_URL, ALIST_USERNAME, ALIST_PASSWORD)
    def check_executables():
    """检查依赖的可执行文件是否存在且可执行"""
    if not (os.path.exists('./BBDown') and os.access('./BBDown', os.X_OK)):
    print("❌ 错误: './BBDown' 不存在或没有执行权限。")
    sys.exit(1)
    if not (os.path.exists('./biliup') and os.access('./biliup', os.X_OK)):
    print("❌ 错误: './biliup' 不存在或没有执行权限。")
    sys.exit(1)
    print("✅ 依赖检查通过 (BBDown, biliup)。")
    def sanitize_filename(filename):
    """移除Windows和Linux文件名中的非法字符以匹配BBDown的行为"""
    return re.sub(r'[\\/:*?"<>|]', '_', filename)
    def find_video_files(directory):
    """在指定目录中查找所有视频文件,并返回一个路径列表。"""
    video_paths = []
    if not os.path.isdir(directory):
    return video_paths # 返回空列表
    # 排序以保证P序稳定
    file_list = sorted(os.listdir(directory))
    for file in file_list:
    if file.lower().endswith(('.mp4', '.flv', '.mkv', '.webm')):
    video_paths.append(os.path.join(directory, file))
    return video_paths
    def read_log_file(filepath):
    """读取日志文件内容到集合中"""
    if not os.path.exists(filepath):
    return set()
    with open(filepath, 'r', encoding='utf-8') as f:
    return set(line.strip() for line in f if line.strip())
    def update_log_files(bvid, status):
    """更新成功或失败日志"""
    success_bvids = read_log_file(SUCCESS_LOG_FILE)
    fail_bvids = read_log_file(FAIL_LOG_FILE)
    if status == 'success':
    success_bvids.add(bvid)
    if bvid in fail_bvids:
    fail_bvids.remove(bvid)
    else: # status == 'fail'
    fail_bvids.add(bvid)
    if bvid in success_bvids:
    success_bvids.remove(bvid)
    with open(SUCCESS_LOG_FILE, 'w', encoding='utf-8') as f:
    f.write('\n'.join(sorted(list(success_bvids))) + '\n')
    with open(FAIL_LOG_FILE, 'w', encoding='utf-8') as f:
    f.write('\n'.join(sorted(list(fail_bvids))) + '\n')
    def get_upload_details_from_user():
    """从用户处获取上传所需信息,仅执行一次"""
    if not upload_details:
    print("\n--- 首次上传,请输入投稿信息 (后续将自动复用) ---")
    upload_details['tid'] = input("请输入投稿分区TID (例如: 171代表单机游戏): ")
    upload_details['desc'] = input("请输入视频简介: ")
    upload_details['tags'] = input("请输入视频标签 (用英文逗号 ',' 分隔): ")
    def download_video(bvid, title):
    """
    下载指定BVID的视频,并实时显示下载进度。
    """
    print(f"\n[📥] 正在下载: {title} (BVID: {bvid})")
    print("-" * 40)
    try:
    command = ['./BBDown', bvid]
    subprocess.run(command, check=True, text=True)
    print("-" * 40)
    print(f"✅ 下载成功: {title}")
    return True
    except subprocess.CalledProcessError:
    print("-" * 40)
    print(f"❌ 下载失败: {title} (BVID: {bvid})")
    return False
    except FileNotFoundError:
    print("-" * 40)
    print(f"❌ 命令执行失败: 找不到 './BBDown' 可执行文件。")
    return False
    def upload_to_alist(title, video_paths):
    """
    上传视频文件到Alist指定路径
    参数:
    title: 视频标题,用于生成Alist目录路径
    video_paths: 视频文件路径列表
    返回:
    成功上传所有文件返回True,否则返回False
    """
    if not video_paths:
    print("⚠️ 没有找到需要上传到Alist的视频文件")
    return False
    # 生成Alist目标路径
    remote_dir_path = alist_uploader.generate_folder_path(title)
    print(f"[🗂️] 创建Alist目录: {remote_dir_path}")
    # 创建目标文件夹
    if not alist_uploader.create_new_folder(remote_dir_path):
    print(f"❌ 创建Alist目录失败: {remote_dir_path}")
    return False
    # 开始上传文件
    print(f"[☁️] 开始上传 {len(video_paths)} 个文件到Alist...")
    upload_complete_event = alist_uploader.upload_files(video_paths, remote_dir_path)
    # 等待所有上传完成
    print("等待所有文件上传完成...")
    upload_complete_event.wait()
    # 检查是否所有文件都上传成功
    success = alist_uploader.all_uploads_successful()
    if success:
    print(f"✅ 所有文件都成功上传到Alist: {remote_dir_path}")
    else:
    print(f"❌ 部分文件上传Alist失败,请检查日志")
    return success
    def upload_video(bvid, title):
    """上传指定视频并返回成功与否"""
    sanitized_folder = sanitize_filename(title)
    video_paths = find_video_files(sanitized_folder)
    if not video_paths:
    print(f"❌ 上传预处理失败: 在 '{sanitized_folder}' 目录中找不到视频文件。")
    return False
    get_upload_details_from_user()
    print(f"\n[🚀] 正在上传: {title} (共 {len(video_paths)} 个分P)")
    command = [
    './biliup', 'upload',
    '--copyright', '1',
    '--line', 'txa',
    '--limit', '20',
    '--title', title,
    '--tid', upload_details['tid'],
    '--desc', upload_details['desc'],
    '--tag', upload_details['tags'],
    ]
    command.extend(video_paths)
    try:
    subprocess.run(command, check=True, text=True)
    print(f"✅ 上传成功: {title}")
    return True
    except subprocess.CalledProcessError:
    print(f"❌ 上传失败: {title}")
    return False
    except FileNotFoundError:
    print(f"❌ 命令执行失败: 找不到 './biliup' 可执行文件。")
    return False
    def process_video(bvid, title, download_first=True):
    """完整的处理流程:下载 -> 上传到Alist -> 上传到B站 -> 清理 -> 记录"""
    if download_first:
    if not download_video(bvid, title):
    update_log_files(bvid, 'fail')
    return
    # 获取下载的视频文件路径
    sanitized_folder = sanitize_filename(title)
    video_paths = find_video_files(sanitized_folder)
    # 上传到Alist(可选步骤,失败不影响后续B站上传)
    # ❗如果不需要上传到openlist,请把下面三行注释掉
    print("\n[☁️] 开始上传到Alist...")
    alist_success = upload_to_alist(title, video_paths)
    print(f"[☁️] Alist上传{'成功' if alist_success else '失败'}")
    # 上传到B站
    if upload_video(bvid, title):
    update_log_files(bvid, 'success')
    # 上传成功后,删除文件夹
    try:
    shutil.rmtree(sanitized_folder)
    print(f"🗑️ 已清理文件夹: {sanitized_folder}")
    except OSError as e:
    print(f"⚠️ 清理文件夹失败: {sanitized_folder}, 错误: {e}")
    else:
    update_log_files(bvid, 'fail')
    def main():
    """主执行函数"""
    check_executables()
    try:
    videos_data = json.loads(JSON_DATA)
    video_map = {v['bvid']: v for v in videos_data}
    except json.JSONDecodeError:
    print("❌ 错误: JSON 数据格式不正确。")
    sys.exit(1)
    successful_bvids = read_log_file(SUCCESS_LOG_FILE)
    failed_bvids = read_log_file(FAIL_LOG_FILE)
    processed_bvids = set()
    if failed_bvids:
    print("\n--- 检查上次失败的任务 ---")
    for bvid in list(failed_bvids):
    if bvid not in video_map:
    print(f"ℹ️ 失败列表中的 {bvid} 在当前JSON数据中不存在,已跳过。")
    continue
    video_info = video_map[bvid]
    title = video_info['title']
    sanitized_folder = sanitize_filename(title)
    print(f"\n[🔄] 正在重试失败任务: {title} (BVID: {bvid})")
    if os.path.isdir(sanitized_folder):
    print(" └── 检测到已下载的文件夹,将直接尝试上传。")
    process_video(bvid, title, download_first=False)
    else:
    print(" └── 未检测到文件夹,将重新下载并上传。")
    process_video(bvid, title, download_first=True)
    processed_bvids.add(bvid)
    print("\n--- 开始处理新的任务列表 ---")
    for video in videos_data:
    bvid = video.get("bvid")
    title = video.get("title")
    if not bvid or not title:
    continue
    if bvid in processed_bvids:
    print(f"ℹ️ {bvid} 已在失败重试阶段处理过,跳过。")
    continue
    if bvid in successful_bvids:
    print(f"ℹ️ {title} (BVID: {bvid}) 已在之前成功上传,跳过。")
    continue
    process_video(bvid, title, download_first=True)
    print("\n\n🎉🎉🎉 所有任务处理完毕!🎉🎉🎉")
    final_fails = read_log_file(FAIL_LOG_FILE)
    if final_fails:
    print(f"\n⚠️ 注意: 仍有 {len(final_fails)} 个任务失败,请检查 '{FAIL_LOG_FILE}' 文件。")
    if __name__ == '__main__':
    main()

    注意代码里的❗符号,需要修改或确认自己需要不需要

  2. 创建alist.py文件

    Terminal window
    nano alist.py
    import requests
    import hashlib
    import json
    import urllib.parse
    import os
    import time
    import threading
    import queue
    import re
    from datetime import datetime
    from tqdm import tqdm
    import subprocess
    class AlistUploader:
    def __init__(self, base_url, username, password):
    self.base_url = base_url
    self.username = username
    self.password = password
    self.token = None
    self.upload_queue = queue.Queue()
    self.upload_threads = []
    self.max_threads = 6 # 最大并发上传线程数
    self.upload_results = {} # 存储上传结果
    self.upload_complete_event = threading.Event()
    def get_token(self):
    """获取 Alist 的 token"""
    if self.token:
    return self.token
    url = f"{self.base_url}/api/auth/login/hash"
    password_with_suffix = f"{self.password}-https://github.com/alist-org/alist"
    hashed_password = hashlib.sha256(password_with_suffix.encode()).hexdigest()
    payload = {
    "username": self.username,
    "password": hashed_password
    }
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, headers=headers, data=json.dumps(payload))
    if response.status_code == 200:
    result = response.json()
    if result["code"] == 200 and result["message"] == "success":
    self.token = result["data"]["token"]
    return self.token
    # print(f"获取 token 失败: {response.text}")
    tqdm.write(f"获取 token 失败: {response.text}")
    return None
    def create_new_folder(self, folder_path):
    """创建文件夹"""
    url = f"{self.base_url}/api/fs/mkdir"
    token = self.get_token()
    if not token:
    return False
    headers = {
    "Authorization": token,
    "Content-Type": "application/json"
    }
    payload = {
    "path": folder_path
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code == 200:
    result = response.json()
    if result["code"] == 200 and result["message"] == "success":
    # print(f"新建文件夹成功: {folder_path}")
    tqdm.write(f"新建文件夹成功: {folder_path}")
    return True
    else:
    # print(f"新建文件夹失败: {result['message']}")
    tqdm.write(f"新建文件夹失败: {result['message']}")
    return False
    else:
    # print(f"新建文件夹请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
    tqdm.write(f"新建文件夹请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
    return False
    def extract_date_user_from_title(self, title):
    """从标题中提取日期和用户名"""
    # 提取用户名
    user_match = re.search(r'【弹幕】(.*?)直播回放', title)
    user = user_match.group(1) if user_match else "未知用户"
    # 提取日期
    date_match = re.search(r'(\d{4})(\d{2})(\d{2})', title)
    if date_match:
    year = date_match.group(1)
    month = date_match.group(2)
    day = date_match.group(3)
    else:
    # 尝试其他日期格式
    date_match = re.search(r'(\d{4})[-.年](\d{1,2})[-.月](\d{1,2})', title)
    if date_match:
    year = date_match.group(1)
    month = date_match.group(2).zfill(2)
    day = date_match.group(3).zfill(2)
    else:
    # 使用当前日期
    now = datetime.now()
    year = str(now.year)
    month = str(now.month).zfill(2)
    day = str(now.day).zfill(2)
    return user, year, month, day
    def generate_folder_path(self, title):
    """生成文件夹路径: /115/录播/{{user}}/{{yyyy}}年/{{MM}}-{{dd}}"""
    user, year, month, day = self.extract_date_user_from_title(title)
    return f"/115/录播/{user}/{year}年/{month}-{day}"
    def upload_file(self, local_file_path, remote_dir_path, as_task=True, retry=0):
    """
    上传单个文件到 Alist,通过调用系统 curl 命令实现。
    避免 Python 内存占用,并为并行上传做准备(并行性由调用方管理)。
    """
    token = self.get_token()
    if not token:
    tqdm.write("无法获取 AList 访问令牌。")
    return False
    file_name = os.path.basename(local_file_path)
    # 确保远程目录路径以 / 结尾,或者不以 / 结尾,然后拼接文件名为 /
    # Alist 的 File-Path 头部需要一个完整的远程文件路径,例如 /path/to/dir/file.txt
    remote_file_path = f"{remote_dir_path.rstrip('/')}/{file_name}"
    url = f"{self.base_url}/api/fs/form"
    # 构建 curl 命令
    cmd_parts = [
    "curl",
    "-X", "PUT", # HTTP 方法为 PUT
    "--http1.1",
    "--retry", "3", # 失败自动重试3次
    "--retry-delay", "5", # 重试间隔5秒
    "-H", f"Authorization: {token}",
    # File-Path 头部的值需要 URL 编码
    "-H", f"File-Path: {urllib.parse.quote(remote_file_path)}",
    ]
    if as_task:
    cmd_parts.extend(["-H", "As-Task: true"])
    cmd_parts.extend(["-s", "--show-error", "--fail"])
    cmd_parts.extend(["-F", f"file=@{local_file_path}"])
    # 添加 URL
    cmd_parts.append(url)
    # 可选:为了调试,你可以添加 -v (verbose) 或 --progress-bar
    # cmd_parts.append("-v") # 输出更详细的请求/响应过程
    # cmd_parts.append("--progress-bar") # 显示进度条
    try:
    curl_response = subprocess.run(
    cmd_parts,
    capture_output=True, # 捕获 stdout 和 stderr
    text=True,
    check=False, # 不在非零退出码时抛出 CalledProcessError
    encoding='utf-8',
    )
    # 优先解析 AList 的 JSON 响应
    result = None
    try:
    if curl_response.stdout: # 只有当 stdout 不为空时才尝试解析
    result = json.loads(curl_response.stdout)
    except json.JSONDecodeError:
    # 如果 JSON 解析失败,则这是一个错误情况
    error_message = f"无法解析 AList 响应 JSON: {curl_response.stdout.strip() if curl_response.stdout else '无输出'}"
    if curl_response.stderr:
    error_message += f"\ncURL 错误输出: {curl_response.stderr.strip()}"
    if retry < 3:
    tqdm.write(f"上传 {file_name} 失败: {error_message},正在重试...")
    return self.upload_file(local_file_path, remote_dir_path, as_task, retry + 1)
    else:
    tqdm.write(f"上传 {file_name} 最终失败: {error_message}")
    return False
    # 根据 AList 的 JSON 响应判断成功或失败
    if result and result.get("code") == 200 and result.get("message") == "success":
    # AList 明确表示成功,即使 curl 退出码非零 (如 92),也视为成功
    if as_task and 'task' in result.get("data", {}):
    task_id = result["data"]["task"]["id"]
    # tqdm.write(f"文件 {file_name} 上传任务已创建,任务ID: {task_id}")
    return self.monitor_upload_task(task_id, local_file_path, remote_dir_path)
    # tqdm.write(f"文件 {file_name} 上传成功。")
    return True
    else:
    # AList 返回了错误代码或消息,或者没有返回有效的 JSON (已在上层捕获)
    alist_error_message = result.get("message", "未知错误或无AList响应") if result else "无AList响应或解析失败"
    # 组合 AList 的错误信息和 curl 的退出码/错误输出,用于日志
    full_error_detail = f"AList错误: {alist_error_message}. "
    if curl_response.returncode != 0:
    full_error_detail += f"curl退出码: {curl_response.returncode}. "
    if curl_response.stderr:
    full_error_detail += f"cURL错误输出: {curl_response.stderr.strip()}."
    if retry < 3:
    tqdm.write(f"上传 {file_name} 失败: {full_error_detail},正在重试...")
    return self.upload_file(local_file_path, remote_dir_path, as_task, retry + 1)
    else:
    tqdm.write(f"上传 {file_name} 最终失败: {full_error_detail}")
    return False
    except FileNotFoundError:
    # curl 命令本身没有找到
    tqdm.write(f"错误: curl 命令未找到。请确保 curl 已安装并配置在系统 PATH 中。")
    return False
    except Exception as e:
    # 捕获其他任何意外的Python异常
    if retry < 3:
    tqdm.write(f"上传文件 {file_name} 时出错: {str(e)},正在重试...")
    return self.upload_file(local_file_path, remote_dir_path, as_task, retry + 1)
    else:
    tqdm.write(f"上传文件 {file_name} 时出错: {str(e)}")
    return False
    def get_task_info(self, task_id):
    """获取指定任务的信息"""
    token = self.get_token()
    if not token:
    return None
    url = f"{self.base_url}/api/task/upload/info"
    params = {"tid": task_id}
    headers = {"Authorization": token}
    response = requests.post(url, headers=headers, params=params)
    if response.status_code == 200:
    result = response.json()
    if result["code"] == 200 and result["message"] == "success":
    return result["data"]
    else:
    # print(f"获取任务信息失败: {result['message']}")
    tqdm.write(f"获取任务信息失败: {result['message']}")
    return None
    else:
    tqdm.write(f"获取任务信息失败,状态码: {response.status_code}, 响应内容: {response.text}")
    return None
    def monitor_upload_task(self, task_id, local_file_path=None, remote_dir_path=None, poll_interval=2, timeout=7200, retry=0):
    """
    监控任务进度,直到任务完成,失败时支持重试
    参数:
    task_id: 任务ID
    local_file_path: 本地文件路径,用于重试
    remote_dir_path: 远程目录路径,用于重试
    poll_interval: 轮询间隔(秒)
    timeout: 超时时间(秒)
    retry: 剩余重试次数
    返回:
    成功时返回任务信息,失败且重试次数用完时返回False
    """
    from tqdm import tqdm
    start_time = time.time()
    pbar = None
    try:
    while time.time() - start_time < timeout:
    task_info = self.get_task_info(task_id)
    if not task_info:
    time.sleep(poll_interval)
    continue
    # 获取进度
    progress = task_info.get('progress', 0)
    state = task_info.get('state', '')
    # 初始化进度条
    if pbar is None:
    # pbar = tqdm(total=100, desc=f"上传任务 {task_id}", unit="%")
    pbar = tqdm(total=100, desc=f"{local_file_path.split('/')[-1]} 上传中", unit="%")
    # 更新进度条
    pbar.n = int(progress * 100)
    pbar.refresh()
    # 检查任务是否完成
    state = task_info.get('state', '')
    error = task_info.get('error', '')
    if state == 2 and progress >= 1.0:
    # tqdm.write(f"任务完成!总共传输: {task_info.get('total_bytes', 0)} 字节")
    return True
    elif state == 2 and progress == 0:
    # print("任务已完成,秒传成功")
    # tqdm.write("任务已完成,秒传成功")
    return True
    elif error:
    retry += 1
    # 如果有错误且有重试次数,尝试重新上传
    if retry < 4 and local_file_path and remote_dir_path and os.path.exists(local_file_path):
    pbar.close()
    pbar = None
    # print(f"任务失败: {error},剩余重试次数: {4 - retry}")
    tqdm.write(f"任务失败: {error},剩余重试次数: {4 - retry}")
    # 重新上传文件
    file_name = os.path.basename(local_file_path)
    remote_file_path = f"{remote_dir_path}/{file_name}"
    url = f"{self.base_url}/api/fs/form"
    headers = {
    "Authorization": self.get_token(),
    "File-Path": urllib.parse.quote(remote_file_path),
    "As-Task": "true",
    "Content-Length": str(os.path.getsize(local_file_path))
    }
    with open(local_file_path, 'rb') as file:
    # 核心修改:重试时也使用 data 参数进行流式上传
    response = requests.put(url, headers=headers, data=file)
    if response.status_code == 200:
    result = response.json()
    if result["code"] == 200 and result["message"] == "success" and 'task' in result["data"]:
    new_task_id = result["data"]["task"]["id"]
    tqdm.write(f"重新创建上传任务,新任务ID: {new_task_id}")
    # 递归调用自身监控新任务,减少重试次数
    return self.monitor_upload_task(
    new_task_id,
    local_file_path,
    remote_dir_path,
    poll_interval,
    timeout - (time.time() - start_time), # 剩余超时时间
    retry - 1
    )
    elif retry >= 4:
    tqdm.write(f"任务失败: {error},所有重试次数已用完")
    return False
    else:
    tqdm.write(f"任务失败: {error},无法重试")
    return False
    time.sleep(poll_interval)
    tqdm.write(f"\n监控超时,已经等待 {timeout} 秒")
    return False
    finally:
    # 确保进度条被关闭
    if pbar is not None:
    pbar.close()
    def _worker(self):
    """工作线程函数,处理上传队列中的任务"""
    while True:
    try:
    file_info = self.upload_queue.get(block=False)
    if file_info is None: # 结束信号
    self.upload_queue.task_done()
    break
    local_file_path, remote_dir_path = file_info
    file_name = os.path.basename(local_file_path)
    # tqdm.write(f"开始上传文件: {file_name} 到 {remote_dir_path}")
    success = self.upload_file(local_file_path, remote_dir_path)
    # 记录上传结果
    self.upload_results[local_file_path] = success
    # tqdm.write(f"文件 {file_name} {'上传成功' if success else '上传失败'}")
    self.upload_queue.task_done()
    except queue.Empty:
    # 队列为空,检查是否所有任务已完成
    if self.upload_queue.unfinished_tasks == 0:
    break
    time.sleep(0.5)
    except Exception as e:
    tqdm.write(f"上传线程出错: {str(e)}")
    self.upload_queue.task_done()
    def upload_files(self, file_paths, remote_dir_path):
    """
    异步上传多个文件到 Alist 的指定目录
    参数:
    file_paths: 本地文件路径列表
    remote_dir_path: 远程目录路径
    返回:
    一个事件对象,可以用于等待上传完成
    """
    # 清空之前的结果和事件状态
    self.upload_results = {}
    self.upload_complete_event.clear()
    # 将所有文件加入上传队列
    for file_path in file_paths:
    self.upload_queue.put((file_path, remote_dir_path))
    # 启动工作线程
    self.upload_threads = []
    for _ in range(min(self.max_threads, len(file_paths))):
    thread = threading.Thread(target=self._worker)
    thread.daemon = True
    thread.start()
    self.upload_threads.append(thread)
    # 启动监控线程,等待所有上传完成
    monitor_thread = threading.Thread(target=self._monitor_upload_completion)
    monitor_thread.daemon = True
    monitor_thread.start()
    return self.upload_complete_event
    def _monitor_upload_completion(self):
    """监控线程,等待所有上传任务完成"""
    for thread in self.upload_threads:
    thread.join()
    self.upload_complete_event.set()
    # 汇总上传结果
    total_files = len(self.upload_results)
    success_count = sum(1 for success in self.upload_results.values() if success)
    tqdm.write(f"\n上传完成: {success_count}/{total_files} 文件上传成功")
    def all_uploads_successful(self):
    """检查所有文件是否都上传成功"""
    if not self.upload_results:
    return False
    return all(self.upload_results.values())
    # 测试代码
    if __name__ == "__main__":
    uploader = AlistUploader(
    base_url="http://xxxxx",
    username="xxxxx",
    password="xxxxx"
    )
    # 测试创建文件夹
    test_folder = "/115/录播/测试用户/2025年/05-31"
    uploader.create_new_folder(test_folder)
    # 测试上传文件
    test_files = ["/config/Desktop/bilibili.tar.gz"]
    event = uploader.upload_files(test_files, test_folder)
    # 等待上传完成
    event.wait()
    tqdm.write(f"所有文件上传完成,全部成功: {uploader.all_uploads_successful()}")

配置Docker容器#

  1. 创建Dockerfiledocker-compose.yaml
    cd .. && nano Dockerfile && nano docker-compose.yaml`
    Dockerfile
    FROM python:3.10-bookworm
    WORKDIR /app
    # 安装aria2和对应python包
    RUN apt update && \
    apt install -y \
    aria2 && \
    apt clean && \
    rm -rf /var/lib/apt/lists/* && \
    pip install --no-cache-dir tqdm requests
    CMD ["python3", "bilivideosync.py"]
    docker-compose.yaml
    services:
    bilivideosync:
    build: .
    volumes:
    - ./data:/app
    tty: true # 必须保留,否则第一个视频下载并同步到网盘结束后,无法输入所需的上传到B站的分区ID,视频简介和视频标签
    stdin_open: true # 必须保留,否则第一个视频下载并同步到网盘结束后,无法输入所需的上传到B站的分区ID,视频简介和视频标签
    network_mode: bridge
  2. 启动容器docker compose up -d
  3. 启动容器后,输入docker compose attach --sig-proxy=false bilivideosync进入容器内交互式终端,等待输入分区ID的提示出现,分别填写完分区ID、视频简介和视频Tags后,即可退出,后续会复用第一次填写的信息
最后更新于 2025-07-13,距今已过 126 天

部分内容可能已过时

评论区

目录