Hexo 工作流优化


Hexo 工作流优化

原始工作流

使用 Hexo 写博客的典型流程是:

  1. source/_posts 目录下新建 .md 文件,编写文章
  2. 执行 hexo clean && hexo generate 生成静态页面
  3. 执行 hexo server 本地预览
  4. 确认无误后 hexo deploy 部署

这套流程本身不复杂,但有几个痛点:

笔记与博客分离。 我习惯把所有笔记统一存放在一个仓库里,其中只有部分值得发布的才需要同步到 Hexo。每次发布前,需要手动把文章复制到 _posts 目录,回头还要记哪些已复制、哪些没复制。

多条命令反复敲。 hexo cleanhexo ghexo s 每次都要手动输入,频繁切换。

依赖不可见。 这个是我自己写的 python 脚本中用到 PyYAML 等第三方库,配置环境时需要手动 pip install,换了电脑就得重新查一遍。


优化后的工作流

写了一套脚本解决这几个问题。现在的流程:

写笔记 → 加 publish: true → 双击 publish.bat → 选择操作 → 完成

publish.bat 主菜单

目录结构:

D:\xxx\                           # 笔记仓库
├── publish.bat                   # 选择性发布入口(调用 selective_publish.py)
├── hexo.bat                      # Hexo 操作助手(clean / generate / server)
├── install_deps.bat              # Python 依赖安装助手
├── Scripts\
│   ├── config.ini                # 统一配置文件(所有脚本共用)
│   ├── selective_publish.py      # 选择性发布脚本(核心引擎)
│   ├── install_dependencies.py   # 依赖扫描安装工具
│   └── requirements.txt          # 由 install_dependencies.py -r 自动生成

1. 选择性发布(核心引擎)

selective_publish.py 扫描笔记仓库中的所有 .md 文件,解析 front-matter,把 publish: true 的文章复制到 Hexo 的 _posts 目录。

几个处理细节:

  • 智能跳过:逐字节比较文件内容,没改过的不会再复制一遍
  • 模板兼容:如果你有模板文件,可能会用到 {{ }} ,所以需要把 {{ }} 模板标签替换成安全占位符,避免 YAML 解析崩掉
  • 分类报告:用颜色区分不同状态,绿色是新发布的,青色是覆盖的,黄色有问题的,红色出错的

发布结果汇总

完整代码:

#!/usr/bin/env python3
"""
selective_publish.py
根据 front-matter publish: true 选择性复制文章到 Hexo _posts 目录。
递归扫描源目录,扁平复制(不保留子目录),目标同名文件直接覆盖。
详细日志,结束后汇总。
"""

import os
import re
import sys
import shutil
import configparser
import argparse
import yaml


# ---------- 跨平台 ANSI 颜色 ----------
if sys.platform == "win32":
    import ctypes as _ctypes
    _kernel32 = _ctypes.windll.kernel32
    _h = _kernel32.GetStdHandle(-11)  # STD_OUTPUT_HANDLE
    _m = _ctypes.c_uint32()
    if _kernel32.GetConsoleMode(_h, _ctypes.byref(_m)):
        _kernel32.SetConsoleMode(_h, _m.value | 0x0004)

GREEN   = "\033[32m"
RED     = "\033[31m"
YELLOW  = "\033[33m"
CYAN    = "\033[36m"
WHITE   = "\033[1;37m"
RESET   = "\033[0m"

C_GREEN  = GREEN
C_RED    = RED
C_YELLOW = YELLOW
C_CYAN   = CYAN
C_RESET  = RESET


def log_prefix(level, msg, color=""):
    text = f"[{level}] {msg}"
    if color:
        text = f"{color}{text}{RESET}"
    print(text)


def colored_print(text, color=""):
    if color:
        print(f"{color}{text}{RESET}")
    else:
        print(text)


