3575 字
18 分钟
Vapoursynth视频压制笔记
本文主要内容
本文主要介绍如何使用Vapoursynth进行视频压制的相关笔记和备忘,以及如何在Linux上压制视频的一些经验分享。
说来也在猫站(PTer)压制组呆了一年多了,惭愧的是确实没学到什么高级技巧,目前也只会压制一些简单的x264视频,所以本文也只是记录一些常用的命令和脚本,方便以后查阅。同时顺便记录下我是如何折腾在Linux上Build一个docker环境来压制视频的。这么折腾是因为部分软件只能在Windows上运行,为了顺利在Linux上运行,也是费了一番功夫。
Docker环境搭建
待更新…
脚本类
自动生成不同视频源的对比代码脚本
脚本功能简介
该脚本会扫描指定目录下的所有子文件夹,查找主视频文件(支持BDMV/MKV/MP4格式),并生成相应的VapourSynth代码片段,追加到指定的 .vpy 文件末尾。支持BDMV结构的智能识别和递归查找其他视频格式。手动查找每个源最大m2ts再复制地址再写到代码里,这种手动行为是非常不健康的!!

Terminal window
python3 compare_code_gen.py -h
usage: compare_code_gen.py [-h] compare_vpy_file source_directory

自动扫描指定目录下的所有子文件夹,查找主视频文件(BDMV/MKV/MP4),并生成VapourSynth代码追加到.vpy文件中。

positional arguments:
  compare_vpy_file  要修改的目标 .vpy 文件路径
  source_directory  包含多个BD或Remux子文件夹的源目录 (e.g., 'silver/bd')

options:
  -h, --help        show this help message and exit
import argparse
import json
from pathlib import Path
import os
def find_bd_root(parent_path: Path) -> Path | None: """ 在给定的父路径下,查找包含 BDMV 文件夹的子目录(即蓝光根目录)。 只查找一层深度。 """ if not parent_path.is_dir(): return None
for item in parent_path.iterdir(): if item.is_dir(): if (item / "BDMV").is_dir(): return item elif (item.name in ["BDMV", "bdmv"]) and (item.is_dir()): return parent_path
return None
def find_largest_m2ts(bd_root_path: Path) -> Path | None: """ 在给定的 BD 根目录下,查找 BDMV/STREAM/ 文件夹中最大的 .m2ts 文件。 """ stream_dir = bd_root_path / "BDMV" / "STREAM"
if not stream_dir.is_dir(): return None m2ts_files = list(stream_dir.glob("*.m2ts"))
if not m2ts_files: print(f" -> 警告: 在 '{stream_dir}' 中未找到任何 .m2ts 文件。") return None
return max(m2ts_files, key=lambda p: p.stat().st_size)
def find_main_video_file(parent_path: Path) -> Path | None:
    """Locate the main video file inside a source directory.

    Search order: a BDMV structure first (non-recursive), then the
    largest ``.mkv`` and finally the largest ``.mp4`` found recursively.
    Returns ``None`` when nothing matches.
    """
    # Strategy 1: a BDMV layout wins outright; its biggest stream is the main feature.
    bluray_root = find_bd_root(parent_path)
    if bluray_root:
        print(f" -> 检测到 BDMV 结构,位于: {bluray_root}")
        return find_largest_m2ts(bluray_root)

    # Strategy 2: recursive search by container priority.
    print(" -> 未检测到 BDMV 结构,开始递归查找其他视频文件...")
    for ext in ("mkv", "mp4"):
        print(f" -> 正在递归搜索 .{ext} 文件...")
        matches = list(parent_path.glob(f"**/*.{ext}"))
        if not matches:
            print(f" -> 目录中未找到任何 .{ext} 文件。")
            continue
        biggest = max(matches, key=lambda f: f.stat().st_size)
        print(f" -> 找到最大的 {ext.upper()} 文件: {biggest.relative_to(parent_path)}")
        return biggest

    return None
