数据同步机制.md 15 KB

数据同步机制

本文引用的文件

  • src/index.ts
  • src/client/cosmoe.ts
  • src/command/index.ts
  • src/scheduler/index.ts
  • src/command/handlers/login.ts
  • src/command/handlers/events.ts
  • src/command/handlers/eventDetails.ts
  • src/command/handlers/history.ts
  • src/command/handlers/bookEvent.ts
  • src/command/handlers/cancel.ts
  • wrangler.jsonc
  • worker-configuration.d.ts

目录

  1. 引言
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖关系分析
  7. 性能考虑
  8. 故障排查指南
  9. 结论
  10. 附录

引言

本文件系统性阐述 Cosmoe 服务端 API 的数据同步机制,覆盖以下方面:

  • 实时数据获取:通过定时任务与用户交互触发的 API 调用,实现事件与预约等数据的拉取与推送。
  • 增量更新:基于“最新事件 ID”的简单增量策略,仅推送新增活动。
  • 批量同步:在定时任务中遍历所有已注册用户并批量发送通知。
  • 冲突解决:通过 KV 存储的原子写入与幂等操作,避免重复推送与状态不一致。
  • 缓存策略:KV 的 TTL 与列表接口配合,实现轻量级缓存与失效;结合业务层的“最新事件 ID”实现增量。
  • 性能优化:并发控制(逐用户串行发送)、重试机制(外层调度器重试)、超时与错误恢复。
  • 监控与日志:Cloudflare Worker 观测性与控制台日志;异常回滚与修复建议。

项目结构

该项目为 Cloudflare Workers 应用,采用模块化组织:

  • 入口与调度:入口文件负责 Webhook 与计划任务;计划任务用于增量事件通知。
  • 客户端封装:统一的 Cosmoe API 客户端,封装认证、查询与变更操作。
  • 命令与会话:基于 grammY 的命令路由与 KV 会话存储。
  • KV 命名空间:分别存储用户凭证与应用状态(如最新事件 ID)。

    graph TB
    A["入口: src/index.ts"] --> B["命令系统: src/command/index.ts"]
    A --> C["计划任务: src/scheduler/index.ts"]
    B --> D["客户端: src/client/cosmoe.ts"]
    C --> D
    B --> E["KV 会话存储<br/>COSMOE_STORAGE"]
    A --> F["KV 凭证存储<br/>COSMOE_CREDENTIALS"]
    C --> F
    

图表来源

  • src/index.ts
  • src/command/index.ts
  • src/scheduler/index.ts
  • src/client/cosmoe.ts

章节来源

  • src/index.ts
  • src/command/index.ts
  • src/scheduler/index.ts
  • src/client/cosmoe.ts
  • wrangler.jsonc

核心组件

  • CosmoeClient:封装认证、事件查询、个人资料、预约历史、优惠券、预约、改签、转让、取消等 API。
  • 命令系统:基于 grammY 的命令与回调处理,使用 KV 适配器持久化会话。
  • 计划任务:每分钟触发一次,对比“最新事件 ID”,向所有已注册用户推送新增活动。
  • KV 存储:COSMOE_CREDENTIALS 存放用户凭证;COSMOE_STORAGE 存放应用状态(如 latestEventId)。

章节来源

  • src/client/cosmoe.ts
  • src/command/index.ts
  • src/scheduler/index.ts
  • wrangler.jsonc

架构总览

下图展示从用户交互到服务端 API 的调用链路,以及计划任务的增量推送流程。

sequenceDiagram
participant U as "用户"
participant Bot as "Telegram Bot"
participant Cmd as "命令处理器"
participant KV as "KV 存储"
participant API as "Cosmoe API"
participant Sch as "计划任务"
U->>Bot : "/events" 或 "/event_<id>"
Bot->>Cmd : 路由到对应处理器
Cmd->>API : 查询事件/详情/历史
API-->>Cmd : 返回数据
Cmd-->>U : 发送消息
U->>Bot : "/login" 进入会话
Bot->>Cmd : 进入登录对话
Cmd->>API : 获取令牌
API-->>Cmd : 返回令牌
Cmd->>KV : 写入凭证
Sch->>API : 拉取全部事件
API-->>Sch : 返回事件列表
Sch->>KV : 读取 latestEventId
Sch->>Sch : 过滤新增事件
Sch->>Bot : 向所有已注册用户发送通知
Bot-->>U : 推送新活动
Sch->>KV : 更新 latestEventId

图表来源

  • src/command/handlers/events.ts
  • src/command/handlers/eventDetails.ts
  • src/command/handlers/login.ts
  • src/command/handlers/history.ts
  • src/scheduler/index.ts
  • src/client/cosmoe.ts