# ---------- 状态码 ----------
STATUS_PUBLISHED_NEW = "PUBLISHED (new)"
STATUS_PUBLISHED_OVERWRITE = "PUBLISHED (overwrite)"
STATUS_SKIPPED_NO_MARK = "SKIPPED (publish not true)"
STATUS_SKIPPED_PARSE_ERROR = "SKIPPED (front-matter error)"
STATUS_SKIPPED_OVERWRITE_DISABLED = "SKIPPED (overwrite disabled)"
STATUS_SKIPPED_COPY_FAIL = "SKIPPED (copy failed)"
STATUS_SKIPPED_NO_FRONTMATTER = "SKIPPED (no front-matter)"
STATUS_ASSET_COPIED = "ASSETS_COPIED"
STATUS_ASSET_FAIL = "ASSETS_FAILED"
STATUS_ASSET_NOT_FOUND = "ASSETS_NOT_FOUND"

SECTION_PREFIX = "selective_publish"


def load_config(config_path):
    if not os.path.exists(config_path):
        print(f"[错误] 配置文件不存在: {config_path}")
        sys.exit(1)

    config = configparser.ConfigParser()
    config.read(config_path, encoding='utf-8')

    paths_section = f"{SECTION_PREFIX}.paths"
    options_section = f"{SECTION_PREFIX}.options"

    source_dir = os.path.expanduser(config.get(paths_section, 'source_dir'))
    target_dir = os.path.expanduser(config.get(paths_section, 'target_dir'))
    assets_target_dir = os.path.expanduser(
        config.get(paths_section, 'assets_target_dir', fallback='')
    )

    recursive = config.getboolean(options_section, 'recursive')
    overwrite = config.getboolean(options_section, 'overwrite')
    copy_assets = config.getboolean(options_section, 'copy_assets')
    asset_suffix = config.get(options_section, 'asset_folder_suffix', fallback='')
    verbose = config.getboolean(options_section, 'verbose')

    return {
        'source_dir': source_dir,
        'target_dir': target_dir,
        'assets_target_dir': assets_target_dir,
        'recursive': recursive,
        'overwrite': overwrite,
        'copy_assets': copy_assets,
        'asset_suffix': asset_suffix,
        'verbose': verbose,
    }


def sanitize_yaml_template(text):
    """
    预处理 YAML 文本,将 {{ ... }} 模板占位符替换为安全的字符串,
    避免 YAML 解析器将 {{ 误读为流式映射导致崩溃。
    """
    return re.sub(
        r'\{\{\s*(.*?)\s*\}\}',
        lambda m: f'"__TEMPLATE_{m.group(1).strip().replace(" ", "_")}__"',
        text,
    )


def get_front_matter(filepath):
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
    except Exception as e:
        return None, f"读取文件失败: {e}"

    if not content.startswith('---'):
        return {}, None  # 无 front-matter,不算错误

    parts = content.split('---', 2)
    if len(parts) < 3:
        return {}, None  # 格式不完整,按无处理

    # 预处理模板语法 {{ }} -> 安全占位符
    yaml_text = parts[1]
    has_template = bool(re.search(r'\{\{.*?\}\}', yaml_text))
    if has_template:
        yaml_text = sanitize_yaml_template(yaml_text)

    try:
        meta = yaml.safe_load(yaml_text) or {}
        return meta, None
    except yaml.YAMLError as e:
        return None, f"YAML 解析失败: {e}"


def should_publish(meta):
    """meta 为 None 或 publish 不是布尔 True 则返回 False"""
    if meta is None:
        return False
    publish = meta.get('publish')
    # yaml 会自动将 true/True 解析为 Python bool True
    return isinstance(publish, bool) and publish is True


def copy_file(src, dst, overwrite, verbose):
    """返回 (状态, 原因)。状态: 'new'/'overwrite'/'unchanged'/False"""
    if os.path.exists(dst):
        # 比较文件内容,相同则跳过
        try:
            with open(src, 'rb') as f1, open(dst, 'rb') as f2:
                if f1.read() == f2.read():
                    if verbose:
                        print(f"  [不变] {os.path.basename(dst)} (内容相同,跳过)")
                    return "unchanged", None
        except Exception:
            pass

        if not overwrite:
            return False, f"目标已存在且 overwrite=false: {os.path.basename(dst)}"
        else:
            if verbose:
                print(f"  [覆盖] {os.path.basename(dst)}")
            try:
                shutil.copy2(src, dst)
                return "overwrite", None
            except Exception as e:
                return False, f"复制失败: {e}"

    # 目标不存在,新复制
    try:
        shutil.copy2(src, dst)
        if verbose:
            print(f"  [复制] {os.path.basename(src)} -> {dst}")
        return "new", None
    except Exception as e:
        return False, f"复制失败: {e}"