def generate_vpy_snippet(var_name: str, video_path: Path) -> tuple[str, str]:
    """Build the VapourSynth source/FPS/depth lines for one clip.

    Returns a tuple of (code snippet, sanitised variable name). Dashes
    and dots in *var_name* are replaced with underscores so the result is
    a valid Python identifier inside the .vpy script.
    """
    path_str = os.path.abspath(video_path)
    ident = var_name.replace('-', '_').replace('.', '_')

    code_lines = [
        f'{ident} = core.lsmas.LWLibavSource(r"{path_str}")',
        f'{ident} = core.std.AssumeFPS({ident}, fpsnum=24000, fpsden=1001)',
        f'{ident} = mvf.Depth({ident}, 16)',
    ]
    return "\n".join(code_lines), ident
def main():
    """CLI entry point.

    Scans every sub-directory of ``source_directory`` for a main video
    file, then appends generated VapourSynth source/FrameInfo/set_output
    code to ``compare_vpy_file`` (creating it, with a fixed import
    header, when missing or empty).
    """
    parser = argparse.ArgumentParser(
        description="自动扫描指定目录下的所有子文件夹,查找主视频文件(BDMV/MKV/MP4),并生成VapourSynth代码追加到.vpy文件中。"
    )
    parser.add_argument("compare_vpy_file", help="要修改的目标 .vpy 文件路径")
    parser.add_argument(
        "source_directory", help="包含多个BD或Remux子文件夹的源目录 (e.g., 'silver/bd')"
    )
    args = parser.parse_args()

    vpy_file = Path(args.compare_vpy_file)
    if not vpy_file.exists():
        print(f"信息: vpy 文件 '{vpy_file}' 不存在,将自动创建。")
        vpy_file.touch()

    source_dir = Path(args.source_directory)
    if not source_dir.is_dir():
        print(f"错误: 源目录 '{source_dir}' 不存在或不是一个目录。")
        return

    sub_directories = sorted([d for d in source_dir.iterdir() if d.is_dir()])
    print(f"将在 '{source_dir}' 目录下扫描 {len(sub_directories)} 个子文件夹...")

    generated_snippets = []
    successful_vars = []  # sanitised variable names of successfully generated clips

    for sub_dir in sub_directories:
        print(f"\n--- 正在处理: {sub_dir} ---")
        main_video = find_main_video_file(sub_dir)
        if main_video:
            # The folder name (upper-cased) becomes the clip variable; the
            # helper sanitises it into a valid Python identifier.
            var_name = sub_dir.name.upper()
            snippet, safe_name = generate_vpy_snippet(var_name, main_video)
            print(f"最终找到的主视频文件: {main_video}")
            print(f"将使用变量名: {safe_name}")
            generated_snippets.append(snippet)
            successful_vars.append(safe_name)
        else:
            print(f"警告: 在 '{sub_dir}' 下未找到支持的视频文件 (BDMV/MKV/MP4),已跳过。")

    if not generated_snippets:
        print("\n没有生成任何代码片段,vpy 文件未被修改。")
        return

    # FrameInfo overlay plus one output index per successfully found clip.
    frame_info_lines = []
    set_output_lines = []
    for i, var in enumerate(successful_vars):
        frame_info_lines.append(f"{var} = awf.FrameInfo({var}, '{var}')")
        set_output_lines.append(f"{var}.set_output({i})")

    all_new_code = "\n\n".join(generated_snippets)
    frame_info_code = "\n".join(frame_info_lines)
    set_output_code = "\n".join(set_output_lines)

    try:
        # BUG FIX: the header template previously had no line breaks, so a
        # freshly created .vpy started with one giant invalid line
        # ("import vapoursynth as vsimport awsmfunc as awf..."). Each
        # import now sits on its own line.
        fixed_import = (
            "import vapoursynth as vs\n"
            "import awsmfunc as awf\n"
            "import kagefunc as kgf\n"
            "import mvsfunc as mvf\n"
            "import adptvgrnMod\n"
            "from rekt import rektlvls\n"
            "from vsutil import get_y\n"
            "from screenshot import GetSnapShot, upload_to_lsky_album\n"
            "core = vs.core\n"
        )

        original_content = ""
        # Read only when the file exists and is non-empty.
        if vpy_file.exists() and vpy_file.stat().st_size > 0:
            original_content = vpy_file.read_text(encoding='utf-8')
            # Skip the header when the file already starts with it.
            if original_content.strip().startswith("import vapoursynth"):
                fixed_import = ""

        # Assemble header + existing content + the generated sections.
        new_content = (
            fixed_import
            + original_content.strip()
            + "\n\n# --- 自动生成的源滤镜代码 ---\n"
            + all_new_code
            + "\n\n# --- 自动生成的 FrameInfo 和 Output ---\n"
            + frame_info_code
            + "\n\n"
            + set_output_code
            + "\n\n# --- 自动生成的代码结束 ---\n"
        )

        vpy_file.write_text(new_content, encoding='utf-8')
        print(f"\n成功!已将生成的代码追加到 '{vpy_file}' 的末尾。")
    except IOError as e:
        print(f"错误: 读写文件 '{vpy_file}' 时发生错误: {e}")
    except Exception as e:
        print(f"发生未知错误: {e}")
