在Golang中使用RabbitMQ实现任务分发与负载均衡的策略可以通过以下步骤实现:
安装RabbitMQ: 根据你的操作系统,在RabbitMQ官网上下载并安装RabbitMQ。
创建生产者和消费者: 在Golang中,使用RabbitMQ的AMQP库可以创建生产者和消费者。生产者负责将任务放入队列中,消费者则从队列中取出任务并执行。
// 生产者
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ服务器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 创建一个channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
// 声明一个队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否独占连接
false, // 是否阻塞
nil, // 额外的属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 发布消息到队列中
body := "Hello RabbitMQ!"
err = ch.Publish(
"", // 交换器
q.Name, // 路由键
false, // 强制性
false, // 立即发送
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久化消息
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("Failed to publish a message: %v", err)
}
log.Printf("Sent message: %s", body)
}
// 消费者
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ服务器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 创建一个channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
// 声明一个队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否独占连接
false, // 是否阻塞
nil, // 额外的属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 设置每次从队列中获取的消息数量
err = ch.Qos(
1, // 每次获取的数量
0, // 预取数量
false, // 是否全局
)
if err != nil {
log.Fatalf("Failed to set QoS: %v", err)
}
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列名
"", // 消费者标识
false, // 自动回复
false, // 独占连接
false, // 不阻塞
false, // 额外的属性
nil, // 可选项
)
if err != nil {
log.Fatalf("Failed to consume messages: %v", err)
}
forever := make(chan bool)
// 处理并执行任务
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模拟任务执行,这里可以替换为实际的任务处理逻辑
doWork(d.Body)
log.Printf