def copy_asset_folder(md_path, assets_target_dir, asset_suffix='', verbose=True):
    """尝试复制同名资产文件夹,返回 (成功状态, 原因)"""
    base = os.path.splitext(md_path)[0]
    article_name = os.path.basename(base)

    candidates = [
        os.path.join(os.path.dirname(md_path), article_name),
    ]
    if asset_suffix:
        candidates.insert(0, os.path.join(os.path.dirname(md_path), article_name + asset_suffix))

    for src_folder in candidates:
        if os.path.isdir(src_folder):
            dest_folder = os.path.join(assets_target_dir, os.path.basename(src_folder))
            try:
                if os.path.exists(dest_folder):
                    shutil.rmtree(dest_folder)
                shutil.copytree(src_folder, dest_folder)
                if verbose:
                    print(f"  [资产] {os.path.basename(src_folder)} -> {dest_folder}")
                return True, None
            except Exception as e:
                return False, f"复制资产失败: {e}"
    return False, "未找到资产文件夹"


def main():
    parser = argparse.ArgumentParser(description='根据 publish: true 选择性发布文章')
    parser.add_argument('-c', '--config', default=None, help='配置文件路径')
    parser.add_argument('-v', '--verbose', action='store_true', default=None,
                        help='强制启用详细日志(覆盖配置文件设置)')
    args = parser.parse_args()

    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = args.config if args.config else os.path.join(script_dir, 'config.ini')
    cfg = load_config(config_path)

    source_dir = cfg['source_dir']
    target_dir = cfg['target_dir']
    assets_target_dir = cfg['assets_target_dir']
    recursive = cfg['recursive']
    overwrite = cfg['overwrite']
    copy_assets = cfg['copy_assets']
    asset_suffix = cfg['asset_suffix']
    verbose = cfg['verbose']
    if args.verbose is not None:
        verbose = args.verbose

    if not os.path.isdir(source_dir):
        print(f"[错误] 源目录不存在: {source_dir}")
        sys.exit(1)

    os.makedirs(target_dir, exist_ok=True)
    if copy_assets and assets_target_dir:
        os.makedirs(assets_target_dir, exist_ok=True)
    elif copy_assets and not assets_target_dir:
        assets_target_dir = target_dir
        if verbose:
            print("[提示] 未配置 assets_target_dir,资产将复制到文章同目录")

    # 收集 md 文件
    md_files = []
    for root, dirs, files in os.walk(source_dir):
        for file in files:
            if file.endswith('.md'):
                md_files.append(os.path.join(root, file))
        if not recursive:
            break

    print(f"扫描源目录: {source_dir}")
    print(f"找到 {len(md_files)} 个 .md 文件\n")

    # 统计
    results = {
        STATUS_PUBLISHED_NEW: [],
        STATUS_PUBLISHED_OVERWRITE: [],
        STATUS_SKIPPED_NO_MARK: [],
        STATUS_SKIPPED_PARSE_ERROR: [],
        STATUS_SKIPPED_NO_FRONTMATTER: [],
        STATUS_SKIPPED_OVERWRITE_DISABLED: [],
        STATUS_SKIPPED_COPY_FAIL: [],
        STATUS_ASSET_COPIED: [],
        STATUS_ASSET_FAIL: [],
        STATUS_ASSET_NOT_FOUND: [],
    }

    for md_path in md_files:
        rel_path = os.path.relpath(md_path, source_dir)

        meta, error = get_front_matter(md_path)

        if meta == {} and error is None:
            results[STATUS_SKIPPED_NO_FRONTMATTER].append(rel_path)
            if verbose:
                print(f"  [跳过] {rel_path} (无 front-matter)")
            continue

        if meta is None:
            results[STATUS_SKIPPED_PARSE_ERROR].append((rel_path, error))
            log_prefix("警告", f"{rel_path}: {error}", C_YELLOW)
            continue

        if not should_publish(meta):
            results[STATUS_SKIPPED_NO_MARK].append(rel_path)
            continue

        if verbose:
            print(f"  [处理] {rel_path}")

        # 执行发布
        filename = os.path.basename(md_path)
        dest_md = os.path.join(target_dir, filename)

        status, reason = copy_file(md_path, dest_md, overwrite=overwrite, verbose=verbose)
        if status == "new":
            results[STATUS_PUBLISHED_NEW].append(rel_path)
        elif status == "overwrite":
            results[STATUS_PUBLISHED_OVERWRITE].append(rel_path)
        elif status == "unchanged":
            pass
        else:
            results[STATUS_SKIPPED_COPY_FAIL].append((rel_path, reason))
            continue

        # 资产处理
        if copy_assets:
            asset_ok, asset_reason = copy_asset_folder(md_path, assets_target_dir, asset_suffix, verbose)
            if asset_ok:
                results[STATUS_ASSET_COPIED].append(rel_path)
            elif asset_reason and "未找到" not in asset_reason:
                results[STATUS_ASSET_FAIL].append((rel_path, asset_reason))
            else:
                results[STATUS_ASSET_NOT_FOUND].append(rel_path)

    # 汇总报告
    print()
    colored_print("=" * 40, WHITE)
    colored_print("发布完成,汇总如下:", WHITE)
    if results[STATUS_PUBLISHED_NEW]:
        colored_print(f"本次新发布: {len(results[STATUS_PUBLISHED_NEW])}", GREEN)
    else:
        print("本次新发布: 0")
    if results[STATUS_PUBLISHED_OVERWRITE]:
        colored_print(f"本次覆盖发布: {len(results[STATUS_PUBLISHED_OVERWRITE])}", CYAN)
    else:
        print("本次覆盖发布: 0")
    print(f"跳过 (publish 非 true): {len(results[STATUS_SKIPPED_NO_MARK])}")
    print(f"跳过 (无 front-matter): {len(results[STATUS_SKIPPED_NO_FRONTMATTER])}")
    if results[STATUS_SKIPPED_PARSE_ERROR]:
        colored_print(f"跳过 (front-matter 错误): {len(results[STATUS_SKIPPED_PARSE_ERROR])}", YELLOW)
    else:
        print("跳过 (front-matter 错误): 0")
    if results[STATUS_SKIPPED_COPY_FAIL]:
        colored_print(f"跳过 (复制失败): {len(results[STATUS_SKIPPED_COPY_FAIL])}", RED)
    else:
        print("跳过 (复制失败): 0")

    if copy_assets:
        print(f"资产已复制: {len(results[STATUS_ASSET_COPIED])}")
        print(f"资产未找到: {len(results[STATUS_ASSET_NOT_FOUND])}")
        print(f"资产复制失败: {len(results[STATUS_ASSET_FAIL])}")

    # 详细列表
    if results[STATUS_PUBLISHED_NEW]:
        colored_print(f"\n-- 本次新发布 ({len(results[STATUS_PUBLISHED_NEW])} 个) --", GREEN)
        for f in results[STATUS_PUBLISHED_NEW]:
            print(f"  {f}")
    if results[STATUS_PUBLISHED_OVERWRITE]:
        colored_print(f"\n-- 本次覆盖发布 ({len(results[STATUS_PUBLISHED_OVERWRITE])} 个) --", CYAN)
        for f in results[STATUS_PUBLISHED_OVERWRITE]:
            print(f"  {f}")
    if results[STATUS_SKIPPED_NO_MARK]:
        print(f"\n-- publish 非 true ({len(results[STATUS_SKIPPED_NO_MARK])} 个) --")
        for f in results[STATUS_SKIPPED_NO_MARK]:
            print(f"  {f}")
    if results[STATUS_SKIPPED_PARSE_ERROR]:
        colored_print(f"\n-- front-matter 解析错误 ({len(results[STATUS_SKIPPED_PARSE_ERROR])} 个) --", RED)
        for f, reason in results[STATUS_SKIPPED_PARSE_ERROR]:
            print(f"  {f}")
            print(f"    原因: {reason}")
    if results[STATUS_SKIPPED_COPY_FAIL]:
        print(f"\n-- 复制失败 ({len(results[STATUS_SKIPPED_COPY_FAIL])} 个) --")
        for f, reason in results[STATUS_SKIPPED_COPY_FAIL]:
            print(f"  {f}")
            print(f"    原因: {reason}")
    if copy_assets and results[STATUS_ASSET_FAIL]:
        print(f"\n-- 资产复制失败 ({len(results[STATUS_ASSET_FAIL])} 个) --")
        for f, reason in results[STATUS_ASSET_FAIL]:
            print(f"  {f}")
            print(f"    原因: {reason}")