if __name__ == "__main__":
    main()

x264自动压制脚本
脚本功能简介
该脚本使用VapourSynth的vspipe和x264进行视频压制,支持多CRF值和Zone设置。自动输入log文件、压制多个crf时合并log为同一个文件。

Terminal window
python3 x264_code_generation.py -h
usage: x264_code_generation.py [-h] vpy_file crf_value zone total_frames output_path

视频处理脚本

positional arguments:
  vpy_file      输入的 vpy 文件路径
  crf_value     CRF 值 (JSON 格式的数组, e.g., '[18, 18.5]')
  zone          Zone 值 (JSON 格式的数组, e.g., '[[103,543], [1000,2000]]')
  total_frames  视频总帧数
  output_path   输出文件目录

options:
  -h, --help    show this help message and exit
import argparse
import json
from pathlib import Path
import os
import subprocess
import sys
import time
def main(vpy_file, crf_value, zone, total_frames, output_path):
    """Pipe *vpy_file* through vspipe into x264 and record a log file.

    Args:
        vpy_file: Path of the VapourSynth script to encode.
        crf_value: CRF for this run (also used in console/log labels).
        zone: Optional list of [start, end] frame pairs; each becomes an
            x264 ``--zones`` entry with ``b=1.3``.
        total_frames: Frame count passed to x264 ``--frames``.
        output_path: Target output path; the log is written next to it
            as ``<name>.txt``.
    """
    start_time = time.time()

    vspipe_cmd = ['vspipe', '-c', 'y4m', vpy_file, '-']
    x264_cmd = [
        'x264',
        '--output-depth', '8',
        '--crf', str(crf_value),
        '--preset', 'veryslow',
        '--profile', 'high',
        '--level', '4.1',
        '--vbv-bufsize', '78125',
        '--vbv-maxrate', '62500',
        '--merange', '32',
        '--bframes', '16',
        '--deblock', '-3:-3',
        '--no-fast-pskip',
        '--rc-lookahead', '250',
        '--qcomp', '0.60',
        '--psy-rd', '1.0:0.00',
        '--aq-mode', '3',
        '--aq-strength', '0.75',
        '--me', 'umh',
        '--b-adapt', '2',
        '--direct', 'auto',
        '--subme', '11',
        '--trellis', '2',
        '--no-dct-decimate',
        '--no-mbtree',
        '--colormatrix', 'bt709',
        '--colorprim', 'bt709',
        '--transfer', 'bt709',
        '--ipratio', '1.30',
        '--pbratio', '1.20',
        '--output', output_path,
        '--frames', str(total_frames),
        '--demuxer', 'y4m',
        '-'
    ]

    if zone:
        # Build a single --zones argument: "s0,e0,b=1.3/s1,e1,b=1.3/..."
        tmp = []
        for z in zone:
            tmp.append(f"{z[0]},{z[1]},b=1.3")
        output_index = x264_cmd.index('--output')
        x264_cmd.insert(output_index, '--zones')
        x264_cmd.insert(output_index + 1, '/'.join(tmp))

    log_file_path = Path(output_path).with_suffix('.txt')
    full_command_str = ' '.join(vspipe_cmd) + ' | ' + ' '.join(x264_cmd)

    print(f'--------------------------- CRF {crf_value} begin ---------------------------')
    print(f"执行命令: {full_command_str}")
    print(f"输出文件: {output_path}")
    print(f"日志文件: {log_file_path}")

    try:
        with open(log_file_path, 'w', encoding='utf-8') as log_file:
            # Record the exact command at the top of the log for later reference.
            log_file.write(full_command_str + "\n")

            # NOTE(review): vspipe's stderr pipe is only drained after
            # wait(); an unusually chatty vspipe could fill the pipe
            # buffer and stall — confirm this is acceptable in practice.
            vspipe_process = subprocess.Popen(vspipe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            x264_process = subprocess.Popen(
                x264_cmd,
                stdin=vspipe_process.stdout,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding='utf-8',
                errors='replace'
            )
            # Close our handle so x264's exit delivers SIGPIPE to vspipe.
            vspipe_process.stdout.close()

            for line in iter(x264_process.stdout.readline, ''):
                line_stripped = line.strip()
                # Progress lines start with '[' and contain 'fps' and 'eta':
                # refresh them in place on the console, keep them out of the log.
                if line_stripped.startswith('[') and 'fps' in line_stripped and 'eta' in line_stripped:
                    sys.stdout.write(" " * 80 + "\r")  # wipe the previous progress line
                    sys.stdout.write(line_stripped + '\r')
                    sys.stdout.flush()
                else:
                    # Permanent information (x264 [info] lines, final summary):
                    # echo to the console and persist in the log.
                    print(line_stripped)
                    log_file.write(line)

            print()  # move past the in-place progress line

            vspipe_process.wait()
            x264_process.wait()

            # When x264 succeeds, vspipe dying of SIGPIPE (-13, or 141 as
            # 128+13) is the expected shutdown path, not an error.
            if x264_process.returncode == 0 and vspipe_process.returncode in [-13, 141]:
                print("VSPipe 进程被正常中止 (SIGPIPE),这是预期行为。")
            elif vspipe_process.returncode != 0:
                vspipe_stderr = vspipe_process.stderr.read().decode('utf-8', errors='replace')
                error_message = f"VSPipe 错误 (返回码: {vspipe_process.returncode}):\n{vspipe_stderr}"
                print(error_message)
                log_file.write(f"\n\n--- VSPipe Error ---\n{error_message}\n")

            if x264_process.returncode != 0:
                print(f"x264 编码失败,返回码: {x264_process.returncode}")
            else:
                # BUG FIX: the success message used "nCRF" — a missing
                # backslash in the intended "\n" escape.
                print(f"\nCRF {crf_value} 编码成功!")
    except FileNotFoundError:
        print("错误: 'vspipe' 或 'x264' 命令未找到。请确保它们已安装并在系统 PATH 中。")
    except Exception as e:
        print(f"发生未知错误: {e}")

    print(f'--------------------------- CRF {crf_value} End (Time: {time.time() - start_time:.2f} 秒) -----------------------------')
    print("\n")
# __main__ section: CLI parsing, path validation, and the per-CRF encode loop.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="视频处理脚本")
    # crf_value and zone are parsed as JSON so lists can be passed on the CLI.
    parser.add_argument("vpy_file", help="输入的 vpy 文件路径")
    parser.add_argument("crf_value", type=json.loads, help="CRF 值 (JSON 格式的数组, e.g., '[18, 18.5]')")
    parser.add_argument("zone", type=json.loads, help="Zone 值 (JSON 格式的数组, e.g., '[[103,543], [1000,2000]]')")
    parser.add_argument("total_frames", type=int, help="视频总帧数")
    parser.add_argument("output_path", help="输出文件目录")
    args = parser.parse_args()

    # Validate the input script path before doing any work.
    vpy_path = Path(args.vpy_file)
    if not vpy_path.exists():
        print(f"错误: vpy 文件不存在 -> {vpy_path}")
        exit(1)
    if not vpy_path.name.endswith(".vpy"):
        print("错误: 输入文件不是 vpy 文件")
        exit(1)

    output_dir = Path(args.output_path)
    output_dir.mkdir(parents=True, exist_ok=True)

    vpy_file_path = str(vpy_path.resolve())
    output_dir_path = str(output_dir.resolve())

    # One encode per requested CRF value; output files are named crf<value>.mkv.
    for i in args.crf_value:
        output_file = Path(output_dir_path) / f"crf{i}.mkv"
        if (output_file.exists()):
            # Delete the pre-existing output file and its log, so the later
            # log merge only sees freshly written logs.
            os.remove(output_file)
            log_file = output_file.with_suffix('.txt')
            if log_file.exists():
                os.remove(log_file)
        main(vpy_file_path, i, args.zone, args.total_frames, str(output_file))
    # 把所有log文件合并成一个,之间添加三行换行
    combined_log_path = output_dir / "combined_log.txt"
    if len(args.crf_value) > 0:
        with open(combined_log_path, 'a', encoding='utf-8') as combined_log:
            for i in args.crf_value:
                log_file = output_dir / f"crf{i}.txt"
                if log_file.exists():
                    with open(log_file, 'r', encoding='utf-8') as lf:
                        combined_log.write(lf.read())
                    combined_log.write("\n\n\n")  # 三行换行分隔
                # 删除文件
                if log_file.exists():
                    os.remove(log_file)
        print(f"所有日志已合并到 {combined_log_path}")

AI修复字幕语法错误
脚本功能简介
该脚本使用OpenAI的API对SRT字幕文件进行语法和拼写错误的修复,特别关注OCR过程中常见的字符混淆问题。支持并行处理多个字幕块以提高效率。

Terminal window
python3 ai-fix-srt.py -h
usage: ai-fix-srt.py [-h] [-o OUTPUT_FILE] [-w WORKERS] input_file

Correct spelling and grammar in an SRT file using OpenAI in parallel.

positional arguments:
  input_file            The path to the input SRT file.

options:
  -h, --help            show this help message and exit
  -o OUTPUT_FILE, --output_file OUTPUT_FILE
                        The path to save the corrected SRT file. Defaults to 'input_filename_corrected.srt'.
  -w WORKERS, --workers WORKERS
                        Number of parallel requests to send to OpenAI. Default: 50.
import os
import srt
import time
import argparse
import concurrent.futures
from openai import OpenAI
from pathlib import Path
from tqdm import tqdm
# --- Configuration ---
# Prefer an environment variable; the API key can also be set here directly.
API_KEY = "xxxxxx"
# API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = 'https://api.openai.com/v1'  # Change when using an OpenAI-compatible endpoint.
MODEL_NAME = "gemini-2.5-pro"
CHUNK_SIZE = 100  # Number of subtitles handled per request.
MAX_WORKERS = 50  # Parallel request count; tune to your API rate limit.

# FIX: restored the line structure of the prompt. The numbered rules and
# the SRT few-shot examples had collapsed onto single lines, which broke
# the example format the model is asked to imitate.
SYSTEM_PROMPT = """You are an expert proofreader and editor specializing in UK English. Your task is to correct spelling, grammar, and punctuation errors in the provided SRT subtitle block that are ocr'd from a .sup file. Especially focus on `i`, `l`, 'I`, 'L` chaos.

Follow these rules STRICTLY:
1. **Target Language**: All corrections must adhere to standard UK English (e.g., use 'colour', not 'color'; 'realise', not 'realize'; 'dialogue', not 'dialog').
2. **Preserve Structure**: You MUST return the corrected text in the exact same SRT format. This includes preserving all subtitle index numbers, timestamps, formatting tag, and line breaks within the text. DO NOT alter, add, or remove any timestamps or index numbers.
3. **Content Focus**: Only modify the subtitle text itself for corrections. If a line of text is already correct, leave it unchanged.
4. **No Extra Commentary**: Do not add any explanations, apologies, or comments before or after the SRT block. Your entire response must be ONLY the corrected SRT data.
5. Do not change any names, places, or specific terminology unless they are clearly misspelled.

Here is an example of the required input and output format:

--- EXAMPLE INPUT ---
3
00:00:19,895 --> 00:00:22,273
<i>The whole famlly's together.</i>
<i>Mom makes braciole.</i>

4
00:00:22,440 --> 00:00:25,276
<i>Dad puts the jersey on.</i>
<i>We're aiI watchlng the game.</i>

--- EXAMPLE OUTPUT ---
3
00:00:19,895 --> 00:00:22,273
<i>The whole family's together.</i>
<i>Mom makes braciole.</i>

4
00:00:22,440 --> 00:00:25,276
<i>Dad puts the jersey on.</i>
<i>We're all watching the game.</i>
"""
def correct_chunk_task(client: OpenAI, srt_chunk_string: str, chunk_index: int):
    """Proofread one SRT chunk via the OpenAI API (runs in a worker thread).

    Returns a tuple ``(chunk_index, corrected_srt_string)``; the second
    element is ``None`` when the API call fails, so the caller can fall
    back to the original chunk.
    """
    request_messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Now, please process the following SRT block:\n\n{srt_chunk_string}"},
    ]
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=request_messages,
            temperature=0.1,
        )
        return chunk_index, completion.choices[0].message.content.strip()
    except Exception as exc:
        # Surface the failure but keep the pipeline going; None signals
        # the fallback path to the caller.
        print(f"\nError processing chunk {chunk_index + 1}: {exc}")
        return chunk_index, None
