这篇文章主要介绍“最简消息队列的实现方法”,在日常操作中,相信很多人在最简消息队列的实现方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”最简消息队列的实现方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
结合其他 mq
的使用经历,基本的使用流程:
创建 producer
或 consumer
启动 mq
生产消息/消费消息
对应到 queue
中,大致也是这个:
// 生产者创建工厂 producer := newMockedProducer() // 消费者创建工厂 consumer := newMockedConsumer() // 将生产者以及消费者的创建工厂函数传递给 NewQueue() q := queue.NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil })
我们看看 NewQueue
需要什么构建条件:
producer constructor
consumer constructor
将双方的工厂函数传递给 queue
,由它去执行以及重试。
这两个需要的目的是将生产者/消费者的构建和消息生产/消费都封装在 mq
中,而且将生产者/消费者的整套逻辑交给开发者处理:
type ( // 开发者需要实现此接口 Producer interface { AddListener(listener ProduceListener) Produce() (string, bool) } ... // ProducerFactory定义了生成Producer的方法 ProducerFactory func() (Producer, error) )
其实也就是将生产者的逻辑交个开发者自己完成,mq
只负责生产者/消费者的消息传递和之间的调度。
工厂方法的设计,是将生产者本身和生产消息,这两个任务都交给 queue
自己来做调度或者重试。
生产消息当然要回到生产者本身:
type mockedProducer struct { total int32 count int32 // 使用waitgroup来模拟任务的完成 wait sync.WaitGroup } // 实现 Producer interface 的方法:Produce() func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false }
queue
中的生产者编写都必须实现:
Produce()
:由开发者编写生产消息的逻辑
AddListener()
:生产者
和生产者类似:
type mockedConsumer struct { count int32 } func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil }
启动,然后验证我们上述的生产者和消费者之间的数据是否传输成功:
func TestQueue(t *testing.T) { producer := newMockedProducer(rounds) consumer := newMockedConsumer() // 创建 queue q := NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil }) // 当生产者生产完毕,执行 Stop() 关闭生产端生产 go func() { producer.wait.Wait() // mq生产端停止生产,不是mq本身 Stop 运行 q.Stop() }() // 启动 q.Start() // 验证生产消费端是否消息消费完成 assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) }
以上就是 queue
最简易的入门使用代码。开发者可以根据自己的业务实际情况:自由定义生产者/消费者已经生产/消费逻辑。
![image-20210506224102836](/Users/dyhxl/Library/Application Support/typora-user-images/image-20210506224102836.png)
整体流程如上图:
全体的通信都由 channel
进行
通过加入监听器 listener
,以及事件触发 event
,相当于将触发器逻辑分离出来
生产者有 produceone
,这个是生产消息的逻辑,但是其中的 Produce()
是由开发者编写【上面的 interface
中正是这个函数】
同理消费者,Consume()
基本的消息流动就入上图以及上述描写的,具体的代码分析我们就留到下一篇,我们????分析里面,尤其是如何控制 channel
是整个设计的核心。
到此,关于“最简消息队列的实现方法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。