if __name__ == '__main__':
    main()

2. 一键构建预览

hexo.bat 把 Hexo 命令封装成菜单,按数字操作:

hexo.bat 菜单

@echo off
setlocal

set "HEXO_ROOT=D:\xxx"
if "%HEXO_ROOT%"=="" (
    echo [错误] 未配置 hexo_root
    pause
    exit /b
)

:menu
cls
echo.
echo ============================================================
echo   Hexo 操作助手
echo   项目根目录: %HEXO_ROOT%
echo ============================================================
echo.
echo   [1] hexo clean + hexo g (清理并生成)
echo   [2] hexo clean + hexo g + hexo s (清理并生成,并新窗口预览)
echo   [3] hexo s (新窗口仅预览)
echo   [0] 退出
echo.
set "choice="
set /p choice="请选择编号 (0-3): "

if "%choice%"=="1" goto :generate
if "%choice%"=="2" goto :serve
if "%choice%"=="3" goto :serve_only
if "%choice%"=="0" goto :end
echo 无效选项
timeout /t 1 >nul
goto :menu

:generate
echo.
echo [信息] 执行 hexo clean + hexo g ...
cd /d "%HEXO_ROOT%"
call hexo clean
call hexo g
echo.
echo Hexo 生成完成!按任意键返回菜单...
pause >nul
goto :menu

