最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

.NetCore利用BlockingCollection实现简易消息队列

来源:动视网 责编:小采 时间:2020-11-27 22:34:50
文档

.NetCore利用BlockingCollection实现简易消息队列

.NetCore利用BlockingCollection实现简易消息队列:消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。 我们用BlockingCollection来实现简单的消息队列。 BlockingCollection实现了生产者/消费者模式,是对IProducerConsumerCollection<T>接口的实现。与其他Concu
推荐度:
导读.NetCore利用BlockingCollection实现简易消息队列:消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。 我们用BlockingCollection来实现简单的消息队列。 BlockingCollection实现了生产者/消费者模式,是对IProducerConsumerCollection<T>接口的实现。与其他Concu


消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。

我们用BlockingCollection来实现简单的消息队列。

BlockingCollection实现了生产者/消费者模式,是对IProducerConsumerCollection<T>接口的实现。与其他Concurrent集合一样,每次Add或Take元素,都会导致对集合的lock。只有当确定需要在内存中创建一个生产者,消费者模式时,再考虑这个类。

MSDN中的示例用法:

using (BlockingCollection<int> bc = new BlockingCollection<int>())
 {
 Task.Factory.StartNew(() =>
 {
 for (int i = 0; i < 1000; i++)
 {
 bc.Add(i);
 Thread.Sleep(50); 
 }
 
 
 // Need to do this to keep foreach below from hanging
 bc.CompleteAdding();
 });
 
 
 // Now consume the blocking collection with foreach.
 // Use bc.GetConsumingEnumerable() instead of just bc because the
 // former will block waiting for completion and the latter will
 // simply take a snapshot of the current state of the underlying collection.
 foreach (var item in bc.GetConsumingEnumerable())
 {
 Console.WriteLine(item);
 }
 }

实现消息队列

用Vs2017创建一个控制台应用程序。创建DemoQueueBlock类,封装一些常用判断。

  • HasEle,判断是否有元素
  • Add向队列中添加元素
  • Take从队列中取出元素
  • 为了不把BlockingCollection直接暴漏给使用者,我们封装一个DemoQueueBlock类

     /// <summary>
     /// BlockingCollection演示消息队列
     /// </summary>
     /// <typeparam name="T"></typeparam>
     public class DemoQueueBlock<T> where T : class
     {
     private static BlockingCollection<T> Colls;
     public DemoQueueBlock()
     {
    
     }
     public static bool IsComleted() {
     if (Colls != null && Colls.IsCompleted) {
     return true;
     }
     return false;
     }
     public static bool HasEle()
     {
     if (Colls != null && Colls.Count>0)
     {
     return true;
     }
     return false;
     }
     
     public static bool Add(T msg)
     {
     if (Colls == null)
     {
     Colls = new BlockingCollection<T>();
     }
     Colls.Add(msg);
     return true;
     }
     public static T Take()
     {
     if (Colls == null)
     {
     Colls = new BlockingCollection<T>();
     }
     return Colls.Take();
     }
     }
    
     /// <summary>
     /// 消息体
     /// </summary>
     public class DemoMessage
     {
     public string BusinessType { get; set; }
     public string BusinessId { get; set; }
     public string Body { get; set; }
     }
    
    

    添加元素进队列

    通过控制台,添加元素

     //添加元素
     while (true)
     {
     Console.WriteLine("请输入队列");
     var read = Console.ReadLine();
     if (read == "exit")
     {
     return;
     }
    
     DemoQueueBlock<DemoMessage>.Add(new DemoMessage() { BusinessId = read });
     }
    

    消费队列

    通过判断IsComleted,来确定是否获取队列

     Task.Factory.StartNew(() =>
     {
     //从队列中取元素。
     while (!DemoQueueBlock<DemoMessage>.IsComleted())
     {
     try
     {
     var m = DemoQueueBlock<DemoMessage>.Take();
     Console.WriteLine("已消费:" + m.BusinessId);
     }
     catch (Exception ex)
     {
     Console.WriteLine(ex.Message);
     }
     }
     });
    

    查看运行结果

    运行结果

    这样我们就实现了简易的消息队列。

    示例源码:简易队列

    文档

    .NetCore利用BlockingCollection实现简易消息队列

    .NetCore利用BlockingCollection实现简易消息队列:消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。 我们用BlockingCollection来实现简单的消息队列。 BlockingCollection实现了生产者/消费者模式,是对IProducerConsumerCollection<T>接口的实现。与其他Concu
    推荐度:
    • 热门焦点

    最新推荐

    猜你喜欢

    热门推荐

    专题
    Top