Skip to content

Cloudflare Queues

将 Hono 与 Cloudflare Queues 结合使用。

ts
import { Hono } from 'hono'

type Environment = {
  readonly ERROR_QUEUE: Queue<Error> // 错误队列
  readonly ERROR_BUCKET: R2Bucket // 错误 Bucket
}

const app = new Hono<{
  Bindings: Environment
}>()

app.get('/', (c) => { // GET 请求处理
  if (Math.random() < 0.5) { // 模拟随机成功
    return c.text('Success!') // 返回成功
  }
  throw new Error('Failed!') // 抛出错误
})

app.onError(async (err, c) => { // 错误处理中间件
  await c.env.ERROR_QUEUE.send(err) // 发送错误信息到队列
  return c.text(err.message, { status: 500 }) // 返回错误响应
})

export default {
  fetch: app.fetch, // HTTP 请求处理器
  async queue(batch: MessageBatch<Error>, env: Environment) { // 队列消费者
    let file = '' // 初始化错误日志文件内容
    for (const message of batch.messages) { // 遍历消息批次
      const error = message.body // 获取消息体 (错误对象)
      file += error.stack || error.message || String(error) // 将错误堆栈或消息添加到日志文件
      file += '\r\n' // 添加换行符
    }
    await env.ERROR_BUCKET.put(`errors/${Date.now()}.log`, file) // 将错误日志上传到 R2 Bucket
  },
}
toml
name = "my-worker" // Worker 名称

[[queues.producers]] // 队列生产者配置
  queue = "my-queue" // 队列名称
  binding = "ERROR_QUEUE" // 绑定到环境变量 ERROR_QUEUE

[[queues.consumers]] // 队列消费者配置
  queue = "my-queue" // 队列名称
  max_batch_size = 100 // 最大批处理大小
  max_batch_timeout = 30 // 最大批处理超时时间 (秒)

[[r2_buckets]] // R2 Bucket 配置
  bucket_name = "my-bucket" // Bucket 名称
  binding = "ERROR_BUCKET" // 绑定到环境变量 ERROR_BUCKET

另请参阅

在 MIT 许可证下发布。