本文共 13931 字,大约阅读时间需要 46 分钟。
前言:这是中间件一个系列的文章之一,有需要的朋友可以看看这个系列的其他文章:
AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制 。
队列通过路由键绑定到交换器,生产者把消息发送到了交换器,交换器根据绑定的路由键将消息路由到特定的队列,订阅了队列的消费者进行接收。
生产者和消费者都可以调用declareQueue方法创建队列,当没有目标队列时才会创建,如果已经存在相同的队列了就不在重复创建。
相关参数:exclusive 队列为应用程序私有,auto-delete 最后一个消费者取消订阅时,队列会自动删除,durable 队列持久化。 如果消费者订阅了队列,就不能再声明队列了。先贴上后面会说到的一段代码,演示是怎么创建队列的Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器 String queueName = "direct_queue"; //创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数 channel.queueDeclare(queueName,false,false,false,null);
可以通过调用方法QueueDeclarePassive判断队列是否存在,看方法名是“消极的声明创建”的意思,事实上它没有去声明队列,所谓消极,去看看有没有名为xxx的queue,如果有我就把名字什么的信息告诉你,没有就直接报错,用来确认queue是否存在。
路由键完全匹配时,消息投放到对应队列。Amqp实现都必须有一个direct交换器(默认交换器),名称为空白字符。队列不声明交换器,会自动绑定到默认交换器,队列的名称作为路由键。
补充:一个direct队列可以绑定多个路由键,一条消息可以发给多个direct队列(都有绑定相同的路由键),一个direct队列(有多个路由键)也可以接收不同类型的消息。可以理解为广播,绑定这中交换器的队列,可以接收该交换器上任何类型的消息。
主题,使来自不同源头的消息到达同一个队列
“.”会把路由键分为好几个标识符,“*”匹配一个标识符,“#”匹配一个或者多个;
例如:xxx.yyy.zzzz 可以: xxx.*. zzzz , xxx.# , #.zzzz匹配消息头,其余与direct一样,实用性不大
日志处理场景:
1、有交换器(topic)log_exchange,日志级别有 error,info,warning,应用模块有 user,order,email,路由键的规则是 日志级别+“.”+应用模块名(例如info.user)2、发送邮件失败,报告一个email的error,basicPublic(message,"log-exchange","error.email")队列的绑定:queueBind("email-error-queue","log-exchange","error.email")要监听email所有的日志怎么办?queueBind("email-log-queue","log-exchange"," *.email")监听所有模块所有级别日志?queuebind(“all-log-queue”,"log-exchange","#")“.”会把路由键分为好几个标识符,“*”匹配一个标识符,“#”匹配一个或者多个(xxx.yyy.zzzz 可以: xxx.*. zzzz , xxx.# , #.zzzz)。补充:生产者要把消息发送到消费者,两者必须绑定同一个的交换器。
Vhost,真实rabbitmq服务器上的mini型虚拟的mq服务器。有自己的权限机制。Vhost提供了一个逻辑上的分离,可以区分客户端,避免队列和交换器的名称冲突。RabbitMq包含了一个缺省的vhost :“/”,用户名guest,口令 guest(guest用户只能在本机访问)。
1、队列是必须持久化
2、交换器也必须是持久化3、消息的投递模式必须(int型) 2 以上条件全部满足,消息才能持久化问题:持久化会带来性能的严重下降(下降10倍)消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel(信道)。(生产者和消费者都可操作)(2)客户端声明一个exchange(交换器),并设置相关属性。(生产者和消费者都可操作)(3)客户端声明一个queue(队列),并设置相关属性。(一般在消费者中操作)(4)客户端使用routing key(路由键),在exchange(交换器)和queue(队列)之间建立好绑定关系。(一般在消费者中操作)(5)客户端投递消息到exchange(交换器)。(一般在生产者中进行)客户端先给指定交换器(exchange)发送消息,交换器再根据路由键把消息发送给相应的队列,而订阅了相应队列的客户端就能收到消息。客户端Jar包和源码包下载地址:
如果是引入jar包的形式还需要引入slf4j-api-1.6.1.jar。如果是Maven工程加入:
com.rabbitmq amqp-client 5.0.0
注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本。
AMQP一样要连接工厂、连接,与JMS中间件不同的是,AMQP多了信道、交换器、路由键等这些概念。处理过程看下面代码:
package dongnaoedu.normal;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class DirectProducer { private final static String EXCHANGE_NAME = "direct_logs";//交换器 public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //gust用户只能在本机访问 //非本机访问需要设置以下属性/* factory.setUsername(..); factory.setPort(); factory.setVirtualHost();*/ Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//定义连接交换器 String[]serverities = {"error","info","warning"};//路由键 for(int i=0;i<3;i++){ String server = serverities[i]; String message = "Hello world_"+(i+1); //basicProperties是设置一些消息属性的,不需要可以传null //通过信道把消息、路由键传给交换器,交换器会根据路由键把消息传给相应的队列 channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes()); System.out.println("Sent "+server+":"+message); } channel.close(); connection.close(); }}
FANOUT和DIRECT在原生代码的实现方式上基本一样,只有在调用channel的exchangeDeclare方法声明交换器时把交换器类型改成FANOUT即可。
package dongnaoedu.normal;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class FanoutProducer { private final static String EXCHANGE_NAME = "fanout_logs";//交换器 public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1");/* factory.setUsername(..); factory.setPort(); factory.setVirtualHost();*/ Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器 String[]serverities = {"error","info","warning"};//路由键 for(int i=0;i<3;i++){ String server = serverities[i]; String message = "Hello world_"+(i+1); channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes()); System.out.println("Sent "+server+":"+message); } channel.close(); connection.close(); }}
package dongnaoedu.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConsumerError { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器 //声明随机队列 String queueName = channel.queueDeclare().getQueue(); String server = "error"; //队列和交换器的绑定 channel.queueBind(queueName,EXCHANGE_NAME,server);//把队列按路由键绑定到交换器上 System.out.println("Waiting message......."); Consumer consumerB = new DefaultConsumer(channel){ //消息的回调监听 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; //三个参数:队列名,是否自动确认,消息监听回调 channel.basicConsume(queueName,true,consumerB);//对消息进行消费 }}
补充:这里涉及到rabbit的确认机制,相关内容我会在下一篇博客再做详细介绍。
package dongnaoedu.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConsumerAll { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器 //声明随机队列 String queueName = channel.queueDeclare().getQueue(); String[]serverities = {"error","info","warning"}; for(String server:serverities){ //队列和交换器的绑定 channel.queueBind(queueName,EXCHANGE_NAME,server);//server是路由建,一个队列可以绑定多个路由建 } System.out.println("Waiting message......."); Consumer consumerA = new DefaultConsumer(channel){ //消息的回调监听 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);//getRoutingKey拿到路由建 } }; //三个参数:队列名,是否自动确认,消息监听回调 channel.basicConsume(queueName,true,consumerA);//对消息进行消费 }}
启动消费者之后,两个消费者的打印信息都是一样的,会一直监听消息,当生产者发了消息的时候消费者会调用handleDelivery监听方法。
此时有两个消费者,连接,通道,交换器队列的信息如下
可以看到产生了两个队列,所以这两个消费者接收消息是互不影响的。
再启动生产者DirectProducer
这个简单的生产者执行完了就退出了,ConsumerError通过路由键的匹配只能收到error类的消息ConsumerAll收到了全部的消息:在上面例子的基础上进行改动,
调用通道的queueDeclare方法即可创建指定名字的队列,相关参数在目录二解释过了package dongnaoedu.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConsumerAll { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器 //声明随机队列// String queueName = channel.queueDeclare().getQueue(); String queueName = "direct_queue"; //创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数 channel.queueDeclare(queueName,false,false,false,null); String[]serverities = {"error","info","warning"}; for(String server:serverities){ //队列和交换器的绑定 channel.queueBind(queueName,EXCHANGE_NAME,server);//server是路由建,一个队列可以绑定多个路由建 } System.out.println("Waiting message......."); Consumer consumerA = new DefaultConsumer(channel){ //消息的回调监听 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);//getRoutingKey拿到路由建 } }; //三个参数:队列,自动确认,消息监听回调 channel.basicConsume(queueName,true,consumerA);//对消息进行消费 }}
package dongnaoedu.normal;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConsumerError { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器// channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器 //声明随机队列// String queueName = channel.queueDeclare().getQueue(); String queueName = "direct_queue"; // direct_queue这个队列已经在ConsumerAll 绑定了三个路由键,这里不用再绑定了,也不用重复创建了 //参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数// channel.queueDeclare(queueName,false,false,false,null); // String server = "error";// channel.queueBind(queueName,EXCHANGE_NAME,server);//把队列按路由键绑定到交换器上 System.out.println("Waiting message......."); Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); } }; channel.basicConsume(queueName,true,consumerB); }}
两个类运行后的队列:
可以看到只有一个队列,两个消费者都是订阅这个队列。
生产者DirectProducer不用改变,运行DirectProducer后消费者接收信息的打印结果:
可以看到消息是轮询的方式发送给消费者,这时再把消费者关掉,可以看到队列已经没有消息了,由于创建队列的时候auto-delete设置为false,这时队列并没有删除。
再一次运行生产者,可以看到队列收到了3条新消息,没有被消费者消费。
在生产者这边,添加一个没有队列绑定的路由键,发现在direct交换器下队列和消费者和都没有受到任何影响。String[]serverities = {"error","info","warning","test"};//路由键 for(int i=0;i<4;i++){ String server = serverities[i]; String message = "Hello world_"+(i+1); //basicProperties是设置一些消息属性的,不需要可以传null //通过信道把消息、路由键传给交换器,交换器会根据路由键把消息传给相应的队列 channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes()); System.out.println("Sent "+server+":"+message); }
这证实了上文说到的:
转载地址:http://xxaao.baihongyu.com/