详细组件分析

CosmoeClient 数据同步策略

  • 实时数据获取
    • 事件列表与详情:通过 GET 请求获取,返回结构体包含事件元数据与时间槽。
    • 个人资料与历史:需携带认证参数,返回用户统计与预约历史。
  • 增量更新
    • 计划任务通过比较“最新事件 ID”实现增量推送;若无新增则不推送。
  • 批量同步

    • 计划任务遍历 COSMOE_CREDENTIALS 中的所有键,向每个已注册用户发送通知。

      flowchart TD
      Start(["开始"]) --> Pull["拉取全部事件"]
      Pull --> Compare["与 latestEventId 比较"]
      Compare --> HasNew{"有新增事件?"}
      HasNew --> |否| End(["结束"])
      HasNew --> |是| Iterate["遍历所有已注册用户"]
      Iterate --> Notify["逐个发送通知"]
      Notify --> Update["更新 latestEventId"]
      Update --> End
      

图表来源

  • src/scheduler/index.ts

章节来源

  • src/client/cosmoe.ts
  • src/scheduler/index.ts

认证与凭证管理

  • 登录流程:交互式会话收集用户名与密码,调用 getToken 获取 user_id 与 token,并写入 KV。
  • 凭证读取:后续命令在 KV 中读取凭证,设置到 CosmoeClient,再发起需要认证的 API 调用。
  • 幂等性:KV 写入按用户 ID 键覆盖,避免重复登录导致的数据不一致。

    sequenceDiagram
    participant U as "用户"
    participant Conv as "登录会话"
    participant API as "Cosmoe API"
    participant KV as "COSMOE_CREDENTIALS"
    U->>Conv : 输入用户名/密码
    Conv->>API : getToken
    API-->>Conv : 返回 user_id/token
    Conv->>KV : put(telegramUserId, 凭证JSON)
    Conv-->>U : 登录成功
    

图表来源

  • src/command/handlers/login.ts

章节来源

  • src/command/handlers/login.ts

事件与预约流程

  • 事件列表:限制返回最近若干条,便于快速浏览。
  • 事件详情:排序时间槽,生成可预约链接;根据活动日期判断是否可预约。
  • 预约流程:支持优惠券选择与自动下单;失败时提示错误。
  • 取消流程:二次确认键盘,确认后调用取消接口。

    sequenceDiagram
    participant U as "用户"
    participant Bot as "Bot"
    participant Cmd as "命令处理器"
    participant API as "Cosmoe API"
    U->>Bot : "/event_<id>"
    Bot->>Cmd : 解析事件ID
    Cmd->>API : getEventDetail
    API-->>Cmd : 返回事件详情
    Cmd-->>U : 展示时间槽与预约链接
    U->>Bot : "/book_<event>_<slot>"
    Bot->>Cmd : 解析事件ID与槽位
    Cmd->>API : getAvailableCoupons
    API-->>Cmd : 返回可用优惠券
    Cmd->>API : bookEvent
    API-->>Cmd : 返回结果
    Cmd-->>U : 成功/失败消息
    

图表来源

  • src/command/handlers/eventDetails.ts
  • src/command/handlers/bookEvent.ts

章节来源

  • src/command/handlers/events.ts
  • src/command/handlers/eventDetails.ts
  • src/command/handlers/bookEvent.ts
  • src/command/handlers/history.ts
  • src/command/handlers/cancel.ts

计划任务与增量推送

  • 触发频率:每分钟一次(cron 表达式)。
  • 增量逻辑:读取 COSMOE_STORAGE 中的 latestEventId,过滤大于该值的事件,逐用户推送。
  • 幂等与一致性:KV 写入保证 latestEventId 原子更新,避免重复推送。

    flowchart TD
    Tick["计划任务触发"] --> Load["读取 latestEventId"]
    Load --> Fetch["获取全部事件"]
    Fetch --> Filter["筛选新增事件"]
    Filter --> Empty{"有新增?"}
    Empty --> |否| Done["结束"]
    Empty --> |是| ForEach["遍历已注册用户"]
    ForEach --> Send["发送通知"]
    Send --> Update["更新 latestEventId"]
    Update --> Done
    

图表来源

  • src/scheduler/index.ts
  • wrangler.jsonc

章节来源

  • src/scheduler/index.ts
  • wrangler.jsonc

