golang gin 监听rabbitmq队列无限消费
连接rabbitmq
package database import ( "github.com/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channel func InitRabbitmq() { var err error RabbitConn, err = amqp.Dial(config.Config.RabbitUrl) if err != nil { log.Println("连接RabbitMQ失败") panic(err) } RabbitChannel, err = RabbitConn.Channel() if err != nil { log.Println("获取RabbitMQ channel失败") panic(err) } } // 0表示channel未关闭,1表示channel已关闭 func CheckRabbitClosed(ch amqp.Channel) int6python4 { d := reflect.ValueOf(ch) i := d.FieldByName("closed").Int() return i }
创建生产者
package service import ( "encoding/json" "github.com/streadway/amqp" "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Producer() { // 声明队列,没有则创建 // 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列) declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil) if err != nil { log.Printf("声明队列 %v 失败, error: %v", config.Config.HawkSaveQueueName, err) panic(err) } request := model.Request{} marshal, _ := json.Marshal(request ) // exchange、routing key、mandatory、immediate err = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(marshal), }) if err != nil { log.Printf("生产者发送消息失败, error: %v", err) } else { log.Println("生产者发送消息成功") } }
创建消费者
package service import ( "encoding/json" "log" "os" "strings" "sync" "time" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Consumer() { // 声明队列,没有则创建 // 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列) _, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil) if err != nil { log.Printf("声明队列 %v 失败, error: %v", config.Config.QueueName, err) panic(err) } // 队列名称、candroidonsumer、auto-ack、是否独享 // deliveries是一个管道,有消息到队列,就会消费,消费者的消息只需要从deliveries这个管道获取 deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", true, false, false, false, nil) if err != nil { log.Printf("从队列 %v 获取数据失败, error: %v", config.Config.QueueName, err) } else { log.Println("从消费队列编程获取任务成功") } // 阻塞住 for { select { case message := <-deliveries: closed := database.CheckRabbitClosed(*database.RabbitChannel) if closed == 1 { // channel 已关闭,重连一下 database.InitRabbitmq() } else { msgData := string(message.Body) request := model.Request{} err := json.Unmarshal([]byte(msgData), &request) if err != nil { log.Printf("解析rabbitmq数据 %v 失败, error: %v", msgData, err) } else { // TODO... 编程客栈// 处理逻辑 开发者_Python入门 } } } } }
main方法协程调用
package main import ( "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/router" "yy-data-processing/service" ) func main() { // 初始化路由 routers := router.InitRouters() // 初始化RabbitMQ database.InitRabbitmq() go service.Producer() go service.Consumer() port := config.Config.Port if err := routers.Run(":" + port); err != nil { log.Printf("启动服务失败: ",QJKEZeolvE err) } }
到此这篇关于golang gin 监听rabbitmq队列无限消费的文章就介绍到这了,更多相关golang监听rabbitmq内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!
精彩评论