:serve
echo.
echo [信息] 执行 hexo clean + hexo g ...
cd /d "%HEXO_ROOT%"
call hexo clean
call hexo g
echo [信息] Hexo server 将在新窗口启动,请等待加载完成后返回菜单...
start "Hexo Server" cmd /c "cd /d %HEXO_ROOT% && hexo s"
echo 按任意键返回菜单...
pause >nul
goto :menu

:serve_only
echo [信息] Hexo server 将在新窗口启动...
start "Hexo Server" cmd /c "cd /d %HEXO_ROOT% && hexo s"
echo 按任意键返回菜单...
pause >nul
goto :menu

:end
endlocal

hexo serverstart 在独立窗口启动,互不影响。


3. 发布 + 构建联动

publish.bat 是统一入口,先选择性发布,发布后提示是否构建/预览。

@echo off
setlocal

:menu
cls
echo.
echo ============================================================
echo   选择性发布笔记工具
echo   基于 front-matter 的 publish: true 复制到 Hexo
echo ============================================================
echo.
echo   [1] 使用默认配置发布
echo   [2] 显示详细日志发布 (verbose 模式)
echo   [3] 指定配置文件发布
echo   [H] 转到 Hexo 操作助手
echo   [0] 退出
echo.
set "choice="
set /p choice="请选择编号 (0-3 / H): "

if "%choice%"=="1" goto :default
if "%choice%"=="2" goto :verbose
if "%choice%"=="3" goto :custom_config
if /i "%choice%"=="H" goto :hexo
if "%choice%"=="0" goto :end
echo 无效选项
timeout /t 1 >nul
goto :menu

:default
python "%~dp0Scripts\selective_publish.py"
goto :after_publish

:verbose
python "%~dp0Scripts\selective_publish.py" -v
goto :after_publish

:custom_config
set "cfg="
set /p cfg="请输入配置文件路径 (直接回车使用默认): "
if "%cfg%"=="" goto :default
python "%~dp0Scripts\selective_publish.py" -c "%cfg%"
goto :after_publish

