Hexo 工作流优化
原始工作流
使用 Hexo 写博客的典型流程是:
- 在
source/_posts目录下新建.md文件,编写文章 - 执行
hexo clean && hexo generate生成静态页面 - 执行
hexo server本地预览 - 确认无误后
hexo deploy部署
这套流程本身不复杂,但有几个痛点:
笔记与博客分离。 我习惯把所有笔记统一存放在一个仓库里,其中只有部分值得发布的才需要同步到 Hexo。每次发布前,需要手动把文章复制到 _posts 目录,回头还要记哪些已复制、哪些没复制。
多条命令反复敲。 hexo clean、hexo g、hexo s 每次都要手动输入,频繁切换。
依赖不可见。 这个是我自己写的 python 脚本中用到 PyYAML 等第三方库,配置环境时需要手动 pip install,换了电脑就得重新查一遍。
优化后的工作流
写了一套脚本解决这几个问题。现在的流程:
写笔记 → 加 publish: true → 双击 publish.bat → 选择操作 → 完成
目录结构:
D:\xxx\ # 笔记仓库
├── publish.bat # 选择性发布入口(调用 selective_publish.py)
├── hexo.bat # Hexo 操作助手(clean / generate / server)
├── install_deps.bat # Python 依赖安装助手
├── Scripts\
│ ├── config.ini # 统一配置文件(所有脚本共用)
│ ├── selective_publish.py # 选择性发布脚本(核心引擎)
│ ├── install_dependencies.py # 依赖扫描安装工具
│ └── requirements.txt # 由 install_dependencies.py -r 自动生成
1. 选择性发布(核心引擎)
selective_publish.py 扫描笔记仓库中的所有 .md 文件,解析 front-matter,把 publish: true 的文章复制到 Hexo 的 _posts 目录。
几个处理细节:
- 智能跳过:逐字节比较文件内容,没改过的不会再复制一遍
- 模板兼容:如果你有模板文件,可能会用到
{{ }},所以需要把{{ }}模板标签替换成安全占位符,避免 YAML 解析崩掉 - 分类报告:用颜色区分不同状态,绿色是新发布的,青色是覆盖的,黄色有问题的,红色出错的
完整代码:
#!/usr/bin/env python3
"""
selective_publish.py
根据 front-matter publish: true 选择性复制文章到 Hexo _posts 目录。
递归扫描源目录,扁平复制(不保留子目录),目标同名文件直接覆盖。
详细日志,结束后汇总。
"""
import os
import re
import sys
import shutil
import configparser
import argparse
import yaml
# ---------- 跨平台 ANSI 颜色 ----------
if sys.platform == "win32":
import ctypes as _ctypes
_kernel32 = _ctypes.windll.kernel32
_h = _kernel32.GetStdHandle(-11) # STD_OUTPUT_HANDLE
_m = _ctypes.c_uint32()
if _kernel32.GetConsoleMode(_h, _ctypes.byref(_m)):
_kernel32.SetConsoleMode(_h, _m.value | 0x0004)
GREEN = "\033[32m"
RED = "\033[31m"
YELLOW = "\033[33m"
CYAN = "\033[36m"
WHITE = "\033[1;37m"
RESET = "\033[0m"
C_GREEN = GREEN
C_RED = RED
C_YELLOW = YELLOW
C_CYAN = CYAN
C_RESET = RESET
def log_prefix(level, msg, color=""):
text = f"[{level}] {msg}"
if color:
text = f"{color}{text}{RESET}"
print(text)
def colored_print(text, color=""):
if color:
print(f"{color}{text}{RESET}")
else:
print(text)
# ---------- 状态码 ----------
STATUS_PUBLISHED_NEW = "PUBLISHED (new)"
STATUS_PUBLISHED_OVERWRITE = "PUBLISHED (overwrite)"
STATUS_SKIPPED_NO_MARK = "SKIPPED (publish not true)"
STATUS_SKIPPED_PARSE_ERROR = "SKIPPED (front-matter error)"
STATUS_SKIPPED_OVERWRITE_DISABLED = "SKIPPED (overwrite disabled)"
STATUS_SKIPPED_COPY_FAIL = "SKIPPED (copy failed)"
STATUS_SKIPPED_NO_FRONTMATTER = "SKIPPED (no front-matter)"
STATUS_ASSET_COPIED = "ASSETS_COPIED"
STATUS_ASSET_FAIL = "ASSETS_FAILED"
STATUS_ASSET_NOT_FOUND = "ASSETS_NOT_FOUND"
SECTION_PREFIX = "selective_publish"
def load_config(config_path):
if not os.path.exists(config_path):
print(f"[错误] 配置文件不存在: {config_path}")
sys.exit(1)
config = configparser.ConfigParser()
config.read(config_path, encoding='utf-8')
paths_section = f"{SECTION_PREFIX}.paths"
options_section = f"{SECTION_PREFIX}.options"
source_dir = os.path.expanduser(config.get(paths_section, 'source_dir'))
target_dir = os.path.expanduser(config.get(paths_section, 'target_dir'))
assets_target_dir = os.path.expanduser(
config.get(paths_section, 'assets_target_dir', fallback='')
)
recursive = config.getboolean(options_section, 'recursive')
overwrite = config.getboolean(options_section, 'overwrite')
copy_assets = config.getboolean(options_section, 'copy_assets')
asset_suffix = config.get(options_section, 'asset_folder_suffix', fallback='')
verbose = config.getboolean(options_section, 'verbose')
return {
'source_dir': source_dir,
'target_dir': target_dir,
'assets_target_dir': assets_target_dir,
'recursive': recursive,
'overwrite': overwrite,
'copy_assets': copy_assets,
'asset_suffix': asset_suffix,
'verbose': verbose,
}
def sanitize_yaml_template(text):
"""
预处理 YAML 文本,将 {{ ... }} 模板占位符替换为安全的字符串,
避免 YAML 解析器将 {{ 误读为流式映射导致崩溃。
"""
return re.sub(
r'\{\{\s*(.*?)\s*\}\}',
lambda m: f'"__TEMPLATE_{m.group(1).strip().replace(" ", "_")}__"',
text,
)
def get_front_matter(filepath):
try:
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
return None, f"读取文件失败: {e}"
if not content.startswith('---'):
return {}, None # 无 front-matter,不算错误
parts = content.split('---', 2)
if len(parts) < 3:
return {}, None # 格式不完整,按无处理
# 预处理模板语法 {{ }} -> 安全占位符
yaml_text = parts[1]
has_template = bool(re.search(r'\{\{.*?\}\}', yaml_text))
if has_template:
yaml_text = sanitize_yaml_template(yaml_text)
try:
meta = yaml.safe_load(yaml_text) or {}
return meta, None
except yaml.YAMLError as e:
return None, f"YAML 解析失败: {e}"
def should_publish(meta):
"""meta 为 None 或 publish 不是布尔 True 则返回 False"""
if meta is None:
return False
publish = meta.get('publish')
# yaml 会自动将 true/True 解析为 Python bool True
return isinstance(publish, bool) and publish is True
def copy_file(src, dst, overwrite, verbose):
"""返回 (状态, 原因)。状态: 'new'/'overwrite'/'unchanged'/False"""
if os.path.exists(dst):
# 比较文件内容,相同则跳过
try:
with open(src, 'rb') as f1, open(dst, 'rb') as f2:
if f1.read() == f2.read():
if verbose:
print(f" [不变] {os.path.basename(dst)} (内容相同,跳过)")
return "unchanged", None
except Exception:
pass
if not overwrite:
return False, f"目标已存在且 overwrite=false: {os.path.basename(dst)}"
else:
if verbose:
print(f" [覆盖] {os.path.basename(dst)}")
try:
shutil.copy2(src, dst)
return "overwrite", None
except Exception as e:
return False, f"复制失败: {e}"
# 目标不存在,新复制
try:
shutil.copy2(src, dst)
if verbose:
print(f" [复制] {os.path.basename(src)} -> {dst}")
return "new", None
except Exception as e:
return False, f"复制失败: {e}"
def copy_asset_folder(md_path, assets_target_dir, asset_suffix='', verbose=True):
"""尝试复制同名资产文件夹,返回 (成功状态, 原因)"""
base = os.path.splitext(md_path)[0]
article_name = os.path.basename(base)
candidates = [
os.path.join(os.path.dirname(md_path), article_name),
]
if asset_suffix:
candidates.insert(0, os.path.join(os.path.dirname(md_path), article_name + asset_suffix))
for src_folder in candidates:
if os.path.isdir(src_folder):
dest_folder = os.path.join(assets_target_dir, os.path.basename(src_folder))
try:
if os.path.exists(dest_folder):
shutil.rmtree(dest_folder)
shutil.copytree(src_folder, dest_folder)
if verbose:
print(f" [资产] {os.path.basename(src_folder)} -> {dest_folder}")
return True, None
except Exception as e:
return False, f"复制资产失败: {e}"
return False, "未找到资产文件夹"
def main():
parser = argparse.ArgumentParser(description='根据 publish: true 选择性发布文章')
parser.add_argument('-c', '--config', default=None, help='配置文件路径')
parser.add_argument('-v', '--verbose', action='store_true', default=None,
help='强制启用详细日志(覆盖配置文件设置)')
args = parser.parse_args()
script_dir = os.path.dirname(os.path.abspath(__file__))
config_path = args.config if args.config else os.path.join(script_dir, 'config.ini')
cfg = load_config(config_path)
source_dir = cfg['source_dir']
target_dir = cfg['target_dir']
assets_target_dir = cfg['assets_target_dir']
recursive = cfg['recursive']
overwrite = cfg['overwrite']
copy_assets = cfg['copy_assets']
asset_suffix = cfg['asset_suffix']
verbose = cfg['verbose']
if args.verbose is not None:
verbose = args.verbose
if not os.path.isdir(source_dir):
print(f"[错误] 源目录不存在: {source_dir}")
sys.exit(1)
os.makedirs(target_dir, exist_ok=True)
if copy_assets and assets_target_dir:
os.makedirs(assets_target_dir, exist_ok=True)
elif copy_assets and not assets_target_dir:
assets_target_dir = target_dir
if verbose:
print("[提示] 未配置 assets_target_dir,资产将复制到文章同目录")
# 收集 md 文件
md_files = []
for root, dirs, files in os.walk(source_dir):
for file in files:
if file.endswith('.md'):
md_files.append(os.path.join(root, file))
if not recursive:
break
print(f"扫描源目录: {source_dir}")
print(f"找到 {len(md_files)} 个 .md 文件\n")
# 统计
results = {
STATUS_PUBLISHED_NEW: [],
STATUS_PUBLISHED_OVERWRITE: [],
STATUS_SKIPPED_NO_MARK: [],
STATUS_SKIPPED_PARSE_ERROR: [],
STATUS_SKIPPED_NO_FRONTMATTER: [],
STATUS_SKIPPED_OVERWRITE_DISABLED: [],
STATUS_SKIPPED_COPY_FAIL: [],
STATUS_ASSET_COPIED: [],
STATUS_ASSET_FAIL: [],
STATUS_ASSET_NOT_FOUND: [],
}
for md_path in md_files:
rel_path = os.path.relpath(md_path, source_dir)
meta, error = get_front_matter(md_path)
if meta == {} and error is None:
results[STATUS_SKIPPED_NO_FRONTMATTER].append(rel_path)
if verbose:
print(f" [跳过] {rel_path} (无 front-matter)")
continue
if meta is None:
results[STATUS_SKIPPED_PARSE_ERROR].append((rel_path, error))
log_prefix("警告", f"{rel_path}: {error}", C_YELLOW)
continue
if not should_publish(meta):
results[STATUS_SKIPPED_NO_MARK].append(rel_path)
continue
if verbose:
print(f" [处理] {rel_path}")
# 执行发布
filename = os.path.basename(md_path)
dest_md = os.path.join(target_dir, filename)
status, reason = copy_file(md_path, dest_md, overwrite=overwrite, verbose=verbose)
if status == "new":
results[STATUS_PUBLISHED_NEW].append(rel_path)
elif status == "overwrite":
results[STATUS_PUBLISHED_OVERWRITE].append(rel_path)
elif status == "unchanged":
pass
else:
results[STATUS_SKIPPED_COPY_FAIL].append((rel_path, reason))
continue
# 资产处理
if copy_assets:
asset_ok, asset_reason = copy_asset_folder(md_path, assets_target_dir, asset_suffix, verbose)
if asset_ok:
results[STATUS_ASSET_COPIED].append(rel_path)
elif asset_reason and "未找到" not in asset_reason:
results[STATUS_ASSET_FAIL].append((rel_path, asset_reason))
else:
results[STATUS_ASSET_NOT_FOUND].append(rel_path)
# 汇总报告
print()
colored_print("=" * 40, WHITE)
colored_print("发布完成,汇总如下:", WHITE)
if results[STATUS_PUBLISHED_NEW]:
colored_print(f"本次新发布: {len(results[STATUS_PUBLISHED_NEW])}", GREEN)
else:
print("本次新发布: 0")
if results[STATUS_PUBLISHED_OVERWRITE]:
colored_print(f"本次覆盖发布: {len(results[STATUS_PUBLISHED_OVERWRITE])}", CYAN)
else:
print("本次覆盖发布: 0")
print(f"跳过 (publish 非 true): {len(results[STATUS_SKIPPED_NO_MARK])}")
print(f"跳过 (无 front-matter): {len(results[STATUS_SKIPPED_NO_FRONTMATTER])}")
if results[STATUS_SKIPPED_PARSE_ERROR]:
colored_print(f"跳过 (front-matter 错误): {len(results[STATUS_SKIPPED_PARSE_ERROR])}", YELLOW)
else:
print("跳过 (front-matter 错误): 0")
if results[STATUS_SKIPPED_COPY_FAIL]:
colored_print(f"跳过 (复制失败): {len(results[STATUS_SKIPPED_COPY_FAIL])}", RED)
else:
print("跳过 (复制失败): 0")
if copy_assets:
print(f"资产已复制: {len(results[STATUS_ASSET_COPIED])}")
print(f"资产未找到: {len(results[STATUS_ASSET_NOT_FOUND])}")
print(f"资产复制失败: {len(results[STATUS_ASSET_FAIL])}")
# 详细列表
if results[STATUS_PUBLISHED_NEW]:
colored_print(f"\n-- 本次新发布 ({len(results[STATUS_PUBLISHED_NEW])} 个) --", GREEN)
for f in results[STATUS_PUBLISHED_NEW]:
print(f" {f}")
if results[STATUS_PUBLISHED_OVERWRITE]:
colored_print(f"\n-- 本次覆盖发布 ({len(results[STATUS_PUBLISHED_OVERWRITE])} 个) --", CYAN)
for f in results[STATUS_PUBLISHED_OVERWRITE]:
print(f" {f}")
if results[STATUS_SKIPPED_NO_MARK]:
print(f"\n-- publish 非 true ({len(results[STATUS_SKIPPED_NO_MARK])} 个) --")
for f in results[STATUS_SKIPPED_NO_MARK]:
print(f" {f}")
if results[STATUS_SKIPPED_PARSE_ERROR]:
colored_print(f"\n-- front-matter 解析错误 ({len(results[STATUS_SKIPPED_PARSE_ERROR])} 个) --", RED)
for f, reason in results[STATUS_SKIPPED_PARSE_ERROR]:
print(f" {f}")
print(f" 原因: {reason}")
if results[STATUS_SKIPPED_COPY_FAIL]:
print(f"\n-- 复制失败 ({len(results[STATUS_SKIPPED_COPY_FAIL])} 个) --")
for f, reason in results[STATUS_SKIPPED_COPY_FAIL]:
print(f" {f}")
print(f" 原因: {reason}")
if copy_assets and results[STATUS_ASSET_FAIL]:
print(f"\n-- 资产复制失败 ({len(results[STATUS_ASSET_FAIL])} 个) --")
for f, reason in results[STATUS_ASSET_FAIL]:
print(f" {f}")
print(f" 原因: {reason}")
if __name__ == '__main__':
main()
2. 一键构建预览
hexo.bat 把 Hexo 命令封装成菜单,按数字操作:
@echo off
setlocal
set "HEXO_ROOT=D:\xxx"
if "%HEXO_ROOT%"=="" (
echo [错误] 未配置 hexo_root
pause
exit /b
)
:menu
cls
echo.
echo ============================================================
echo Hexo 操作助手
echo 项目根目录: %HEXO_ROOT%
echo ============================================================
echo.
echo [1] hexo clean + hexo g (清理并生成)
echo [2] hexo clean + hexo g + hexo s (清理并生成,并新窗口预览)
echo [3] hexo s (新窗口仅预览)
echo [0] 退出
echo.
set "choice="
set /p choice="请选择编号 (0-3): "
if "%choice%"=="1" goto :generate
if "%choice%"=="2" goto :serve
if "%choice%"=="3" goto :serve_only
if "%choice%"=="0" goto :end
echo 无效选项
timeout /t 1 >nul
goto :menu
:generate
echo.
echo [信息] 执行 hexo clean + hexo g ...
cd /d "%HEXO_ROOT%"
call hexo clean
call hexo g
echo.
echo Hexo 生成完成!按任意键返回菜单...
pause >nul
goto :menu
:serve
echo.
echo [信息] 执行 hexo clean + hexo g ...
cd /d "%HEXO_ROOT%"
call hexo clean
call hexo g
echo [信息] Hexo server 将在新窗口启动,请等待加载完成后返回菜单...
start "Hexo Server" cmd /c "cd /d %HEXO_ROOT% && hexo s"
echo 按任意键返回菜单...
pause >nul
goto :menu
:serve_only
echo [信息] Hexo server 将在新窗口启动...
start "Hexo Server" cmd /c "cd /d %HEXO_ROOT% && hexo s"
echo 按任意键返回菜单...
pause >nul
goto :menu
:end
endlocal
hexo server 用 start 在独立窗口启动,互不影响。
3. 发布 + 构建联动
publish.bat 是统一入口,先选择性发布,发布后提示是否构建/预览。
@echo off
setlocal
:menu
cls
echo.
echo ============================================================
echo 选择性发布笔记工具
echo 基于 front-matter 的 publish: true 复制到 Hexo
echo ============================================================
echo.
echo [1] 使用默认配置发布
echo [2] 显示详细日志发布 (verbose 模式)
echo [3] 指定配置文件发布
echo [H] 转到 Hexo 操作助手
echo [0] 退出
echo.
set "choice="
set /p choice="请选择编号 (0-3 / H): "
if "%choice%"=="1" goto :default
if "%choice%"=="2" goto :verbose
if "%choice%"=="3" goto :custom_config
if /i "%choice%"=="H" goto :hexo
if "%choice%"=="0" goto :end
echo 无效选项
timeout /t 1 >nul
goto :menu
:default
python "%~dp0Scripts\selective_publish.py"
goto :after_publish
:verbose
python "%~dp0Scripts\selective_publish.py" -v
goto :after_publish
:custom_config
set "cfg="
set /p cfg="请输入配置文件路径 (直接回车使用默认): "
if "%cfg%"=="" goto :default
python "%~dp0Scripts\selective_publish.py" -c "%cfg%"
goto :after_publish
:after_publish
echo.
echo ============================================================
echo 发布完成!是否执行 Hexo 操作?
echo ============================================================
echo.
echo [1] hexo clean + hexo g (清理并生成)
echo [2] hexo clean + hexo g + hexo s (清理生成并新窗口预览)
echo [0] 返回主菜单
echo.
set "choice="
set /p choice="请选择编号 (0-2): "
if "%choice%"=="1" goto :hexo_gen
if "%choice%"=="2" goto :hexo_serve
goto :menu
:hexo_gen
set "H=D:\xxx"
if "%H%"=="" (echo [错误] 未配置 hexo_root && pause && goto :menu)
cd /d "%H%"
call hexo clean
call hexo g
echo Hexo 生成完成!按任意键返回菜单...
pause >nul
goto :menu
:hexo_serve
set "H=D:\xxx"
if "%H%"=="" (echo [错误] 未配置 hexo_root && pause && goto :menu)
cd /d "%H%"
call hexo clean
call hexo g
echo [信息] Hexo server 将在新窗口启动...
start "Hexo Server" cmd /c "cd /d %H% && hexo s"
echo 按任意键返回菜单...
pause >nul
goto :menu
:hexo
if exist "%~dp0hexo.bat" (call "%~dp0hexo.bat") else (echo [错误] 找不到 hexo.bat && pause)
goto :menu
:end
endlocal
4. 依赖自动管理
install_dependencies.py 扫描 Scripts 下所有 .py 的 import 语句,自动安装缺失的第三方依赖。换了电脑运行 install_deps.bat 即可。
install_deps.bat 菜单:
@echo off
setlocal
:menu
cls
echo.
echo ============================================================
echo Python 依赖自动安装工具
echo 自动扫描并安装 Scripts/ 下脚本需要的第三方库
echo ============================================================
echo.
echo [1] 检查并安装所有缺失依赖
echo [2] 仅检查,不安装 (dry-run)
echo [3] 安装并生成 requirements.txt
echo [4] 仅检查并生成 requirements.txt
echo [0] 退出
echo.
set "choice="
set /p choice="请选择编号 (0-4): "
if "%choice%"=="1" goto :install
if "%choice%"=="2" goto :dryrun
if "%choice%"=="3" goto :install_req
if "%choice%"=="4" goto :dryrun_req
if "%choice%"=="0" goto :end
echo 无效选项
timeout /t 1 >nul
goto :menu
:install
python "%~dp0Scripts\install_dependencies.py"
echo.
echo 按任意键返回菜单...
pause >nul
goto :menu
:dryrun
python "%~dp0Scripts\install_dependencies.py" --dry-run
echo.
echo 按任意键返回菜单...
pause >nul
goto :menu
:install_req
python "%~dp0Scripts\install_dependencies.py" --generate-requirements
echo.
echo 按任意键返回菜单...
pause >nul
goto :menu
:dryrun_req
python "%~dp0Scripts\install_dependencies.py" --dry-run --generate-requirements
echo.
echo 按任意键返回菜单...
pause >nul
goto :menu
:end
endlocal
install_dependencies.py 的主要代码:
#!/usr/bin/env python3
"""
install_dependencies.py
自动扫描 Scripts/ 目录下所有 .py 文件的 import 语句,识别第三方依赖,
检查是否已安装,并自动安装缺失的包。
用法:
python install_dependencies.py
python install_dependencies.py --dry-run # 仅检查,不安装
python install_dependencies.py -r # 生成 requirements.txt
python install_dependencies.py --pip <pip-path> # 指定 pip 路径
"""
import os
import re
import sys
import ast
import subprocess
import importlib.util
from pathlib import Path
# ---------- 跨平台 ANSI 颜色 ----------
if sys.platform == "win32":
import ctypes as _ctypes
_kernel32 = _ctypes.windll.kernel32
_h = _kernel32.GetStdHandle(-11)
_m = _ctypes.c_uint32()
if _kernel32.GetConsoleMode(_h, _ctypes.byref(_m)):
_kernel32.SetConsoleMode(_h, _m.value | 0x0004)
GREEN = "\033[32m"
RED = "\033[31m"
YELLOW = "\033[33m"
CYAN = "\033[36m"
WHITE = "\033[1;37m"
RESET = "\033[0m"
C_GREEN = GREEN
C_RED = RED
C_YELLOW = YELLOW
C_CYAN = CYAN
C_RESET = RESET
def log_prefix(level, msg, color=""):
text = f"[{level}] {msg}"
if color:
text = f"{color}{text}{RESET}"
print(text)
def colored_print(text, color=""):
if color:
print(f"{color}{text}{RESET}")
else:
print(text)
# ---------------------------------------------------------------------------
# import 名 -> pip 包名 映射表(有些包的 import 名和 pip install 名不同)
# ---------------------------------------------------------------------------
IMPORT_TO_PIP: dict[str, str] = {
"yaml": "PyYAML",
"PIL": "Pillow",
"cv2": "opencv-python",
"bs4": "beautifulsoup4",
"sklearn": "scikit-learn",
"dotenv": "python-dotenv",
"dateutil": "python-dateutil",
"MySQLdb": "mysqlclient",
"pymysql": "PyMySQL",
}
def get_script_dir() -> Path:
return Path(__file__).resolve().parent
def find_python_files(scan_dir: Path) -> list[Path]:
"""递归扫描 scan_dir 下所有 .py 文件"""
py_files = sorted(scan_dir.rglob("*.py"))
for f in py_files:
print(f" -> {f.relative_to(scan_dir)}")
return py_files
def extract_imports(py_file: Path) -> set[str]:
"""从 .py 文件中提取所有顶层包名,AST 解析 + 正则回退"""
imports: set[str] = set()
try:
source = py_file.read_text(encoding="utf-8")
except Exception as e:
print(f"[警告] 读取 {py_file.name} 失败: {e}")
return imports
# 策略 A: AST 解析
try:
tree = ast.parse(source)
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.add(alias.name.split(".")[0])
elif isinstance(node, ast.ImportFrom):
if node.module:
imports.add(node.module.split(".")[0])
return imports
except SyntaxError:
# 策略 B: 正则回退(处理非标准语法)
print(f" AST 解析失败,使用正则回退")
pattern = re.compile(
r"^\s*(?:from\s+([a-zA-Z_]\w*)|import\s+([a-zA-Z_]\w*))",
re.MULTILINE,
)
for match in pattern.finditer(source):
mod = match.group(1) or match.group(2)
if mod:
imports.add(mod)
return imports
def get_stdlib_names() -> set[str]:
"""获取当前 Python 版本的标准库模块名列表"""
if hasattr(sys, "stdlib_module_names"):
return set(sys.stdlib_module_names)
# 旧版本回退:静态标准库列表
return {"abc", "ast", "collections", "datetime", "json",
"os", "pathlib", "re", "shutil", "subprocess", "sys",
"threading", "time", "typing", "unittest", "urllib",
"xml", "zipfile", "zlib", }
def is_installed(import_name: str, verbose: bool = True) -> bool:
"""使用 importlib.util.find_spec 检查模块是否可导入"""
spec = importlib.util.find_spec(import_name)
installed = spec is not None
if verbose:
status = "[OK] 已安装" if installed else "[X] 未安装"
print(f" {import_name}: {status}")
return installed
def get_package_version(import_name: str) -> str | None:
"""尝试获取已安装包的版本号"""
pip_name = IMPORT_TO_PIP.get(import_name, import_name)
for name in (pip_name, import_name):
try:
from importlib.metadata import version
return version(name)
except Exception:
pass
return None
def install_package(pip_name: str, pip_path: str | None = None) -> bool:
"""使用 pip 安装指定的包"""
if pip_path:
cmd = [pip_path, "install", pip_name]
else:
cmd = [sys.executable, "-m", "pip", "install", pip_name]
print(f"执行: {' '.join(cmd)}")
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f" {pip_name} 安装成功 [OK]")
return True
else:
print(f" {pip_name} 安装失败 [X]")
return False
except Exception as e:
print(f" 调用 pip 异常: {e}")
return False
def main() -> None:
dry_run = "--dry-run" in sys.argv or "-n" in sys.argv
gen_requirements = "--generate-requirements" in sys.argv or "-r" in sys.argv
script_dir = get_script_dir()
py_files = find_python_files(script_dir)
# 排除自身
self_name = Path(__file__).name
py_files = [f for f in py_files if f.name != self_name]
# 提取所有 import
all_imports: set[str] = set()
for py_file in py_files:
all_imports.update(extract_imports(py_file))
# 过滤标准库
stdlib = get_stdlib_names()
third_party = {name for name in all_imports if name not in stdlib}
if not third_party:
print("未检测到第三方依赖,无需安装")
return
# 检查并安装
missing = []
for import_name in sorted(third_party):
pip_name = IMPORT_TO_PIP.get(import_name, import_name)
if not is_installed(import_name, verbose=False):
if not is_installed(pip_name, verbose=False):
missing.append((import_name, pip_name))
if not missing:
print("所有依赖均已就绪 [OK]")
return
if dry_run:
print(f"DRY RUN — 跳过安装,缺失: {', '.join(p for _, p in missing)}")
return
for import_name, pip_name in missing:
print(f"正在安装 [{pip_name}] ...")
if install_package(pip_name):
if is_installed(import_name, verbose=False):
ver = get_package_version(import_name)
print(f" [{pip_name}] 验证通过 ({ver or 'unknown'}) [OK]")
else:
print(f" [{pip_name}] 安装完成但 import 仍失败 [X]")
else:
print(f" [{pip_name}] 安装失败 [X]")
if __name__ == "__main__":
main()
关键设计:
- 零外部依赖自举:自身只用标准库,即使在全新 Python 环境中也能直接运行
- AST 解析而非正则:精确提取 import,不会被注释中的
import xxx干扰 - 包名映射表:自动纠正
yaml→PyYAML等 import 名与 pip 名的差异 - 安装后验证:
pip install完成后用importlib.util.find_spec验证是否真的可导入
5. 配置文件统一管理
所有路径和选项收进一个 Scripts/config.ini,INI 格式,按 脚本名.分类 起 section 名:
[selective_publish.paths]
# 笔记源目录,存放所有 md 文章(将递归扫描)
source_dir = D:\xxx
# Hexo 文章目标目录,通常为 Hexo 项目的 source/_posts
target_dir = D:\xxx\source\_posts
# 资源(如图片)的目标目录(仅当 copy_assets = true 时使用)
assets_target_dir =
# Hexo 项目根目录(用于 hexo clean / generate / server)
hexo_root = D:\xxx
[selective_publish.options]
# 是否递归扫描 source_dir 下的子目录
recursive = true
# 目标文件已存在时是否直接覆盖
overwrite = true
# 是否同时复制与文章同名的资源文件夹
copy_assets = false
# 资源文件夹的后缀名(如 .assets)
asset_folder_suffix =
# 是否输出详细的操作日志
verbose = false
路径不硬编码,换目录或换机器只改一个地方即可。
简化了哪些操作
| 操作 | 原来 | 现在 |
|---|---|---|
| 挑选待发布文章 | 手动复制到 _posts,凭记忆判断是否已发布 |
front-matter 中设 publish: true,脚本自动筛选 |
| 复制文章 | 手动操作 | selective_publish.py 批量处理,未修改自动跳过 |
| 构建站点 | 敲 hexo clean && hexo g |
hexo.bat 菜单按 1 |
| 本地预览 | 敲 hexo s |
hexo.bat 菜单按 3,或发布后直接联动 |
| 端到端发布 | 至少 3 步独立操作 | 运行 publish.bat,按两次数字 |
| 安装依赖 | 手动 pip install,逐个排查 |
运行 install_deps.bat,全自动 |
脚本做了三件事:判断该发什么(读 front-matter)、下一步该干什么(菜单引导)、缺什么(自动安装)。



