RabbitMQ学习
RabbitMQ 简介
RabbitMQ——Rabbit Message Queue的简写,是一个由erlang开发,基于AMQP(Advanced Message Queue Protocol)协议的开源实现。RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署,适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。是当前最主流的消息中间件之一。
RabbitMQ的官网:http://www.rabbitmq.com
RabbitMQ 投递过程:
- 1.客户端连接到消息队列服务器,打开一个 channel。
- 2.客户端声明一个 exchange,并设置相关属性。
- 3.客户端声明一个 queue,并设置相关属性。
- 4.客户端使用 routing key,在 exchange 和 queue 之间建立好绑定关系。
- 5.客户端Producer投递消息到 exchange。
- 6.客户端Consumer从指定的 queue 中消费信息。
AMQP 简介
AMQP,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,同样,消息使用者也不用知道发送者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 概念
- Exchange : 消息交换机,它指定消息按什么规则,路由到哪个队列
- Queue : 消息队列,每个消息都会被投入到一个或多个队列
- Routing Key : 路由关键字,exchange根据这个关键字进行消息投递。
- Binding Key : 绑定,它的作用就是把 exchange 和 queue 按照路由规则绑定起来
- Vhost : 虚拟主机,可以开设多个 vhost,用作不同用户的权限分离
- Producer : 消息生产者,投递消息的程序,简写 :P
- Consumer : 消息消费者,接受消息的程序,简写 : C
- Broker :消息队列的服务器实体。
- Channel : 消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel 代表一个会话任务
消息发送原理
应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,有点像程序连接数据库,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。
信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。
为什么不通过TCP直接发送命令?对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。
如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。
使用 RabbitMQ
在.NET中使用RabbitMQ需要下载RabbitMQ的客户端程序集.
1
| Install-Package RabbitMQ.Client -Version 5.1.0
|
https://www.nuget.org/packages/RabbitMQ.Client/
工作队列
工作队列(work queues, 又称任务队列Task Queues)的主要思想是为了避免立即执行并等待一些占用大量资源、时间的操作完成。而是把任务(Task)当作消息发送到队列中,稍后处理。一个运行在后台的工作者(worker)进程就会取出任务然后处理。当运行多个工作者(workers)时,任务会在它们之间共享。
这个在网络应用中非常有用,它可以在短暂的HTTP请求中处理一些复杂的任务。在一些实时性要求不太高的地方,我们可以处理完主要操作之后,以消息的方式来处理其他的不紧要的操作,比如写日志等等。
发送/接收消息
分别创建两个控制台项目Send、Receive。
安装 RabbitMQ.Client
注意:消息接收端和发送端的队列名称(queue)必须保持一致
源代码:https://github.com/syxdevcode/RabbitMqDemo
Send逻辑代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| namespace RabbitMqDemo.Send { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { UserName = "admin", Password = "admin", AutomaticRecoveryEnabled = true, Ssl = new SslOption() { CertPath = @"E:\git\RabbitMqDemo\RabbitMqDemo.Send\server.pfx", CertPassphrase = "123123", AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors, Enabled = true }, RequestedHeartbeat = 60, Port = 5673, TopologyRecoveryEnabled = true };
using (var connection = factory.CreateConnection(new string[1] { "192.168.0.115" })) { using (var channel = connection.CreateModel()) { var result = channel.QueueDeclare(queue: "test1", durable: true, exclusive: false, autoDelete: false, arguments: null);
for (int i = 0; i < int.MaxValue; i++) { var body = Encoding.UTF8.GetBytes(i.ToString() + "test"); channel.BasicPublish(exchange: "", routingKey: "test1", basicProperties: null, body: body); Console.WriteLine("{0} 推送成功", i); Thread.Sleep(500); } } } Console.Read(); } } }
|
Receive逻辑代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { UserName = "admin", Password = "admin", AutomaticRecoveryEnabled = true, Ssl = new SslOption() { CertPath = @"E:\git\RabbitMqDemo\RabbitMqDemo.Receive\server.pfx", CertPassphrase = "123123", AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors, Enabled = true }, RequestedHeartbeat = 60, Port = 5673, TopologyRecoveryEnabled = true };
using (var connection = factory.CreateConnection(new string[1] { "192.168.0.115" })) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "test1", durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(500); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "test1", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
|
轮询分发
我们先启动两个接收端,等待消息接收,再启动一个发送端进行消息发送。
从图中可知,发送的信息,被两个消息接收端按顺序循环分配。
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每个消费者将获得相同数量的消息。这种分发消息的方式叫做循环(round-robin)。
消息确认
当处理一个比较耗时得任务的时候,也许想知道消费者(consumers)是否运行到一半就挂掉。在当前的代码中,当RabbitMQ将消息发送给消费者(consumers)之后,马上就会将该消息从队列中移除。此时,如果把处理这个消息的工作者(worker)停掉,正在处理的这条消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。
我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望该消息会重新发送给其他的工作者(worker)。
为了防止消息丢失,RabbitMQ提供了** 消息响应(message acknowledgments)**机制。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ才会释放并删除这条消息。
如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。
消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| namespace RabbitMqDemo.Receive { class Program { static void Main(string[] args) {
ConnectionFactory factory = new ConnectionFactory() { UserName = "admin", Password = "admin", AutomaticRecoveryEnabled = true, Port = 5672, TopologyRecoveryEnabled = true };
using (var connection = factory.CreateConnection(new string[1] { "192.168.0.115" })) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "test1", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(2000); Console.WriteLine(" [x] Done {0}", message);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); };
channel.BasicConsume(queue: "test1", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } }
|
消息持久化
消息确认确保了即使消费端异常,消息也不会丢失能够被重新分发处理。但是如果RabbitMQ服务端异常,消息依然会丢失。除非我们指定durable:true,否则当RabbitMQ退出或奔溃时,消息将依然会丢失。通过指定durable:true,并指定Persistent=true,来告知RabbitMQ将消息持久化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| namespace RabbitMqDemo.Send { class Program { static void Main(string[] args) { ServicePointManager.ServerCertificateValidationCallback += (sender, cert, chain, sslPolicyErrors) => true; ConnectionFactory factory = new ConnectionFactory() { UserName = "admin", Password = "admin", AutomaticRecoveryEnabled = true, Ssl = new SslOption() { CertPath = @"E:\git\RabbitMqDemo\RabbitMqDemo.Send\server.pfx", CertPassphrase = "123123", AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors, Enabled = true }, RequestedHeartbeat = 60, Port = 5673, TopologyRecoveryEnabled = true };
using (var connection = factory.CreateConnection(new string[1] { "192.168.0.115" })) { using (var channel = connection.CreateModel()) { var properties = channel.CreateBasicProperties(); properties.Persistent = true;
var result = channel.QueueDeclare(queue: "test1", durable: true, exclusive: false, autoDelete: false, arguments: null);
for (int i = 0; i <21; i++) { var body = Encoding.UTF8.GetBytes(i.ToString() + "test"); channel.BasicPublish(exchange: "", routingKey: "test1", basicProperties: properties, body: body); Console.WriteLine("{0} 推送成功", i); Thread.Sleep(2000); } } } Console.Read(); } } }
|
需要注意的是,将消息设置为持久化并不能完全保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,但是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要消息队列持久化的强保证,可以使用publisher confirms
公平分发
RabbitMQ的消息分发默认按照消费端的数量,按顺序循环分发。这样仅是确保了消费端被平均分发消息的数量,但却忽略了消费端的闲忙情况。这就可能出现某个消费端一直处理耗时任务处于阻塞状态,某个消费端一直处理一般任务处于空置状态,而只是它们分配的任务数量一样。
但我们可以通过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1
来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时,不再分配任务。
消费者Receive代码:
1 2 3 4
| channel.QueueDeclare(queue: "test1", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
|
这时你需要注意的是如果所有的消费端都处于忙碌状态,你的队列可能会被塞满。你需要注意这一点,要么添加更多的消费端,要么采取其他策略。
几种 Exchange 模式
AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。
RabbitMQ提供了Exchange,它类似于路由器的功能,它用于对消息进行路由,将消息发送到多个队列上。Exchange一方面从生产者接收消息,另一方面将消息推送到队列。但exchange必须知道如何处理接收到的消息,是将其附加到特定队列还是附加到多个队列,还是直接忽略。而这些规则由exchange type定义,exchange的原理如下图所示。
Exchange分类
RabbitMQ的Exchange(交换器)分为四类:
- direct:默认;明确的路由规则:消费端绑定的队列名称必须和消息发布时指定的路由名称一致
- fanout: 消息广播,将消息分发到exchange上绑定的所有队列上
- topic:模式匹配的路由规则:支持通配符
- headers:不常用,允许你匹配AMQP消息的header而非路由键,除此之外headers交换器和direct交换器完全一致,但性能却很差,几乎用不到
注意: fanout、topic交换器是没有历史数据的,也就是说对于中途创建的队列,获取不到之前的消息。
1,direct
路由机制如下图,即队列名称和消息发送时指定的路由完全匹配时,消息才会发送到指定队列上。
routingKey => direct exchange => queue
生产者Send代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| ConnectionFactory factory = new ConnectionFactory() { UserName = "admin", Password = "admin", AutomaticRecoveryEnabled = true, Ssl = new SslOption() { CertPath = @"E:\git\RabbitMqDemo\RabbitMqDemo.Send\server.pfx", CertPassphrase = "123123", AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors, Enabled = true }, RequestedHeartbeat = 60, Port = 5673, TopologyRecoveryEnabled = true };
using (var connection = factory.CreateConnection(new string[1] { "192.168.0.115" })) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var properties = channel.CreateBasicProperties(); properties.Persistent = true;
for (int i = 0; i < 21; i++) { var body = Encoding.UTF8.GetBytes(i.ToString() + "test"); channel.BasicPublish(exchange: "directEC", routingKey: "direct-key", basicProperties: null, body: body); Console.WriteLine("{0} 推送成功", i); } } }
|
消费者Receive代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| ConnectionFactory factory = new ConnectionFactory() { UserName = "admin", Password = "admin", AutomaticRecoveryEnabled = true, Port = 5672, TopologyRecoveryEnabled = true };
using (var connection = factory.CreateConnection(new string[1] { "192.168.0.115" })) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "directEC", routingKey: "direct-key");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); Console.WriteLine(" [x] Done1 {0}", message); };
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine(); } }
|
2,fanout
发布/订阅模式
fanout的路由机制如下图,即发送到 fanout 类型exchange的消息都会分发到所有绑定该exchange的队列上去。
生产者Send代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| using (var connection = factory.CreateConnection(new string[1] { "192.168.0.115" })) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var properties = channel.CreateBasicProperties(); properties.Persistent = true;
for (int i = 0; i < 21; i++) { var body = Encoding.UTF8.GetBytes(i.ToString() + "test");
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: properties, body: body); Console.WriteLine("{0} 推送成功", i); } } }
|
消费者Receive代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| using (var connection = factory.CreateConnection(new string[1] { "192.168.0.115" })) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "fanoutEC", routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); Console.WriteLine(" [x] Done1 {0}", message); };
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine(); } }
|
3,topic
匹配订阅模式
topic是direct的升级版,是一种模式匹配的路由机制。它支持使用两种通配符来进行模式匹配:符号#
和符号*
。其中*
匹配一个单词, #
则表示匹配0个或多个单词,单词之间用.
分割。如下图所示。
生产者Send代码:
1 2 3 4 5 6 7 8
| var queueName = channel.QueueDeclare().QueueName;
channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);
|
消费者Receive代码:
1 2 3 4 5 6
| channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
var queuename = channel.QueueDeclare ().QueueName;
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");
|
RPC
RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
项目:https://github.com/syxdevcode/RabbitMqDemo.git
参考官网:
http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
死信队列
为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
死信,在官网中对应的单词为 Dead Letter
,死信 是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况,该消息将成为死信:
- 消息被否定确认,使用
channel.basicNack
或 channel.basicReject
,并且此时requeue
属性被设置为false
。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
死信消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
配置死信队列
步骤:
- 1,配置业务队列,绑定到业务交换机上
- 2,为业务队列配置死信交换机和路由key
- 3,为死信交换机配置死信队列
注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。
死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。
一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。
死信消息的变化
那么死信被丢到死信队列中后,会发生什么变化呢?
如果队列配置了参数 x-dead-letter-routing-key
的话,死信的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。
举个栗子:
如果原有消息的路由key是testA,被发送到业务Exchage中,然后被投递到业务队列QueueA中,如果该队列没有配置参数x-dead-letter-routing-key
,则该消息成为死信后,将保留原有的路由keytestA,如果配置了该参数,并且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,然后被抛到死信交换机中。
另外,由于被抛到了死信交换机,所以消息的Exchange Name也会被替换为死信交换机的名称。
总结
死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。
总结一下死信消息的生命周期:
- 业务消息被投入业务队列
- 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
- 被nck或reject的消息由RabbitMQ投递到死信交换机中
- 死信交换机将消息投入相应的死信队列
- 死信队列的消费者消费死信消息
死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当你明白了这些之后,这些Exchange和Queue想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。
参考:
RabbitMQ Tutorials
RabbitMQ学习系列(一): 介绍
RabbitMQ学习系列(四): 几种Exchange 模式
RabbitMQ知多少
深入解读RabbitMQ工作原理及简单使用
RabbitMQ交换器Exchange介绍与实践
【RabbitMQ】一文带你搞定RabbitMQ死信队列