博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消息中间件系列四、认识AMQP和RabbiyMq的简单使用
阅读量:6683 次
发布时间:2019-06-25

本文共 13931 字,大约阅读时间需要 46 分钟。

前言:这是中间件一个系列的文章之一,有需要的朋友可以看看这个系列的其他文章:

AMQP

AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制 。

一、AMQP的一些技术术语

  • AMQP模型(AMQP Model):一个由关键实体和语义表示的逻辑框架,遵从AMQP规范的服务器必须提供这些 实体和语义。为了实现本规范中定义的语义,客户端可以发送命令来控制AMQP服务器。
  • 连接(Connection):一个网络连接,比如TCP/IP套接字连接。
  • 会话(Session):端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
  • 信道(Channel):多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。虚拟的连接,建立在真实的tcp连接之上的。信道的创建没有限制的。
  • 交换器(Exchange):服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • 消息队列(Message Queue):一个命名实体,用来保存消息直到发送给消费者。
  • 绑定器(Binding):消息队列和交换器之间的关联。
  • 绑定器关键字(Binding Key):绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式。
  • 路由关键字(路由键)(Routing Key):一个消息头,交换器可以用这个消息头决定如何路由某条消息。
  • 持久存储(Durable):一种服务器资源,当服务器重启时,保存的消息数据不会丢失。
  • 临时存储(Transient):一种服务器资源,当服务器重启时,保存的消息数据会丢失。
  • 持久化(Persistent):服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失。
  • 非持久化(Non-Persistent):服务器将消息保存在内存中,当服务器重启时,消息可能丢失。
  • 消费者(Consumer):一个从消息队列中请求消息的客户端应用程序。
  • 生产者(Producer):一个向交换器发布消息的客户端应用程序。
    消息处理过程:

队列通过路由键绑定到交换器,生产者把消息发送到了交换器,交换器根据绑定的路由键将消息路由到特定的队列,订阅了队列的消费者进行接收。

_1

说明(下文会证明这三点)

  • 如果消息达到无人订阅的队列会一直在队列中等待,rabbitmq会默认队列是无限长度的。
  • 如果多个消费者订阅到同一队列,消息会轮询的方式发送给消费者,每个消息只会发送给一个消费者
  • 消息路由到了不存在的队列,消息会忽略,当消息不存在,消息丢失了。

二、创建自定义队列

  生产者和消费者都可以调用declareQueue方法创建队列,当没有目标队列时才会创建,如果已经存在相同的队列了就不在重复创建。

  相关参数:exclusive 队列为应用程序私有,auto-delete 最后一个消费者取消订阅时,队列会自动删除,durable 队列持久化。
  如果消费者订阅了队列,就不能再声明队列了。
先贴上后面会说到的一段代码,演示是怎么创建队列的

Channel channel = connection.createChannel();//信道        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器        String queueName = "direct_queue";        //创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数        channel.queueDeclare(queueName,false,false,false,null);

检测队列是否存在

可以通过调用方法QueueDeclarePassive判断队列是否存在,看方法名是“消极的声明创建”的意思,事实上它没有去声明队列,所谓消极,去看看有没有名为xxx的queue,如果有我就把名字什么的信息告诉你,没有就直接报错,用来确认queue是否存在。

三、四种交换器:

1、direct:

路由键完全匹配时,消息投放到对应队列。Amqp实现都必须有一个direct交换器(默认交换器),名称为空白字符。队列不声明交换器,会自动绑定到默认交换器,队列的名称作为路由键。

补充:一个direct队列可以绑定多个路由键,一条消息可以发给多个direct队列(都有绑定相同的路由键),一个direct队列(有多个路由键)也可以接收不同类型的消息。

_2

2、Fanout:

可以理解为广播,绑定这中交换器的队列,可以接收该交换器上任何类型的消息。

_3

3、Topic:

主题,使来自不同源头的消息到达同一个队列

_4

路由键中的“*”和“#”

