温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么使用nsq消息中间件

发布时间:2021-11-16 14:03:31 来源:亿速云 阅读:126 作者:iii 栏目:大数据

本篇内容主要讲解“怎么使用nsq消息中间件”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么使用nsq消息中间件”吧!

组成

nsq是一款轻量级的消息中间件,查看nsq官网给出的解释,可知nsq的组成和分工:


nsqd is the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.

从上面可以看出:

  • nsqd是一个守护进程,负责与客户端打交道,负责缓存来自客户端的消息

  • nsqd可以作为一个单实例独自运行,通常在nsqlookupd实例的协同下组成集群(集群场景下,nsqd能用于发现topics和channels)

  • nsqd监听两个TCP端口,分别用于客户端(默认4150)和HTTP API(默认4151),另外可监听用于HTTPS的端口


nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.

从上面可以看出:

  • nsqlookupd是一个守护进程,负责管理拓扑信息,可供客户端查询得到nsqd节点(nsqd广播topics和channels信息)

  • nsqlookupd提供两种接口,TCP接口(默认4160)被nsqd用来发送广播,HTTP接口(默认4161)被客户端用于发现nsqd和连接nsqadmin

nsqadmin is a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.

从上面可以看出:

  • nsqadmin是一个后台管控Web进程,可实时浏览集群状态,可发起多种管理任务(nsqadmin依赖nsqlookupd来处理用户操作)

安装

这里为了快速搭建,使用docker compose方式安装(docker-compose.yaml见附件)

拷贝docker-compose.yaml到虚拟机,相关命令如下:

怎么使用nsq消息中间件

分别启动 nsqlookupd/nsqadmin/nsqd,对应三个容器和端口映射

浏览器中可打开 http://192.168.1.91:32770 访问 nsqadmin(虚拟机IP为192.168.1.91)

测试

package main

import (
	"bufio"
	"fmt"
	"github.com/bitly/go-nsq"
	"nsq-demo/src/config"
	"os"
)

var producer *nsq.Producer

func InitProducer(addr string) {
    var err error
	producer, err = nsq.NewProducer(addr, nsq.NewConfig())
	if err != nil {
		panic(err)
	}
	fmt.Println("connect to ", producer.String())
}

func Publish(topic, msg string) error {
	if producer == nil {// check producer
		return fmt.Errorf("producer is nil")
	}
	if msg == "" {// void empty msg
		return nil
	}
	return producer.Publish(topic, []byte(msg))// publish msg
}

func main() {
	InitProducer(config.Nsqd01)
	running := true

	reader := bufio.NewReader(os.Stdin)
	for running {
		data, _, _ := reader.ReadLine()
		command := string(data)
		if command == "stop" {
			running = false
		}

		for err := Publish(config.Topic, command); err != nil; err = Publish(config.Topic, command) {
			config.ExchangeNsqdIPs()
			InitProducer(config.Nsqd01)
		}
	}
	producer.Stop()
}

// producer直连nsqd后,接收来自控制台的输入,然后将消息发送给nsqd

package main

import (
	"fmt"
	"github.com/bitly/go-nsq"
	"nsq-demo/src/config"
	"time"
)

type MyConsumer struct{}

func (*MyConsumer) HandleMessage(msg *nsq.Message) error {// implementation Handler interface
	fmt.Println("receive from ", msg.NSQDAddress, "msg:", string(msg.Body))
	return nil
}

func InitConsumer(topic, channel, addr string) {
	conf := nsq.NewConfig()
	conf.LookupdPollInterval = time.Second
	c, err := nsq.NewConsumer(topic, channel, conf)
	if err != nil {
		panic(err)
	}
	c.SetLogger(nil, 0)// set system log
	c.AddHandler(&MyConsumer{})// set Hander to handle msg

	//if err := c.ConnectToNSQLookupd(addr); err != nil {
	//	panic(err)
	//}

	//if err := c.ConnectToNSQDs(config.GetNsqdIPs()); err != nil {
	//	panic(err)
	//}

	if err := c.ConnectToNSQD(config.Nsqd01); err != nil {
		panic(err)
	}
}

func main() {
	InitConsumer(config.Topic, config.Channel, config.Lookupd)
	select {}
}

// consumer直连nsqd后,通过自定义的Handler来处理消息

附录

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160" # for the nsqd
      - "4161" # for the nsqadmin
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 # connect to nsqlookupd
    depends_on:
      - nsqlookupd
    ports:
      - "4150" # for clients
      - "4151" # for the HTTP API
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 # connect to nsqlookupd
    depends_on:
      - nsqlookupd  
    ports:
      - "4171"

# docker-compose.yaml of simple nsq

到此,相信大家对“怎么使用nsq消息中间件”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

nsq
AI