def process_srt_file(input_path, output_path, max_workers):
    """Read an SRT file, proofread it in parallel chunks, and save the result.

    Chunks of CHUNK_SIZE subtitles are sent concurrently to the API via
    correct_chunk_task; any chunk that fails (API error, unparseable
    reply, or a subtitle-count mismatch) falls back to its original text,
    so the output always contains every subtitle.
    """
    if not API_KEY:
        print("Error: OPENAI_API_KEY environment variable not set.")
        return

    try:
        with open(input_path, 'r', encoding='utf-8') as f:
            content = f.read()
        original_subs = list(srt.parse(content))
        print(f"Successfully loaded {len(original_subs)} subtitles from '{input_path}'.")
    except Exception as e:
        print(f"Error reading or parsing SRT file: {e}")
        return

    # Split the subtitles into fixed-size chunks.
    chunks_of_subs = [original_subs[i:i + CHUNK_SIZE] for i in range(0, len(original_subs), CHUNK_SIZE)]
    total_chunks = len(chunks_of_subs)
    print(f"Divided into {total_chunks} chunks for processing with up to {max_workers} parallel workers.")

    client = OpenAI(api_key=API_KEY, base_url=BASE_URL)

    # Results are stored by chunk index so the original order is preserved
    # even though tasks complete out of order.
    corrected_results = [None] * total_chunks

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit every chunk; map each future back to its original chunk.
        future_to_chunk = {
            executor.submit(correct_chunk_task, client, srt.compose(chunk), i): chunk
            for i, chunk in enumerate(chunks_of_subs)
        }

        # tqdm progress bar, updated as futures complete.
        progress_bar = tqdm(concurrent.futures.as_completed(future_to_chunk), total=total_chunks, desc="Correcting Chunks")

        for future in progress_bar:
            original_chunk = future_to_chunk[future]
            try:
                index, corrected_srt_string = future.result()

                if corrected_srt_string:
                    # Validate that the model actually returned parseable SRT.
                    try:
                        corrected_chunk_subs = list(srt.parse(corrected_srt_string))
                        if len(corrected_chunk_subs) != len(original_chunk):
                            print(f"\nWarning: Mismatch in subtitle count for chunk {index + 1}. Using original.")
                            corrected_results[index] = original_chunk  # fall back to the original chunk
                        else:
                            corrected_results[index] = corrected_chunk_subs
                    except Exception as parse_error:
                        print(f"\nError parsing corrected SRT for chunk {index + 1}: {parse_error}. Using original.")
                        corrected_results[index] = original_chunk  # fall back to the original chunk
                else:
                    # The API call failed; keep the original chunk.
                    corrected_results[index] = original_chunk

            except Exception as exc:
                # The task itself raised. Recover the chunk's index by
                # searching for it — clumsy (as the original author noted),
                # since correct_chunk_task's returned index already covers
                # the normal path; this is a belt-and-braces fallback.
                print(f"\nA task generated an exception: {exc}. The original chunk will be used.")
                for f, c in future_to_chunk.items():
                    if f == future:
                        for i, chunk_list in enumerate(chunks_of_subs):
                            if chunk_list == c:
                                corrected_results[i] = chunk_list
                                break
                        break

    # Stitch all processed chunks back together in order.
    final_subs = []
    for chunk in corrected_results:
        if chunk:
            final_subs.extend(chunk)

    try:
        final_srt_content = srt.compose(final_subs)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(final_srt_content)
        print(f"\nProcessing complete. Corrected subtitles saved to '{output_path}'.")
    except Exception as e:
        print(f"\nError writing the final SRT file: {e}")
