美文网首页
BlockingCollection 源码分析(一)

BlockingCollection 源码分析(一)

作者: TakumiWu | 来源:发表于2017-12-12 16:27 被阅读0次

BlockingCollection是一个线程安全的生产者-消费者集合。

为线程安全集合提供阻塞和绑定功能,允许多线程安全的添加和删除元素;

一 属性介绍

1.获取此 BlockingCollection<T> 实例的限定容量,CheckDisposed方法检查该集合对象是否被释放,被释放会抛出ObjectDisposedException异常

 public int BoundedCapacity
{
    get
     {
      CheckDisposed();
      return m_boundedCapacity;
     }
} 

2.Count 返回集合对象内具体的数量 先检查对象是否被释放,然后返回对象总数,m_occupiedNodes是一个SemaphoreSlim对象。

public int Count
        {
            get
            {
                CheckDisposed();
                return m_occupiedNodes.CurrentCount;
            }
        }

3.IsAddingCompleted 获取此 BlockingCollection<T> 是否已标记为已完成添加,m_currentAdder是一个不被编译器优化的int类型参数

 public bool IsAddingCompleted
  {
            get
            {
                CheckDisposed();
                return (m_currentAdders == COMPLETE_ADDING_ON_MASK); }}

4.IsCompleted获取一个值,该值指示此 BlockingCollection<T> 是否已标记为已完成添加并且为空。

public bool IsCompleted
        {
            get
            {
                CheckDisposed();
                return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0));
            }
        }

二 方法
1.构造函数BlockingCollection 如果初始化的boundedCapacity参数小于1是会抛出参数范围溢出的异常,从资源文件获得错误信息字符串,默认是用ConcurrentQueue做为集合的存储数据的集合,ConcurrentQueue是安全的先进先出 (FIFO) 集合。
构造函数还允许传进一个实现了IProducerConsumerCollection生产消费者的接口的对象,如果改对象的数值大于BlockingCollection的boundedCapacity会抛出异常,这就要注意下集合的对象和传入的boundedCapacity是否一致。

 public BlockingCollection()
            : this(new ConcurrentQueue<T>())
        {
        }
 public BlockingCollection(int boundedCapacity)
            : this(new ConcurrentQueue<T>(), boundedCapacity)
        {
        }
 public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity)
        {
            if (boundedCapacity < 1)
            {
                throw new ArgumentOutOfRangeException(
                    "boundedCapacity", boundedCapacity,
                    SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange));
            }
            if (collection == null)
            {
                throw new ArgumentNullException("collection");
            }
            int count = collection.Count;
            if (count > boundedCapacity)
            {
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity));
            }
            Initialize(collection, boundedCapacity, count);
        }
public BlockingCollection(IProducerConsumerCollection<T> collection)
        {
            if (collection == null)
            {
                throw new ArgumentNullException("collection");
            }
            Initialize(collection, NON_BOUNDED, collection.Count);
        }
private void Initialize(IProducerConsumerCollection<T> collection, int boundedCapacity, int collectionCount)
        {
            Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED);
            m_collection = collection;
            m_boundedCapacity = boundedCapacity; ;
            m_isDisposed = false;
            m_ConsumersCancellationTokenSource = new CancellationTokenSource();
            m_ProducersCancellationTokenSource = new CancellationTokenSource();
            if (boundedCapacity == NON_BOUNDED)
            {
                m_freeNodes = null;
            }
            else
            {
                Debug.Assert(boundedCapacity > 0);
                m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount);
            }
            m_occupiedNodes = new SemaphoreSlim(collectionCount);
        }
  1. Add 往集合添加元素方法
    值可以是Null值,如果初始化的此实例时,指定了一个有限的容量,则在有空间可以存储提供的项之前,可能会阻止调用 Add。TryAddWithNoTimeValidation看字面意思是尝试增加没有时间验证的意思,会返回一个增加是否成功的值,第一个参数是项,第二个参数设置超时时间单位是毫秒,取消时间的token对象
 public void Add(T item)
        {
#if DEBUG
            bool tryAddReturnValue =
#endif
            TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken());
#if DEBUG
            Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
#endif
        }
 private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
           /// 检查集合对象是否被销毁了
            CheckDisposed();
           /// 检查线程取消是不是被取消了
            if (cancellationToken.IsCancellationRequested)
                throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
            /// 检查是不是集合正在添加
            if (IsAddingCompleted)
            {
                throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
            }

            bool waitForSemaphoreWasSuccessful = true;
            /// 有剩余的位置的节点对象不为空
            if (m_freeNodes != null)
            {
                //If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
                //was called concurrently with Adding which is not supported by BlockingCollection.
                CancellationTokenSource linkedTokenSource = null;
                try
                {
                    waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0);
                    if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
                    {
                        linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
                            cancellationToken, m_ProducersCancellationTokenSource.Token);
                        waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
                    }
                }
                catch (OperationCanceledException)
                {
                    //if cancellation was via external token, throw an OCE
                    if (cancellationToken.IsCancellationRequested)
                        throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);

                    //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
                    //Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested);

                    throw new InvalidOperationException
                        (SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd));
                }
                finally
                {
                    if (linkedTokenSource != null)
                    {
                        linkedTokenSource.Dispose();
                    }
                }
            }
            if (waitForSemaphoreWasSuccessful)
            {
                // Update the adders count if the complete adding was not requested, otherwise
                // spins until all adders finish then throw IOE
                // The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have
                // not been finished yet
                SpinWait spinner = new SpinWait();
                while (true)
                {
                    int observedAdders = m_currentAdders;
                    if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
                    {
                        spinner.Reset();
                        // CompleteAdding is requested, spin then throw
                        while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
                        throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
                    }
                    if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders)
                    {
                        Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit.");
                        break;
                    }
                    spinner.SpinOnce();
                }

                // This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if:
                // 1- m_collection.TryAdd threw an exception
                // 2- m_collection.TryAdd succeeded
                // 3- m_collection.TryAdd returned false
                // so we put the decrement code in the finally block
                try
                {

                    //TryAdd is guaranteed to find a place to add the element. Its return value depends
                    //on the semantics of the underlying store. Some underlying stores will not add an already
                    //existing item and thus TryAdd returns false indicating that the size of the underlying
                    //store did not increase.


                    bool addingSucceeded = false;
                    try
                    {
                        //The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
                        //This fixes bug #702328, case 2 of 2.
                        cancellationToken.ThrowIfCancellationRequested();
                        addingSucceeded = m_collection.TryAdd(item);
                    }
                    catch
                    {
                        //TryAdd did not result in increasing the size of the underlying store and hence we need
                        //to increment back the count of the m_freeNodes semaphore.
                        if (m_freeNodes != null)
                        {
                            m_freeNodes.Release();
                        }
                        throw;
                    }
                    if (addingSucceeded)
                    {
                        //After adding an element to the underlying storage, signal to the consumers 
                        //waiting on m_occupiedNodes that there is a new item added ready to be consumed.
                        m_occupiedNodes.Release();
                    }
                    else
                    {
                        throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed));
                    }
                }
                finally
                {
                    // decrement the adders count
                    Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
                    Interlocked.Decrement(ref m_currentAdders);
                }


            }
            return waitForSemaphoreWasSuccessful;
        }

未完待续

相关文章

网友评论

      本文标题:BlockingCollection 源码分析(一)

      本文链接:https://www.haomeiwen.com/subject/dbpgixtx.html