:after_publish
echo.
echo ============================================================
echo 发布完成!是否执行 Hexo 操作?
echo ============================================================
echo.
echo   [1] hexo clean + hexo g (清理并生成)
echo   [2] hexo clean + hexo g + hexo s (清理生成并新窗口预览)
echo   [0] 返回主菜单
echo.
set "choice="
set /p choice="请选择编号 (0-2): "

if "%choice%"=="1" goto :hexo_gen
if "%choice%"=="2" goto :hexo_serve
goto :menu

:hexo_gen
set "H=D:\xxx"
if "%H%"=="" (echo [错误] 未配置 hexo_root && pause && goto :menu)
cd /d "%H%"
call hexo clean
call hexo g
echo Hexo 生成完成!按任意键返回菜单...
pause >nul
goto :menu

:hexo_serve
set "H=D:\xxx"
if "%H%"=="" (echo [错误] 未配置 hexo_root && pause && goto :menu)
cd /d "%H%"
call hexo clean
call hexo g
echo [信息] Hexo server 将在新窗口启动...
start "Hexo Server" cmd /c "cd /d %H% && hexo s"
echo 按任意键返回菜单...
pause >nul
goto :menu

:hexo
if exist "%~dp0hexo.bat" (call "%~dp0hexo.bat") else (echo [错误] 找不到 hexo.bat && pause)
goto :menu

:end
endlocal

4. 依赖自动管理

install_dependencies.py 扫描 Scripts 下所有 .py 的 import 语句,自动安装缺失的第三方依赖。换了电脑运行 install_deps.bat 即可。

install_deps.bat 菜单:

@echo off
setlocal

:menu
cls
echo.
echo ============================================================
echo   Python 依赖自动安装工具
echo   自动扫描并安装 Scripts/ 下脚本需要的第三方库
echo ============================================================
echo.
echo   [1] 检查并安装所有缺失依赖
echo   [2] 仅检查,不安装 (dry-run)
echo   [3] 安装并生成 requirements.txt
echo   [4] 仅检查并生成 requirements.txt
echo   [0] 退出
echo.
set "choice="
set /p choice="请选择编号 (0-4): "

if "%choice%"=="1" goto :install
if "%choice%"=="2" goto :dryrun
if "%choice%"=="3" goto :install_req
if "%choice%"=="4" goto :dryrun_req
if "%choice%"=="0" goto :end
echo 无效选项
timeout /t 1 >nul
goto :menu

:install
python "%~dp0Scripts\install_dependencies.py"
echo.
echo 按任意键返回菜单...
pause >nul
goto :menu

:dryrun
python "%~dp0Scripts\install_dependencies.py" --dry-run
echo.
echo 按任意键返回菜单...
pause >nul
goto :menu

:install_req
python "%~dp0Scripts\install_dependencies.py" --generate-requirements
echo.
echo 按任意键返回菜单...
pause >nul
goto :menu

:dryrun_req
python "%~dp0Scripts\install_dependencies.py" --dry-run --generate-requirements
echo.
echo 按任意键返回菜单...
pause >nul
goto :menu

:end
endlocal

install_dependencies.py 的主要代码:

#!/usr/bin/env python3
"""
install_dependencies.py
自动扫描 Scripts/ 目录下所有 .py 文件的 import 语句,识别第三方依赖,
检查是否已安装,并自动安装缺失的包。

用法:
    python install_dependencies.py
    python install_dependencies.py --dry-run          # 仅检查,不安装
    python install_dependencies.py -r                 # 生成 requirements.txt
    python install_dependencies.py --pip <pip-path>   # 指定 pip 路径
"""

import os
import re
import sys
import ast
import subprocess
import importlib.util
from pathlib import Path


# ---------- 跨平台 ANSI 颜色 ----------
if sys.platform == "win32":
    import ctypes as _ctypes
    _kernel32 = _ctypes.windll.kernel32
    _h = _kernel32.GetStdHandle(-11)
    _m = _ctypes.c_uint32()
    if _kernel32.GetConsoleMode(_h, _ctypes.byref(_m)):
        _kernel32.SetConsoleMode(_h, _m.value | 0x0004)

