> 原文链接:Event Hooks
sidebar_position: 6 title: "Event Hooks" description: "Run custom code at key lifecycle points — log activity, send alerts, post to webhooks"
Event Hooks(事件钩子)
Hermes 有两个钩子系统,可在关键生命周期节点运行自定义代码:
| 系统 | 注册方式 | 运行环境 | 用途 |
|---|---|---|---|
| Gateway 钩子 | ~/.hermes/hooks/ 中的 HOOK.yaml + handler.py | 仅 Gateway | 日志、告警、Webhook |
| 插件钩子 | 插件中的 ctx.register_hook() | CLI + Gateway | 工具拦截、指标采集、护栏 |
两个系统都是非阻塞的——任何钩子中的错误都会被捕获并记录日志,永远不会导致 Agent 崩溃。
Gateway Event Hooks
Gateway 钩子在 Gateway 运行期间(Telegram、Discord、Slack、WhatsApp)自动触发,不会阻塞主 Agent 管道。
创建钩子
每个钩子是 ~/.hermes/hooks/ 下的一个目录,包含两个文件:
~/.hermes/hooks/
└── my-hook/
├── HOOK.yaml # 声明要监听哪些事件
└── handler.py # Python 处理函数
HOOK.yaml
name: my-hook
description: 将所有 Agent 活动记录到文件
events:
- agent:start
- agent:end
- agent:step
events 列表决定了哪些事件会触发你的处理函数。你可以订阅任意事件组合,包括 command:* 这样的通配符。
handler.py
import json
from datetime import datetime
from pathlib import Path
LOG_FILE = Path.home() / ".hermes" / "hooks" / "my-hook" / "activity.log"
async def handle(event_type: str, context: dict):
"""为每个订阅事件调用。必须命名为 'handle'。"""
entry = {
"timestamp": datetime.now().isoformat(),
"event": event_type,
**context,
}
with open(LOG_FILE, "a") as f:
f.write(json.dumps(entry) + "\n")
处理函数规则:
- 必须命名为
handle - 接收
event_type(字符串)和context(字典) - 可以是
async def或普通def— 两者都支持 - 错误会被捕获并记录日志,永远不会导致 Agent 崩溃
可用事件
| 事件 | 触发时机 | 上下文键 |
|---|---|---|
gateway:startup | Gateway 进程启动 | platforms(活跃平台名称列表) |
session:start | 新的消息会话创建 | platform, user_id, session_id, session_key |
session:end | 会话结束(重置之前) | platform, user_id, session_key |
session:reset | 用户执行 /new 或 /reset | platform, user_id, session_key |
agent:start | Agent 开始处理消息 | platform, user_id, session_id, message |
agent:step | 工具调用循环的每次迭代 | platform, user_id, session_id, iteration, tool_names |
agent:end | Agent 完成处理 | platform, user_id, session_id, message, response |
command:* | 执行任何斜杠命令 | platform, user_id, command, args |
通配符匹配
为 command:* 注册的处理函数会在任何 command: 事件(command:model、command:reset 等)触发时执行。通过单次订阅即可监控所有斜杠命令。
示例
启动检查清单 (BOOT.md) — 内置
Gateway 附带一个内置的 boot-md 钩子,每次启动时会查找 ~/.hermes/BOOT.md。如果文件存在,Agent 会在后台会话中运行其中的指令。无需安装——只需创建文件即可。
创建 ~/.hermes/BOOT.md:
# 启动检查清单
1. 检查是否有定时任务在夜间失败 — 运行 `hermes cron list`
2. 向 Discord #general 发送消息:"Gateway restarted, all systems go"
3. 检查 /opt/app/deploy.log 中过去 24 小时是否有错误
Agent 会在后台线程中运行这些指令,因此不会阻塞 Gateway 启动。如果没有任何需要关注的事项,Agent 会回复 [SILENT],不发送任何消息。
:::tip 提示 没有 BOOT.md?钩子会静默跳过——零开销。需要启动自动化时创建文件,不需要时删除即可。 :::
长时间任务 Telegram 告警
当 Agent 执行超过 10 步时发送消息通知你:
# ~/.hermes/hooks/long-task-alert/HOOK.yaml
name: long-task-alert
description: 当 Agent 执行步骤过多时告警
events:
- agent:step
# ~/.hermes/hooks/long-task-alert/handler.py
import os
import httpx
THRESHOLD = 10
BOT_TOKEN=os.getenv("TELEGRAM_BOT_TOKEN")
CHAT_ID = os.getenv("TELEGRAM_HOME_CHANNEL")
async def handle(event_type: str, context: dict):
iteration = context.get("iteration", 0)
if iteration == THRESHOLD and BOT_TOKEN and CHAT_ID:
tools = ", ".join(context.get("tool_names", []))
text = f"⚠️ Agent has been running for {iteration} steps. Last tools: {tools}"
async with httpx.AsyncClient() as client:
await client.post(
f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage",
json={"chat_id": CHAT_ID, "text": text},
)
命令使用日志记录器
追踪使用了哪些斜杠命令:
# ~/.hermes/hooks/command-logger/HOOK.yaml
name: command-logger
description: 记录斜杠命令使用情况
events:
- command:*
# ~/.hermes/hooks/command-logger/handler.py
import json
from datetime import datetime
from pathlib import Path
LOG = Path.home() / ".hermes" / "logs" / "command_usage.jsonl"
def handle(event_type: str, context: dict):
LOG.parent.mkdir(parents=True, exist_ok=True)
entry = {
"ts": datetime.now().isoformat(),
"command": context.get("command"),
"args": context.get("args"),
"platform": context.get("platform"),
"user": context.get("user_id"),
}
with open(LOG, "a") as f:
f.write(json.dumps(entry) + "\n")
会话启动 Webhook
在新会话创建时 POST 到外部服务:
# ~/.hermes/hooks/session-webhook/HOOK.yaml
name: session-webhook
description: 新会话时通知外部服务
events:
- session:start
- session:reset
# ~/.hermes/hooks/session-webhook/handler.py
import httpx
WEBHOOK_URL = "https://your-service.example.com/hermes-events"
async def handle(event_type: str, context: dict):
async with httpx.AsyncClient() as client:
await client.post(WEBHOOK_URL, json={
"event": event_type,
**context,
}, timeout=5)
工作原理
- Gateway 启动时,
HookRegistry.discover_and_load()扫描~/.hermes/hooks/ - 每个包含
HOOK.yaml+handler.py的子目录会被动态加载 - 处理函数为其声明的事件注册
- 在每个生命周期节点,
hooks.emit()触发所有匹配的处理函数 - 任何处理函数中的错误都会被捕获并记录日志——损坏的钩子永远不会导致 Agent 崩溃
:::info 信息 Gateway 钩子只在 Gateway(Telegram、Discord、Slack、WhatsApp)中触发。CLI 不会加载 Gateway 钩子。要在所有环境中都能工作的钩子,请使用插件钩子。 :::
Plugin Hooks
插件可以注册在 CLI 和 Gateway 会话中都触发的钩子。这些钩子通过插件 register() 函数中的 ctx.register_hook() 以编程方式注册。
def register(ctx):
ctx.register_hook("pre_tool_call", my_tool_observer)
ctx.register_hook("post_tool_call", my_tool_logger)
ctx.register_hook("pre_llm_call", my_memory_callback)
ctx.register_hook("post_llm_call", my_sync_callback)
ctx.register_hook("on_session_start", my_init_callback)
ctx.register_hook("on_session_end", my_cleanup_callback)
所有钩子的通用规则:
- 回调接收关键字参数。始终接受
**kwargs以保持向前兼容性——未来版本可能添加新参数而不会破坏你的插件。 - 如果回调崩溃,会被记录日志并跳过。其他钩子和 Agent 正常继续运行。行为不端的插件永远不会破坏 Agent。
- 所有钩子都是即发即弃的观察者,其返回值被忽略——除了
pre_llm_call,它可以注入上下文。
快速参考
| 钩子 | 触发时机 | 返回值 |
|---|---|---|
pre_tool_call | 任何工具执行之前 | 被忽略 |
post_tool_call | 任何工具返回之后 | 被忽略 |
pre_llm_call | 每轮一次,工具调用循环之前 | 上下文注入 |
post_llm_call | 每轮一次,工具调用循环之后 | 被忽略 |
on_session_start | 新会话创建(仅首轮) | 被忽略 |
on_session_end | 会话结束 | 被忽略 |
pre_tool_call
在每个工具执行之前立即触发——内置工具和插件工具都包括。
回调签名:
def my_callback(tool_name: str, args: dict, task_id: str, **kwargs):
| 参数 | 类型 | 描述 |
|---|---|---|
tool_name | str | 即将执行的工具名称(如 "terminal"、"web_search"、"read_file") |
args | dict | 模型传递给工具的参数 |
task_id | str | 会话/任务标识符。未设置时为空字符串。 |
触发位置: 在 model_tools.py 的 handle_function_call() 中,工具处理函数运行之前。每次工具调用触发一次——如果模型并行调用 3 个工具,则触发 3 次。
返回值: 被忽略。
用例: 日志记录、审计追踪、工具调用计数、阻止危险操作(打印警告)、速率限制。
示例 — 工具调用审计日志:
import json, logging
from datetime import datetime
logger = logging.getLogger(__name__)
def audit_tool_call(tool_name, args, task_id, **kwargs):
logger.info("TOOL_CALL session=%s tool=%s args=%s",
task_id, tool_name, json.dumps(args)[:200])
def register(ctx):
ctx.register_hook("pre_tool_call", audit_tool_call)
示例 — 危险工具警告:
DANGEROUS = {"terminal", "write_file", "patch"}
def warn_dangerous(tool_name, **kwargs):
if tool_name in DANGEROUS:
print(f"⚠ Executing potentially dangerous tool: {tool_name}")
def register(ctx):
ctx.register_hook("pre_tool_call", warn_dangerous)
post_tool_call
在每个工具执行返回之后立即触发。
回调签名:
def my_callback(tool_name: str, args: dict, result: str, task_id: str, **kwargs):
| 参数 | 类型 | 描述 |
|---|---|---|
tool_name | str | 刚刚执行的工具名称 |
args | dict | 模型传递给工具的参数 |
result | str | 工具的返回值(始终为 JSON 字符串) |
task_id | str | 会话/任务标识符。未设置时为空字符串。 |
触发位置: 在 model_tools.py 的 handle_function_call() 中,工具处理函数返回之后。每次工具调用触发一次。如果工具抛出未处理的异常则不会触发(错误会被捕获并作为错误 JSON 字符串返回,此时 post_tool_call 会以该错误字符串作为 result 触发)。
返回值: 被忽略。
用例: 记录工具结果、指标采集、追踪工具成功/失败率、特定工具完成时发送通知。
示例 — 追踪工具使用指标:
from collections import Counter
import json
_tool_counts = Counter()
_error_counts = Counter()
def track_metrics(tool_name, result, **kwargs):
_tool_counts[tool_name] += 1
try:
parsed = json.loads(result)
if "error" in parsed:
_error_counts[tool_name] += 1
except (json.JSONDecodeError, TypeError):
pass
def register(ctx):
ctx.register_hook("post_tool_call", track_metrics)
pre_llm_call
每轮触发一次,在工具调用循环开始之前。这是唯一一个返回值会被使用的钩子——它可以向当前轮次的用户消息注入上下文。
回调签名:
def my_callback(session_id: str, user_message: str, conversation_history: list,
is_first_turn: bool, model: str, platform: str, **kwargs):
| 参数 | 类型 | 描述 |
|---|---|---|
session_id | str | 当前会话的唯一标识符 |
user_message | str | 当前轮次用户的原始消息(在任何技能注入之前) |
conversation_history | list | 完整消息列表的副本(OpenAI 格式:[{"role": "user", "content": "..."}]) |
is_first_turn | bool | 如果是新会话的第一轮则为 True,后续轮次为 False |
model | str | 模型标识符(如 "anthropic/claude-sonnet-4.6") |
platform | str | 会话运行的位置:"cli"、"telegram"、"discord" 等 |
触发位置: 在 run_agent.py 的 run_conversation() 中,上下文压缩之后、主 while 循环之前。每次 run_conversation() 调用触发一次(即每次用户轮次一次),而不是工具循环中每次 API 调用触发一次。
返回值: 如果回调返回一个包含 "context" 键的字典,或一个非空纯字符串,文本会被附加到当前轮次的用户消息。返回 None 表示不注入。
# 注入上下文
return {"context": "Recalled memories:\n- User likes Python\n- Working on hermes-agent"}
# 纯字符串(等效)
return "Recalled memories:\n- User likes Python"
# 不注入
return None
上下文注入位置: 始终注入到用户消息中,而不是系统提示词。这保留了提示词缓存——系统提示词在各轮次间保持不变,因此缓存的 Token 会被复用。系统提示词是 Hermes 的领地(模型指导、工具强制、人格、技能)。插件与用户输入并列贡献上下文。
所有注入的上下文都是临时的——仅在 API 调用时添加。对话历史中的原始用户消息永远不会被修改,也不会有任何内容被持久化到会话数据库。
当多个插件返回上下文时,它们的输出按插件发现顺序(按目录名字母排序)以双换行符连接。
用例: 记忆召回、RAG 上下文注入、护栏、每轮分析。
示例 — 记忆召回:
import httpx
MEMORY_API = "https://your-memory-api.example.com"
def recall(session_id, user_message, is_first_turn, **kwargs):
try:
resp = httpx.post(f"{MEMORY_API}/recall", json={
"session_id": session_id,
"query": user_message,
}, timeout=3)
memories = resp.json().get("results", [])
if not memories:
return None
text = "Recalled context:\n" + "\n".join(f"- {m['text']}" for m in memories)
return {"context": text}
except Exception:
return None
def register(ctx):
ctx.register_hook("pre_llm_call", recall)
示例 — 护栏:
POLICY = "Never execute commands that delete files without explicit user confirmation."
def guardrails(**kwargs):
return {"context": POLICY}
def register(ctx):
ctx.register_hook("pre_llm_call", guardrails)
post_llm_call
每轮触发一次,在工具调用循环完成且 Agent 已生成最终响应之后。仅在成功轮次触发——轮次被中断时不触发。
回调签名:
def my_callback(session_id: str, user_message: str, assistant_response: str,
conversation_history: list, model: str, platform: str, **kwargs):
| 参数 | 类型 | 描述 |
|---|---|---|
session_id | str | 当前会话的唯一标识符 |
user_message | str | 当前轮次用户的原始消息 |
assistant_response | str | Agent 当前轮次的最终文本响应 |
conversation_history | list | 轮次完成后的完整消息列表副本 |
model | str | 模型标识符 |
platform | str | 会话运行的位置 |
触发位置: 在 run_agent.py 的 run_conversation() 中,工具循环以最终响应退出之后。受 if final_response and not interrupted 保护——因此在用户中断轮次或 Agent 达到迭代上限但未生成响应时不会触发。
返回值: 被忽略。
用例: 将对话数据同步到外部记忆系统、计算响应质量指标、记录轮次摘要、触发后续操作。
示例 — 同步到外部记忆:
import httpx
MEMORY_API = "https://your-memory-api.example.com"
def sync_memory(session_id, user_message, assistant_response, **kwargs):
try:
httpx.post(f"{MEMORY_API}/store", json={
"session_id": session_id,
"user": user_message,
"assistant": assistant_response,
}, timeout=5)
except Exception:
pass # best-effort(尽力而为)
def register(ctx):
ctx.register_hook("post_llm_call", sync_memory)
示例 — 追踪响应长度:
import logging
logger = logging.getLogger(__name__)
def log_response_length(session_id, assistant_response, model, **kwargs):
logger.info("RESPONSE session=%s model=%s chars=%d",
session_id, model, len(assistant_response or ""))
def register(ctx):
ctx.register_hook("post_llm_call", log_response_length)
on_session_start
在全新会话创建时触发一次。在会话继续时(用户在现有会话中发送第二条消息)不会触发。
回调签名:
def my_callback(session_id: str, model: str, platform: str, **kwargs):
| 参数 | 类型 | 描述 |
|---|---|---|
session_id | str | 新会话的唯一标识符 |
model | str | 模型标识符 |
platform | str | 会话运行的位置 |
触发位置: 在 run_agent.py 的 run_conversation() 中,在新会话的第一轮——具体来说是在系统提示词构建之后、工具循环开始之前。检查条件是 if not conversation_history(没有先前消息 = 新会话)。
返回值: 被忽略。
用例: 初始化会话级状态、预热缓存、向外部服务注册会话、记录会话启动。
示例 — 初始化会话缓存:
_session_caches = {}
def init_session(session_id, model, platform, **kwargs):
_session_caches[session_id] = {
"model": model,
"platform": platform,
"tool_calls": 0,
"started": __import__("datetime").datetime.now().isoformat(),
}
def register(ctx):
ctx.register_hook("on_session_start", init_session)
on_session_end
在每次 run_conversation() 调用的最末尾触发,无论结果如何。如果用户退出时 Agent 正在处理中,也会从 CLI 的退出处理器触发。
回调签名:
def my_callback(session_id: str, completed: bool, interrupted: bool,
model: str, platform: str, **kwargs):
| 参数 | 类型 | 描述 |
|---|---|---|
session_id | str | 会话的唯一标识符 |
completed | bool | 如果 Agent 生成了最终响应则为 True,否则为 False |
interrupted | bool | 如果轮次被中断则为 True(用户发送新消息、/stop 或退出) |
model | str | 模型标识符 |
platform | str | 会话运行的位置 |
触发位置: 在两个地方:
run_agent.py— 在每次run_conversation()调用结束时,所有清理之后。始终触发,即使轮次出错。cli.py— 在 CLI 的 atexit 处理器中,但仅当退出时 Agent 正在处理中(_agent_running=True)。这会捕获处理过程中的 Ctrl+C 和/exit。在这种情况下,completed=False且interrupted=True。
返回值: 被忽略。
用例: 刷新缓冲区、关闭连接、持久化会话状态、记录会话持续时间、清理 on_session_start 中初始化的资源。
示例 — 刷新和清理:
_session_caches = {}
def cleanup_session(session_id, completed, interrupted, **kwargs):
cache = _session_caches.pop(session_id, None)
if cache:
# 将累积的数据刷新到磁盘或外部服务
status = "completed" if completed else ("interrupted" if interrupted else "failed")
print(f"Session {session_id} ended: {status}, {cache['tool_calls']} tool calls")
def register(ctx):
ctx.register_hook("on_session_end", cleanup_session)
示例 — 会话持续时间追踪:
import time, logging
logger = logging.getLogger(__name__)
_start_times = {}
def on_start(session_id, **kwargs):
_start_times[session_id] = time.time()
def on_end(session_id, completed, interrupted, **kwargs):
start = _start_times.pop(session_id, None)
if start:
duration = time.time() - start
logger.info("SESSION_DURATION session=%s seconds=%.1f completed=%s interrupted=%s",
session_id, duration, completed, interrupted)
def register(ctx):
ctx.register_hook("on_session_start", on_start)
ctx.register_hook("on_session_end", on_end)
完整演练请参见 构建插件指南,包括工具 Schema、处理函数和高级钩子模式。