冲突解决机制

  • 数据一致性检查
    • 计划任务在推送前比较事件 ID 与存储值,仅推送新增项,避免重复。
    • 登录成功后写入 KV,后续读取凭证进行认证,减少凭据丢失风险。
  • 冲突检测与解决
    • KV 写入按用户键覆盖,天然具备幂等性,避免并发写入导致的覆盖问题。
    • 事件推送采用“读-比对-写”的顺序,确保 latestEventId 单调递增。
  • 解决方案
    • 若出现重复推送,可通过重置 latestEventId 或删除重复消息进行修复。
    • 对于并发写入,建议在 KV 层面增加版本号或使用带条件的写入(如 CAS)以进一步增强一致性。

章节来源

  • src/scheduler/index.ts
  • src/command/handlers/login.ts

缓存策略

  • 本地缓存
    • 客户端未实现本地缓存;KV 作为应用状态与会话存储,承担轻量缓存职责。
  • 缓存失效
    • 计划任务通过 latestEventId 控制失效窗口;用户侧未见显式 TTL 策略。
  • 缓存预热
    • 无专门预热流程;可通过首次登录或访问事件详情触发数据拉取。
  • 缓存更新
    • 计划任务更新 latestEventId;用户侧通过 KV 会话存储维持对话状态。

章节来源

  • src/command/index.ts
  • src/scheduler/index.ts

性能优化方案

  • 并发控制
    • 计划任务逐用户发送通知,避免并发风暴;可在用户维度引入队列或分批处理。
  • 重试机制
    • 使用 Cloudflare Worker 的计划任务重试能力;对网络抖动导致的失败进行自动重试。
  • 超时与错误恢复
    • API 调用应设置合理超时;对 5xx/网络错误进行指数退避重试;对 4xx 错误直接失败并记录。
  • 日志与可观测性
    • 启用观测性开关;在关键路径输出日志(如推送成功/失败、KV 读写错误)。

章节来源

  • wrangler.jsonc
  • src/scheduler/index.ts

依赖关系分析

  • 组件耦合
    • 命令处理器依赖 CosmoeClient 与 KV 存储;计划任务依赖 CosmoeClient 与 KV。
    • KV 通过绑定名称暴露给运行时,命令系统与计划任务共享命名空间。
  • 外部依赖
  • 潜在循环依赖

    • 当前结构清晰,无明显循环导入。

      graph LR
      Cmd["命令系统"] --> Client["CosmoeClient"]
      Sch["计划任务"] --> Client
      Cmd --> KV["KV 存储"]
      Sch --> KV
      Client --> API["Cosmoe API"]
      

图表来源

  • src/command/index.ts
  • src/scheduler/index.ts
  • src/client/cosmoe.ts

章节来源

  • src/command/index.ts
  • src/scheduler/index.ts
  • src/client/cosmoe.ts
  • worker-configuration.d.ts

性能考虑

  • 网络调用
    • 将认证与非认证接口分离,优先复用已认证实例;减少不必要的重复登录。
  • KV 访问
    • 合理使用 KV 的 list/get/put;避免在热路径上进行大量小对象写入。
  • 速率限制
    • Telegram API 对消息发送有限速,计划任务应避免在同一轮次内对同一用户多次发送。
  • 资源隔离
    • 将 KV 与 API 调用置于独立的执行单元,降低相互影响。

故障排查指南

  • 登录失败
    • 检查 KV 写入是否成功;确认凭证 JSON 结构与字段类型。
  • 事件推送异常
    • 查看 KV list 结果与发送响应;核对用户 ID 是否存在;关注控制台日志。
  • 预约失败
    • 核对优惠券可用性与时间槽剩余容量;查看 API 返回码与消息。
  • 取消失败
    • 确认用户凭证有效;检查预约状态是否允许取消。

章节来源

  • src/command/handlers/login.ts
  • src/scheduler/index.ts
  • src/command/handlers/bookEvent.ts
  • src/command/handlers/cancel.ts

结论

Cosmoe 项目的同步机制以“KV + 计划任务 + 显式增量”为核心,实现了事件的准实时推送与用户侧的按需查询。通过 KV 的原子写入与命令系统的幂等设计,系统在无复杂分布式锁的情况下保持了基本的一致性。未来可在 KV 层引入更强的一致性保障(如 CAS)与更细粒度的缓存策略,以进一步提升可靠性与性能。

附录

  • KV 接口要点
    • 支持 get/put/list/delete;可设置 TTL 与元数据;适用于轻量级缓存与状态存储。
  • 配置要点
    • 计划任务 cron:每分钟触发;观测性开启;KV 命名空间绑定。

章节来源

  • worker-configuration.d.ts
  • wrangler.jsonc
  • wrangler.jsonc