前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解阻塞队列

深入理解阻塞队列

作者头像
Ryan_OVO
发布2023-10-18 20:22:41
1940
发布2023-10-18 20:22:41
举报
文章被收录于专栏:程序随笔程序随笔

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

先放张图:

根据前面的描述, 我们来考虑下阻塞队列在程序中会出现的问题: 阻塞队列 需要实现两个功能: 使线程等待与唤醒线程. 具体介绍如下: 在极端条件下, 需要挂起线程, 等待队列满足条件后,再去执行添加或提取 操作 待队列满足了条件之后, 通知线程去继续其挂起之前的操作.... 涉及到的技术: 线程同步 与 线程间通信 可能产生死锁的分析: 在某个时刻,队列为空或者是已满, 此时生产者未能存入数据或者还在存入数据到队列中, 这就会产生使得队列出错 如果此时, 消费者对队列在进行操作就会产生死锁...由于之前的生产者的操作使得队列出了问题并没有释放锁, 此时就会造成死锁 这是从预防死锁的角度来解决死锁问题 首先就是同步资源-队列的锁定,既然有锁那么就要考虑死锁问题,最后就是线程间的通信。

也就是说,实现阻塞队列需要考虑这三个点。

查了下资料,大多都是java的封装好的类库,不过没事,反正思想,理论都是一样的,不同的就是实现不同。但还是有个不错的C#实现----<< http://www.cnblogs.com/samgk/p/4772806.html C# 实现生产者消费者队列 >>。该文其实也道出了阻塞队列在除去生产者-消费者模型外的应用,昨天查资料的时候,阿里程序员写了篇文章关于邮件接收下载的,就是使用阻塞队列,但是我忘了原文在哪了。当时看的时候,想起来当初看<<C#高级编程>>第十章的管道。书上介绍的是:开一个task去读取文件名,放到阻塞队列中,然后开一个队列根据文件名读取内容,这个应用于邮件接收下载是一样的。暂时先不说这个了,有兴趣的可以自己去看看那本书。 那么我们如何自己实现阻塞队列呢?正如上面说到的考虑点,同步,线程通信,防止死锁。看看代码:

代码语言:javascript
复制
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace SuiBao.Utility
{
    ///阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
    
    //阻塞队列 需要实现两个功能: 使线程等待与唤醒线程. 具体介绍如下:
    // 在极端条件下, 需要挂起线程, 等待队列满足条件后,再去执行添加或提取 操作
    // 待队列满足了条件之后, 通知线程去继续其挂起之前的操作....
    //涉及到的技术:
    //线程同步(此实例用到了lock) 与 线程间通信(此示例用到了event)
    //
    
    // 可能产生死锁的分析:
    // 在某个时刻,队列为空或者是已满, 此时生产者未能存入数据或者还在存入数据到队列中, 这就会产生使得队列出错
    // 如果此时, 消费者对队列在进行操作就会产生死锁...由于之前的生产者的操作使得队列出了问题并没有释放锁, 此时就会造成死锁
    // 这是从预防死锁的角度来解决死锁问题
    
    public class BlockQueue<T>
    {
        private Queue<T> _inner_queue = null;

        private ManualResetEvent _dequeue_wait = null;

        public int Count
        {
            get { return _inner_queue.Count; }
        }

        public BlockQueue(int capacity = -1)
        {
            this._inner_queue = capacity == -1 ? new Queue<T>() : new Queue<T>(capacity);
            this._dequeue_wait = new ManualResetEvent(false);
        }

        // 入队加锁
        public void EnQueue(T item)
        {
            if (this._IsShutdown == true) throw new InvalidOperationException("服务未开启.[EnQueue]");

            lock (this._inner_queue)
            {
                this._inner_queue.Enqueue(item);
                this._dequeue_wait.Set();
            }
        }

        // 出队加锁
        public T DeQueue(int waitTime)
        {
            bool _queueEmpty = false;
            T item = default(T);
            while (true)
            {
                lock (this._inner_queue)
                {
                    // 判断队列中是否有元素....
                    if (this._inner_queue.Count > 0)
                    {
                        item = this._inner_queue.Dequeue();
                        this._dequeue_wait.Reset();
                        //break;
                    }
                    else
                    {
                        if (this._IsShutdown == true)
                        {
                            throw new InvalidOperationException("服务未开启[DeQueue].");
                        }
                        else
                        {
                            _queueEmpty = true;
                        }
                    }
                }
                if (item != null)
                {
                    return item;
                }
                if (_queueEmpty)
                {
                    this._dequeue_wait.WaitOne(waitTime);
                }
            }

        }

        private bool _IsShutdown = false;

        public void Shutdown()
        {
            this._IsShutdown = true;
            this._dequeue_wait.Set();
        }

        public void Clear()
        {
            this._inner_queue.Clear();
        }
    }
}

那么.net中有没有封装好的阻塞队列?有啊!BlockingCollection<>类,其实我之前写的好些关于线程的文章都说到了这个类库,用到的地方也多。该类默认的容器是ConcurrentQueue,因此,同步就做好了,而且该类还实现了阻塞的功能: 多个线程或任务可同时向集合添加项,如果集合达到其指定最大容量,则制造线程将发生阻塞,直到移除集合中的某个项。 多个使用者可以同时移除项,如果集合变空,则使用线程将发生阻塞,直到制造者添加某个项。 制造线程可调用 CompleteAdding 来指示不再添加项。 使用者将监视 IsCompleted 属性以了解集合何时为空且不再添加项。 原文说明 BlockingCollection 概述 (https://docs.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview ) 感慨一句,微软的好东西是真多,为什么不能像java那样轻易地被人发现使用呢? 没错,我们使用这个类就可以轻易地实现阻塞队列了,而且是完美的实现.

本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-12-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com