生产者发送消息去往的地方,生产者只需要知道exchange名字和routingkey既可以发送消息,剩下的由exchange进行转发
接收消息的地方,消费者读取消息的地方,消费者连接到这个队列进行消息的读取。
虚拟主机,一个虚拟主机可以有多个exchange和queue,他们是隔离独立的
rabbitmq目录里包含两种类型的信息
1:元信息metadata,schema,topology,主要是保存exchange,queue,user,vhost等基本信息
2:消息数据信息:停留在队列里未被消费的消息
配置导入和导出,可以将A服务器的Exchange和Queue导入到B服务器
1:开启rabbitmq-management插件
2:获取rabbitmqadmin命令工具:通过wget http://localhost:15672/cli/rabbitmqadmin
3:安装python
4:python rabbitmqadmin export myrabbit.config -H localhost -P 15672 -u myuser -p mypass
5:配置导入:python rabbitmqadmin import myrabbit.config -H locahost -P 15672 -u myuser -p mypass
6:使用管理端口,不是使用5672端口
每个集群中的节点都是通过传输层连接的。所有节点对之间定期交换tick消息来维护连接来检测连接断开。
集群搭建:
1:安装erlang:这个包安装后足够支持rabbitmq-server运行
编辑/etc/yum.repos.d/rabbitmq-erlang.repo
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/21/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1
yum install -y erlang
2:安装rabbitmq-server:
rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm
yum install -y rabbitmq-server-3.7.7-1.el7.noarch.rpm
3:启动服务器:
chkconfig rabbitmq-server on
或者systemctl enable rabbitmq-server
systemctl start rabbitmq-server
查看状态:rabbitmqctl status
4:启动后会在/var/lib/rabbitmq下生成一个.erlang.cookie,需要ls -al查看
复制这个文件到其他节点上,需要修改文件的所有者,否则启动时读取权限出错
chown rabbitmq:rabbitmq .erlang.cookie
5:各个节点分别修改/etc/hosts和/etc/hostname
6:重启后关闭主节点外的各个节点:rabbitmqctl stop_app
7:其他节点加入集群:rabbitmqctl join_cluster rabbit@rabbitmq1
8:启动各个节点:rabbitmqctl start_app
9:查看集群信息:rabbitmqctl cluster_status
节点停止后可以直接重启,集群会自动加载
原理就是创建多一个镜像队列,默认,rabbitmq里的队列的内容位于队列声明所在的单个节点上。exchange和binding被认为在所有节点上的。当给队列做镜像后,相当于两个节点有一个队列的信息,默认是master节点起效果,当master崩溃,slave会自动提升为master,然后处理这个队列的消息。
队列镜像:
1:使用rabbitmqctl set_policy来设置,或者使用UI管理界面来设置
2:开启UI管理界面:rabbitmq-plugins enable rabbitmq_management
3:重启服务,然后访问ip:15672,用户名密码都是guest
4:在amind下面,右侧policy,添加一个policy,或者使用命令:
rabbitmqctl set_policy mypolicy “myReg”
以direct为例
1:引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
2:生产者:
ConnectionFactory factory = new ConnectionFactory();//创建连接工厂
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); //打开连接
Channel channel = connection.createChannel()) {
channel.queueDeclare(myQueueName, false, false, false, null); //绑定一个队列
String message = "Hello World!";
channel.basicPublish("", myQueueName, null, message.getBytes()); 发送消息到队列里
}
3:消费者:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(myQueueName, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。