文档章节
-
简介
-
消息与消息队列
-
性能对比
-
了解什么是AMQP协议?
-
AMQP的协议模型
-
-
AMQP核心概念
-
AMQP消息路由
-
Exchange类型
-
-
RabbitMQ特点
-
RabbitMQ 中的概念模型
-
消息模型
-
RabbitMQ 基本概念
-
-
RabbitMQ的作用和使用场景
-
RabbitMQ的核心组件
-
Hello RabbitMQ World
-
pom依赖
-
RabbitMQ web客户端
-
简介
目前,主流的消息中间件主要有:ActiveMQ、Kafka、RabbitMQ、RocketMQ等等......,而我们今天的主角是:RabbitMQ,RabbitMQ是一个开元基于 erlang 语言开发具有高可用高并发的优点,适合集群消息代理和队列服务器,它是基于AMQP协议来实现的,AMQP的和主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全,RabbitMQ支持多种语言,有消息确认机制和持久化机制,保证数据不丢失的前提做到可靠性、可用性。
消息与消息队列
性能对比
什么是AMQP协议?
AMQP的协议模型
-
生产者只需要将消息投递到Exchange交换机中,不需要关注消息被投递到哪个队列。
-
消费者只需要监听队列来消费消息,不需要关注消息来自于哪个Exchange。
-
Exchange和Message Queue存在着绑定的关系,一个Exchage可以绑定多个消息队列。
AMQP核心概念
-
Server:又称Broker,接受客户端的连接,实现AMQP实体服务;
-
Connection:连接,应用于程序与Broker的网络连接;
-
Channel:网络通道,几乎所有的操作都是在channel中进行的,channel是进行消息读写的通道,客户可以建立多个channel,每个channel代表一个会话任务;
-
Message:消息,服务器与应用程序之间传送的数据,有Properties和Body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body是消息体内容;
-
Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtrual host里面可以有若干个Exchange和Queue,用一个Virtrual host里面不能有相同名字的Exchange或Queue;
-
Exchange:交换机,接收消息,很久路由键转发消息到绑定的队列;
-
Routingkey:生产者架构消息发给交换器的时候,会指定一个RoutingKey,用来置顶这个消息的路由规则,通过RoutingKey来决定消息流向哪里;
-
Binding:绑定,RabbitMQ中通过绑定将交换器跟队列关联起来,在绑定的时候会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确的将消息路由到对应的队列中去了,也就是生产者将信息发送给交换器时,需要一个RoutingKey,当RoutingKey与BindingKey完全匹配时,消息会被路由到对应的队列中去;
-
Queue:也成Message Queue,消息队列,保存消息并将他们转发给消费者;
AMQP消息路由
Exchange类型
-
在发送消息的时候设置路由键为“info”或者“debug”,消息只会路由到Queue2,如果以其他路邮件发送消息,则消息不会路由到这两个队里中,这就是路由键和Binding key的完全匹配。
-
RoutingKey为一个点号 ".",分隔的字符串(被点号 "."号分隔开的一段独立的字符串称为单词),比如:
"com.rabbit.client"、"java.util.Map";
-
BindingKey和RoutingKey为一个点号 ".",分隔的字符串;
-
BindingKey中可以存在两种特殊的字符串 "*"和 "#",用于做模糊匹配,其中"*"用于匹配一个单词, "#" 用于匹配多规则单词;
-
"com.#" 可以匹配到 com.rabbitmq.aaa
-
"com.*" 可以匹配到 com.rabbitmq
-
-
路由键为 "com.rabbitmq.client" 的消息会同时路由到 Queue1 和 Queue2
-
路由键为 "com.hidden.client" 的消息会只会路由到 Queue2
-
路由键为 "com.hidden.data" 的消息会只会路由到 Queue2
-
路由键为 "java.rabbitmq.data" 的消息会只会路由到 Queue1
-
路由键为 "java.util.map" 的消息将会被丢弃或者返回给生产者,因为它没有匹配任何的路由键。
RabbitMQ特点
-
可靠性 RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
-
灵活的路由(Flexible Routing) 在消息进入队列之前,通过Exchange来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的Exchange 。
-
消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
-
高可用 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
-
多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
-
多语言客户端 RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
-
管理界面 RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
-
跟踪机制 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
-
插件机制 RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。。
RabbitMQ 中的概念模型
消息模型
RabbitMQ 基本概念
RabbitMQ的作用和使用场景
RabbitMQ的核心组件
Hello RabbitMQ World
-
生产意味着发送,发送消息的程序是生产者
-
队列就是RabbitMQ内部的邮箱名称,消息是存储在队列中的,尽管消息流经RabbitMQ和你的应用程序,生产者可以发送一个队列信息,许多消费者可以尝试从一个队列里接收数据
-
消费与接受者是同一个身份,一个消费者是一个程序,主要是等待接收信息
pom依赖
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
/**
* @Author 林必昭
* @Date 2019/11/29 22:12
* @descr
*/
public class MassageProducer {
private final static String QUEUE_NAME = "myQueue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ所在主机的ip或者主机名
factory.setHost("localhost");
//设置端口号
factory.setPort(5672);
Connection conn = factory.newConnection();
//创建一个通道
Channel channel = conn.createChannel();
//指定一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义要发送的消息
String message = "Hello RabbitMQ World!";
//往队列里发送message消息
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [生产者] 发送消息:'" + message + "'");
}
}
/**
* @Author 林必昭
* @Date 2019/11/29 22:12
* @descr
*/
public class MessageConsumer {
private final static String QUEUE_NAME = "myQueue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("消费者正在等待消息,退出请按CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [消费者] 接收到了: '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
-
注意的是:这里必须下载安装Erlang安装包,因为RabbitMQ是基于Erlang进行开发的。
RabbitMQ web客户端