GREEN   = "\033[32m"
RED     = "\033[31m"
YELLOW  = "\033[33m"
CYAN    = "\033[36m"
WHITE   = "\033[1;37m"
RESET   = "\033[0m"

C_GREEN  = GREEN
C_RED    = RED
C_YELLOW = YELLOW
C_CYAN   = CYAN
C_RESET  = RESET


def log_prefix(level, msg, color=""):
    text = f"[{level}] {msg}"
    if color:
        text = f"{color}{text}{RESET}"
    print(text)


def colored_print(text, color=""):
    if color:
        print(f"{color}{text}{RESET}")
    else:
        print(text)


# ---------------------------------------------------------------------------
# import 名 -> pip 包名 映射表(有些包的 import 名和 pip install 名不同)
# ---------------------------------------------------------------------------
IMPORT_TO_PIP: dict[str, str] = {
    "yaml": "PyYAML",
    "PIL": "Pillow",
    "cv2": "opencv-python",
    "bs4": "beautifulsoup4",
    "sklearn": "scikit-learn",
    "dotenv": "python-dotenv",
    "dateutil": "python-dateutil",
    "MySQLdb": "mysqlclient",
    "pymysql": "PyMySQL",
}


def get_script_dir() -> Path:
    return Path(__file__).resolve().parent


def find_python_files(scan_dir: Path) -> list[Path]:
    """递归扫描 scan_dir 下所有 .py 文件"""
    py_files = sorted(scan_dir.rglob("*.py"))
    for f in py_files:
        print(f"  -> {f.relative_to(scan_dir)}")
    return py_files


def extract_imports(py_file: Path) -> set[str]:
    """从 .py 文件中提取所有顶层包名,AST 解析 + 正则回退"""
    imports: set[str] = set()

    try:
        source = py_file.read_text(encoding="utf-8")
    except Exception as e:
        print(f"[警告] 读取 {py_file.name} 失败: {e}")
        return imports

    # 策略 A: AST 解析
    try:
        tree = ast.parse(source)
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    imports.add(alias.name.split(".")[0])
            elif isinstance(node, ast.ImportFrom):
                if node.module:
                    imports.add(node.module.split(".")[0])
        return imports
    except SyntaxError:
        # 策略 B: 正则回退(处理非标准语法)
        print(f"  AST 解析失败,使用正则回退")
        pattern = re.compile(
            r"^\s*(?:from\s+([a-zA-Z_]\w*)|import\s+([a-zA-Z_]\w*))",
            re.MULTILINE,
        )
        for match in pattern.finditer(source):
            mod = match.group(1) or match.group(2)
            if mod:
                imports.add(mod)
        return imports


def get_stdlib_names() -> set[str]:
    """获取当前 Python 版本的标准库模块名列表"""
    if hasattr(sys, "stdlib_module_names"):
        return set(sys.stdlib_module_names)
    # 旧版本回退:静态标准库列表
    return {"abc", "ast", "collections", "datetime", "json",
            "os", "pathlib", "re", "shutil", "subprocess", "sys",
            "threading", "time", "typing", "unittest", "urllib",
            "xml", "zipfile", "zlib", }


def is_installed(import_name: str, verbose: bool = True) -> bool:
    """使用 importlib.util.find_spec 检查模块是否可导入"""
    spec = importlib.util.find_spec(import_name)
    installed = spec is not None
    if verbose:
        status = "[OK] 已安装" if installed else "[X] 未安装"
        print(f"  {import_name}: {status}")
    return installed


def get_package_version(import_name: str) -> str | None:
    """尝试获取已安装包的版本号"""
    pip_name = IMPORT_TO_PIP.get(import_name, import_name)
    for name in (pip_name, import_name):
        try:
            from importlib.metadata import version
            return version(name)
        except Exception:
            pass
    return None


def install_package(pip_name: str, pip_path: str | None = None) -> bool:
    """使用 pip 安装指定的包"""
    if pip_path:
        cmd = [pip_path, "install", pip_name]
    else:
        cmd = [sys.executable, "-m", "pip", "install", pip_name]

    print(f"执行: {' '.join(cmd)}")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            print(f"  {pip_name} 安装成功 [OK]")
            return True
        else:
            print(f"  {pip_name} 安装失败 [X]")
            return False
    except Exception as e:
        print(f"  调用 pip 异常: {e}")
        return False