“.”会把路由键分为好几个标识符,“*”匹配一个标识符,“#”匹配一个或者多个;

例如:xxx.yyy.zzzz 可以: xxx.*. zzzz , xxx.# , #.zzzz

4、Headers:

匹配消息头,其余与direct一样,实用性不大

举例说明

日志处理场景:

1、有交换器(topic)log_exchange,日志级别有 error,info,warning,应用模块有 user,order,email,路由键的规则是 日志级别+“.”+应用模块名(例如info.user)
2、发送邮件失败,报告一个email的error,basicPublic(message,"log-exchange","error.email")
队列的绑定:queueBind("email-error-queue","log-exchange","error.email")
要监听email所有的日志怎么办?
queueBind("email-log-queue","log-exchange"," *.email")
监听所有模块所有级别日志?
queuebind(“all-log-queue”,"log-exchange","#")
“.”会把路由键分为好几个标识符,“*”匹配一个标识符,“#”匹配一个或者多个(xxx.yyy.zzzz 可以: xxx.*. zzzz , xxx.# , #.zzzz)。

补充:生产者要把消息发送到消费者,两者必须绑定同一个的交换器。

四、虚拟主机

Vhost,真实rabbitmq服务器上的mini型虚拟的mq服务器。有自己的权限机制。Vhost提供了一个逻辑上的分离,可以区分客户端,避免队列和交换器的名称冲突。RabbitMq包含了一个缺省的vhost :“/”,用户名guest,口令 guest(guest用户只能在本机访问)。

五、消息持久化

1、队列是必须持久化

2、交换器也必须是持久化
3、消息的投递模式必须(int型) 2
以上条件全部满足,消息才能持久化
问题:持久化会带来性能的严重下降(下降10倍)

六、Rabbit简要架构介绍

_5

消息队列的使用过程大概如下:

(1)客户端连接到消息队列服务器,打开一个channel(信道)。(生产者和消费者都可操作)
(2)客户端声明一个exchange(交换器),并设置相关属性。(生产者和消费者都可操作)
(3)客户端声明一个queue(队列),并设置相关属性。(一般在消费者中操作)
(4)客户端使用routing key(路由键),在exchange(交换器)和queue(队列)之间建立好绑定关系。(一般在消费者中操作)
(5)客户端投递消息到exchange(交换器)。(一般在生产者中进行)
客户端先给指定交换器(exchange)发送消息,交换器再根据路由键把消息发送给相应的队列,而订阅了相应队列的客户端就能收到消息。

七、使用RabbitMq原生Java客户端进行消息通信

1、添加依赖

客户端Jar包和源码包下载地址:

如果是引入jar包的形式还需要引入slf4j-api-1.6.1.jar。

如果是Maven工程加入:

com.rabbitmq
amqp-client
5.0.0

注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本。

2、生产者通过DIRECT类型的交换器发布消息

AMQP一样要连接工厂、连接,与JMS中间件不同的是,AMQP多了信道、交换器、路由键等这些概念。处理过程看下面代码:

package dongnaoedu.normal;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class DirectProducer {    private final static String EXCHANGE_NAME = "direct_logs";//交换器    public static void main(String[] args) throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        //gust用户只能在本机访问        //非本机访问需要设置以下属性/*        factory.setUsername(..);        factory.setPort();        factory.setVirtualHost();*/        Connection connection = factory.newConnection();//连接        Channel channel = connection.createChannel();//信道        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//定义连接交换器        String[]serverities = {"error","info","warning"};//路由键        for(int i=0;i<3;i++){            String server = serverities[i];            String message = "Hello world_"+(i+1);            //basicProperties是设置一些消息属性的,不需要可以传null            //通过信道把消息、路由键传给交换器,交换器会根据路由键把消息传给相应的队列            channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes());            System.out.println("Sent "+server+":"+message);        }        channel.close();        connection.close();    }}

3、生产者通过FANOUT类型的交换器发布消息

FANOUT和DIRECT在原生代码的实现方式上基本一样,只有在调用channel的exchangeDeclare方法声明交换器时把交换器类型改成FANOUT即可。

package dongnaoedu.normal;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class FanoutProducer {    private final static String EXCHANGE_NAME = "fanout_logs";//交换器    public static void main(String[] args) throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");/*        factory.setUsername(..);        factory.setPort();        factory.setVirtualHost();*/        Connection connection = factory.newConnection();//连接        Channel channel = connection.createChannel();//信道        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器        String[]serverities = {"error","info","warning"};//路由键        for(int i=0;i<3;i++){            String server = serverities[i];            String message = "Hello world_"+(i+1);            channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes());            System.out.println("Sent "+server+":"+message);        }        channel.close();        connection.close();    }}

4、指定路由键的消费者

package dongnaoedu.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConsumerError {    private static final String EXCHANGE_NAME = "direct_logs";    public static void main(String[] argv) throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        Connection connection = factory.newConnection();//连接        Channel channel = connection.createChannel();//信道        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器        //声明随机队列        String queueName = channel.queueDeclare().getQueue();        String server = "error";        //队列和交换器的绑定        channel.queueBind(queueName,EXCHANGE_NAME,server);//把队列按路由键绑定到交换器上        System.out.println("Waiting message.......");        Consumer consumerB = new DefaultConsumer(channel){                //消息的回调监听            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);            }        };        //三个参数:队列名,是否自动确认,消息监听回调        channel.basicConsume(queueName,true,consumerB);//对消息进行消费    }}

补充:这里涉及到rabbit的确认机制,相关内容我会在下一篇博客再做详细介绍。

5、绑定多个路由键的消费者

package dongnaoedu.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConsumerAll {    private static final String EXCHANGE_NAME = "direct_logs";    public static void main(String[] argv) throws IOException,            InterruptedException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        Connection connection = factory.newConnection();//连接        Channel channel = connection.createChannel();//信道        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器        //声明随机队列        String queueName = channel.queueDeclare().getQueue();        String[]serverities = {"error","info","warning"};        for(String server:serverities){            //队列和交换器的绑定            channel.queueBind(queueName,EXCHANGE_NAME,server);//server是路由建,一个队列可以绑定多个路由建        }        System.out.println("Waiting message.......");        Consumer consumerA = new DefaultConsumer(channel){            //消息的回调监听            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);//getRoutingKey拿到路由建            }        };        //三个参数:队列名,是否自动确认,消息监听回调        channel.basicConsume(queueName,true,consumerA);//对消息进行消费    }}

启动消费者之后,两个消费者的打印信息都是一样的,会一直监听消息,当生产者发了消息的时候消费者会调用handleDelivery监听方法。

image

此时有两个消费者,连接,通道,交换器队列的信息如下

image

image
image
image

可以看到产生了两个队列,所以这两个消费者接收消息是互不影响的。

再启动生产者DirectProducer

image
这个简单的生产者执行完了就退出了,
ConsumerError通过路由键的匹配只能收到error类的消息
image
ConsumerAll收到了全部的消息:
image

6、自己创建队列,不同消费者订阅相同的队列

在上面例子的基础上进行改动,

调用通道的queueDeclare方法即可创建指定名字的队列,相关参数在目录二解释过了

package dongnaoedu.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConsumerAll {    private static final String EXCHANGE_NAME = "direct_logs";    public static void main(String[] argv) throws IOException,            InterruptedException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        Connection connection = factory.newConnection();//连接        Channel channel = connection.createChannel();//信道        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器        //声明随机队列//        String queueName = channel.queueDeclare().getQueue();        String queueName = "direct_queue";        //创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数        channel.queueDeclare(queueName,false,false,false,null);        String[]serverities = {"error","info","warning"};        for(String server:serverities){            //队列和交换器的绑定            channel.queueBind(queueName,EXCHANGE_NAME,server);//server是路由建,一个队列可以绑定多个路由建        }        System.out.println("Waiting message.......");        Consumer consumerA = new DefaultConsumer(channel){            //消息的回调监听            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);//getRoutingKey拿到路由建            }        };        //三个参数:队列,自动确认,消息监听回调        channel.basicConsume(queueName,true,consumerA);//对消息进行消费    }}
package dongnaoedu.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConsumerError {    private static final String EXCHANGE_NAME = "direct_logs";    public static void main(String[] argv) throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        Connection connection = factory.newConnection();//连接        Channel channel = connection.createChannel();//信道        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器//        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器        //声明随机队列//        String queueName = channel.queueDeclare().getQueue();        String queueName = "direct_queue"; //       direct_queue这个队列已经在ConsumerAll 绑定了三个路由键,这里不用再绑定了,也不用重复创建了        //参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数//        channel.queueDeclare(queueName,false,false,false,null); //       String server = "error";//        channel.queueBind(queueName,EXCHANGE_NAME,server);//把队列按路由键绑定到交换器上                System.out.println("Waiting message.......");        Consumer consumerB = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body,"UTF-8");                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);            }        };        channel.basicConsume(queueName,true,consumerB);    }}

两个类运行后的队列:

image

可以看到只有一个队列,两个消费者都是订阅这个队列。

生产者DirectProducer不用改变,运行DirectProducer后消费者接收信息的打印结果:

image

image

可以看到消息是轮询的方式发送给消费者,这时再把消费者关掉,可以看到队列已经没有消息了,由于创建队列的时候auto-delete设置为false,这时队列并没有删除。

image

再一次运行生产者,可以看到队列收到了3条新消息,没有被消费者消费。

image

在生产者这边,添加一个没有队列绑定的路由键,发现在direct交换器下队列和消费者和都没有受到任何影响。

String[]serverities = {"error","info","warning","test"};//路由键        for(int i=0;i<4;i++){            String server = serverities[i];            String message = "Hello world_"+(i+1);            //basicProperties是设置一些消息属性的,不需要可以传null            //通过信道把消息、路由键传给交换器,交换器会根据路由键把消息传给相应的队列            channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes());            System.out.println("Sent "+server+":"+message);        }

这证实了上文说到的:

  • 如果多个消费者订阅到同一队列,消息会轮询的方式发送给消费者,每个消息只会发送给一个消费者。
  • 如果消息达到无人订阅的队列会一直在队列中等待,rabbitmq会默认队列是无限长度的。
  • 消息路由到了不存在的队列,消息会忽略,当消息不存在,消息丢失了。

转载地址:http://xxaao.baihongyu.com/

你可能感兴趣的文章
经典SQL语句大全
查看>>
测试LCD1602的显示,显示时间,提示语
查看>>
Linux常用命令
查看>>
SecureCRT 连接Ubuntu乱码解决
查看>>
Zabbix man手册
查看>>
APUE读书笔记-09进程关系(02)
查看>>
来南京工作两个月了!
查看>>
EDM邮件营销的七个基本原则
查看>>
dropna(thresh=n) 的用法
查看>>
Harbor镜像仓库漏洞扫描功能
查看>>
杂谈(20)写给妹妹的信-完整版
查看>>
边界在消失——2014年七大技术趋势预测
查看>>
综合考虑各系统的平衡——中科曙光数据中心产品事业部总经理沈卫东谈云数据中心节能...
查看>>
MongoDB +node.js图片读取服务
查看>>
关于jmeter里的自动重定向的使用-小强性能测试培训班学生作品
查看>>
我的友情链接
查看>>
将CDM中所有以Relatonship_开头的关系全部重命名,避免生成数据库因为重复关系名报错...
查看>>
C++ 标准库之algorithm
查看>>
系统限制和选项limit(一)
查看>>
Boson 6.0 试验笔记一
查看>>