# CLI entry point: parse arguments, validate the input file, and derive
# the output path before the script hands off to process_srt_file.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Correct spelling and grammar in an SRT file using OpenAI in parallel.")
    parser.add_argument("input_file", type=str, help="The path to the input SRT file.")
    parser.add_argument("-o", "--output_file", type=str, help="The path to save the corrected SRT file. Defaults to 'input_filename_corrected.srt'.")
    parser.add_argument("-w", "--workers", type=int, default=MAX_WORKERS, help=f"Number of parallel requests to send to OpenAI. Default: {MAX_WORKERS}.")

    args = parser.parse_args()

    input_path = Path(args.input_file)
    if not input_path.is_file():
        print(f"Error: Input file not found at '{input_path}'")
    else:
        # Default output name: <stem>_corrected<suffix> next to the input.
        if args.output_file:
            output_path = Path(args.output_file)
        else:
            output_path = input_path.with_name(f"{input_path.stem}_corrected{input_path.suffix}")
        process_srt_file(str(input_path), str(output_path), args.workers)

自动检测Band/Block脚本
脚本功能简介该脚本使用awsmfunc库中的banddtct函数自动检测视频中的Band/Block区域,方便后续进行针对性处理。这只是一个实例脚本,具体参数和预处理步骤请根据实际视频源进行调整。识别完成后,会自动在视频文件位置生成banding.txt文件,记录检测到的band/block区域信息。
import vapoursynth as vs
import awsmfunc as awf
import kagefunc as kgf
import mvsfunc as mvf
import adptvgrnMod
from rekt import rektlvls
from vsutil import get_y
from screenshot import GetSnapShot
core = vs.core
# Load the elementary video stream and crop 20 px off the top and bottom.
source = core.lsmas.LWLibavSource('/demux/complete/Return/extract/video.h264').std.Crop(0, 0, 20, 20)
# Repair the outermost top/bottom line with fillmargins, then zresize with
# the same top/bottom offsets (presumably to trim/rescale the repaired
# edge — confirm against the awsmfunc docs for fb/zresize).
source = awf.fb(source, top=1, bottom=1, mode='fillmargins')
source = awf.zresize(source, top=1, bottom=1)
# Force 23.976 fps and move to 16-bit for the detection pass.
source = core.std.AssumeFPS(source, fpsnum=24000, fpsden=1001)
source = mvf.Depth(source, 16)
# src.set_output()
awf.banddtct(source,thr=100,min_zone_len=20)

Vapoursynth视频压制笔记
https://blog.useforall.com/posts/vapoursynth-video-encoding-notes/ 最后更新于 2025-11-16
Lim's Blog