def main() -> None:
    dry_run = "--dry-run" in sys.argv or "-n" in sys.argv
    gen_requirements = "--generate-requirements" in sys.argv or "-r" in sys.argv

    script_dir = get_script_dir()
    py_files = find_python_files(script_dir)
    # 排除自身
    self_name = Path(__file__).name
    py_files = [f for f in py_files if f.name != self_name]

    # 提取所有 import
    all_imports: set[str] = set()
    for py_file in py_files:
        all_imports.update(extract_imports(py_file))

    # 过滤标准库
    stdlib = get_stdlib_names()
    third_party = {name for name in all_imports if name not in stdlib}

    if not third_party:
        print("未检测到第三方依赖,无需安装")
        return

    # 检查并安装
    missing = []
    for import_name in sorted(third_party):
        pip_name = IMPORT_TO_PIP.get(import_name, import_name)
        if not is_installed(import_name, verbose=False):
            if not is_installed(pip_name, verbose=False):
                missing.append((import_name, pip_name))

    if not missing:
        print("所有依赖均已就绪 [OK]")
        return

    if dry_run:
        print(f"DRY RUN — 跳过安装,缺失: {', '.join(p for _, p in missing)}")
        return

    for import_name, pip_name in missing:
        print(f"正在安装 [{pip_name}] ...")
        if install_package(pip_name):
            if is_installed(import_name, verbose=False):
                ver = get_package_version(import_name)
                print(f"  [{pip_name}] 验证通过 ({ver or 'unknown'}) [OK]")
            else:
                print(f"  [{pip_name}] 安装完成但 import 仍失败 [X]")
        else:
            print(f"  [{pip_name}] 安装失败 [X]")


if __name__ == "__main__":
    main()

关键设计:

  • 零外部依赖自举:自身只用标准库,即使在全新 Python 环境中也能直接运行
  • AST 解析而非正则:精确提取 import,不会被注释中的 import xxx 干扰
  • 包名映射表:自动纠正 yamlPyYAML 等 import 名与 pip 名的差异
  • 安装后验证pip install 完成后用 importlib.util.find_spec 验证是否真的可导入

依赖安装结果


5. 配置文件统一管理

所有路径和选项收进一个 Scripts/config.ini,INI 格式,按 脚本名.分类 起 section 名:

[selective_publish.paths]

# 笔记源目录,存放所有 md 文章(将递归扫描)
source_dir = D:\xxx

# Hexo 文章目标目录,通常为 Hexo 项目的 source/_posts
target_dir = D:\xxx\source\_posts

# 资源(如图片)的目标目录(仅当 copy_assets = true 时使用)
assets_target_dir =

# Hexo 项目根目录(用于 hexo clean / generate / server)
hexo_root = D:\xxx

[selective_publish.options]

# 是否递归扫描 source_dir 下的子目录
recursive = true

# 目标文件已存在时是否直接覆盖
overwrite = true

# 是否同时复制与文章同名的资源文件夹
copy_assets = false

# 资源文件夹的后缀名(如 .assets)
asset_folder_suffix =

# 是否输出详细的操作日志
verbose = false

路径不硬编码,换目录或换机器只改一个地方即可。


简化了哪些操作

操作 原来 现在
挑选待发布文章 手动复制到 _posts,凭记忆判断是否已发布 front-matter 中设 publish: true,脚本自动筛选
复制文章 手动操作 selective_publish.py 批量处理,未修改自动跳过
构建站点 hexo clean && hexo g hexo.bat 菜单按 1
本地预览 hexo s hexo.bat 菜单按 3,或发布后直接联动
端到端发布 至少 3 步独立操作 运行 publish.bat,按两次数字
安装依赖 手动 pip install,逐个排查 运行 install_deps.bat,全自动

脚本做了三件事:判断该发什么(读 front-matter)、下一步该干什么(菜单引导)、缺什么(自动安装)。


文章作者: 草莓多多
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 草莓多多 !
  目录