加入收藏 | 设为首页 | 会员中心 | 我要投稿 莱芜站长网 (https://www.0634zz.com/)- 云连接、建站、智能边缘云、设备管理、大数据!
当前位置: 首页 > 编程开发 > asp.Net > 正文

.Net Core和RabbitMQ限制循环消费的方法

发布时间:2023-02-17 10:10:44 所属栏目:asp.Net 来源:互联网
导读:当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。 循环场景 生产者发送100条消息到RabbitMQ中,消费者设定读取到第50条消息时,设置拒收,同时设定是否还留存在当前队列中(当r
  当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。
  循环场景
  生产者发送100条消息到RabbitMQ中,消费者设定读取到第50条消息时,设置拒收,同时设定是否还留存在当前队列中(当requeue为false时,设置了死信队列则进入死信队列,否则移除消息)。

  consumer.Received += (model, ea) =>
  {
      var message = ea.Body;
      Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
  
      if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
      {
          Console.WriteLine("拒收");
          ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
          return;
      }
  
      ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
  };
  当第50条消息拒收,则仍在队列中且处在队列头部,重新推送给消费者,再次拒收,再次推送,反反复复。
 
  最终其他消息全部消费完毕,仅剩第50条消息往复间不断消费,拒收,消费,这将可能导致RabbitMQ出现内存泄漏问题。
 

  解决方案
  RabbitMQ及AMQP协议本身没有提供这类重试功能,但可以利用一些已有的功能来间接实现重试限定(以下只考虑基于手动确认模式情况)。此处只想到或是只查到了如下几种方案解决消息循环消费问题。
 
  一次消费
  无论成功与否,消费者都对外返回ack,将拒收原因或是异常信息catch存入本地或是新队列中另作重试。
  消费者拒绝消息或是出现异常,返回Nack或Reject,消息进入死信队列或丢弃(requeue设定为false)。
  限定重试次数
  在消息的头中添加重试次数,并将消息重新发送出去,再每次重新消费时从头中判断重试次数,递增或递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。
  可以在Redis、Memcache或其他存储中存储消息唯一键(例如Guid、雪花Id等,但必须在发布消息时手动设置它),甚至在mysql中连同重试次数一起存储,然后在每次重新消费时递增/递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。
  队列使用Quorum类型,限制投递次数,超过次数消息被删除。
  队列消息过期
  设置过期时间,给队列或是消息设置TTL,重试一定次数消息达到过期时间后进入死信队列或丢弃(requeue设定为true)。
  也许还有更多好的方案...
 
  一次消费
  对外总是Ack
 
  消息到达了消费端,可因某些原因消费失败了,对外可以发送Ack,而在内部走额外的方式去执行补偿操作,比如将消息转发到内部的RabbitMQ或是其他处理方式,终归是只消费一次。

  var queueName = "alwaysack_queue";
  channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
  channel.BasicQos(0, 5, false);
  
  var consumer = new EventingBasicConsumer(channel);
  consumer.Received += (model, ea) =>
  {
      try
      {
          var message = ea.Body;
          Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
  
          if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
          {
              throw new Exception("模拟异常");
          }
      }
      catch (Exception ex)
      {
          Console.WriteLine(ex.Message);
      }
      finally
      {
          ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
      }
  };
  
  channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
  当消费端收到消息,处理时出现异常,可以另想办法去处理,而对外保持着ack的返回,以避免消息的循环消费。
 
 
 
 
  消息不重入队列
  在消费者端,因异常或是拒收消息时,对requeue设置为false时,如果设置了死信队列,则符合“消息被拒绝且不重入队列”这一进入死信队列的情况,从而避免消息反复重试。如未设置死信队列,则消息被丢失。
 
 
 
  此处假定接收100条消息,在接收到第50条消息时设置拒收,并且设置了requeue为false。

  var dlxExchangeName = "dlx_exchange";
  channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
  var dlxQueueName = "dlx_queue";
  channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
  channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");
  
  var queueName = "nackorreject_queue";
  var arguments = new Dictionary<string, object>
  {
      { "x-dead-letter-exchange", dlxExchangeName }
  };
  channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
  channel.BasicQos(0, 5, false);
  
  var consumer = new EventingBasicConsumer(channel);
  consumer.Received += (model, ea) =>
  {
      var message = ea.Body;
      Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
  
      if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
      {
          Console.WriteLine("拒收");
          ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//关键在于requeue=false
          return;
      }
  
      ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
  };
  
  channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

(编辑:莱芜站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读