How might a class like .NET's ConcurrentBag<T> be implemented?

Developer https://www.devze.com 2022-12-11 05:37 Source: Internet
I find myself very intrigued by the existence of a ConcurrentBag<T> class in the upcoming .NET 4.0 framework:

Bags are useful for storing objects when ordering doesn't matter, and unlike sets, bags support duplicates.

My question is: how might this idea be implemented? Most collections I'm familiar with essentially amount to (under the hood) some form of array, in which order may not "matter," but there is an order (which is why, even though it doesn't need to, enumeration will pretty much always go through an unchanged collection, be it List, Queue, Stack, etc. in the same sequence).

If I had to guess, I might suggest that internally it could be a Dictionary<T, LinkedList<T>>; but that actually seems quite dubious considering it wouldn't make sense to use just any type T as a key.

What I'm expecting/hoping is that this is actually an established object type that has already been "figured out" somewhere, and that somebody who knows of this established type can tell me about it. It's just so unusual to me--one of those concepts that's easy to understand in real life, but is difficult to translate into a usable class as a developer--which is why I'm curious as to the possibilities.

EDIT:

Some responders have suggested that a Bag could be a form of a hashtable internally. This was my initial thought as well, but I foresaw two problems with this idea:

  1. A hashtable is not all that useful when you don't have a suitable hashcode function for the type in question.
  2. Simply tracking an object's "count" in a collection is not the same as storing the object.

As Meta-Knight suggested, perhaps an example would make this more clear:

using System.Collections.Concurrent;
using System.Threading;

public class ExpensiveObject {
    private ExpensiveObject() {
        // very intense operations happening in here
    }

    // static factory: the constructor is private, so callers must go through this
    public static ExpensiveObject CreateExpensiveObject() {
        return new ExpensiveObject();
    }
}

static class Program {
    static void Main() {
        var expensiveObjects = new ConcurrentBag<ExpensiveObject>();

        for (int i = 0; i < 5; i++) {
            expensiveObjects.Add(ExpensiveObject.CreateExpensiveObject());
        }

        // after this point in the code, I want to believe I have 5 new
        // expensive objects in my collection

        while (expensiveObjects.Count > 0) {
            ExpensiveObject expObj = null;
            bool objectTaken = expensiveObjects.TryTake(out expObj);
            if (objectTaken) {
                // here I THINK I am queueing a particular operation to be
                // executed on 5 separate threads for 5 separate objects,
                // but if ConcurrentBag is a hashtable then I've just received
                // the object 5 times and so I am working on the same object
                // from 5 threads at the same time!
                ThreadPool.QueueUserWorkItem(DoWorkOnExpensiveObject, expObj);
            } else {
                break;
            }
        }
    }

    static void DoWorkOnExpensiveObject(object obj) {
        ExpensiveObject expObj = obj as ExpensiveObject;
        if (expObj != null) {
            // some work to be done
        }
    }
}


If you look at the details of ConcurrentBag<T>, you'll find that internally it is basically a customized linked list.

Since bags can contain duplicates and are not accessible by index, a doubly linked list is a very good option for implementation. It allows locking to be fairly fine-grained for insertion and removal (you don't have to lock the entire collection, just the nodes around where you're inserting or removing). Since you're not worried about duplicates, no hashing is involved. This makes a doubly linked list a good fit.


There's some good info on ConcurrentBag here: http://geekswithblogs.net/BlackRabbitCoder/archive/2011/03/03/c.net-little-wonders-concurrentbag-and-blockingcollection.aspx

The way that the ConcurrentBag works is to take advantage of the new ThreadLocal type (new in System.Threading for .NET 4.0) so that each thread using the bag has a list local to just that thread.

This means that adding to or removing from a thread-local list requires very little synchronization. The problem arises when a thread goes to consume an item but its local list is empty. In this case the bag performs “work-stealing”: it takes an item from another thread that has items in its list. This requires a higher level of synchronization, which adds a bit of overhead to the take operation.
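To make the thread-local idea concrete, here is a minimal sketch of a bag with one list per thread and a stealing fallback. The name TinyBag and all of its members are made up for illustration, and it uses plain locks throughout; the real ConcurrentBag<T> is considerably more elaborate, so treat this as a picture of the idea rather than of the actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch, NOT the real ConcurrentBag<T>: each thread gets its own
// list, and TryTake falls back to stealing from another thread's list.
public sealed class TinyBag<T>
{
    // Every per-thread list ever created, so that a thief can find them.
    private readonly List<List<T>> _allLists = new List<List<T>>();
    private readonly ThreadLocal<List<T>> _local;

    public TinyBag()
    {
        // The factory runs once per thread, on first access to _local.Value.
        _local = new ThreadLocal<List<T>>(() =>
        {
            var list = new List<T>();
            lock (_allLists) { _allLists.Add(list); }
            return list;
        });
    }

    public void Add(T item)
    {
        List<T> mine = _local.Value;
        lock (mine) { mine.Add(item); } // contended only while a thief is stealing
    }

    public bool TryTake(out T item)
    {
        // Fast path: take from the calling thread's own list.
        if (TryRemoveLast(_local.Value, out item)) return true;

        // Local list is empty: try to steal from any registered list.
        List<T>[] snapshot;
        lock (_allLists) { snapshot = _allLists.ToArray(); }
        foreach (List<T> other in snapshot)
        {
            if (TryRemoveLast(other, out item)) return true;
        }

        item = default(T);
        return false;
    }

    private static bool TryRemoveLast(List<T> list, out T item)
    {
        lock (list)
        {
            if (list.Count == 0) { item = default(T); return false; }
            item = list[list.Count - 1];
            list.RemoveAt(list.Count - 1);
            return true;
        }
    }
}
```

Note that every item is stored individually, so five distinct objects added to such a bag come back out as five distinct objects, whichever thread takes them.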


Since ordering doesn't matter, a ConcurrentBag could be using a hashtable behind the scenes to allow for fast retrieval of data. But unlike a HashSet, a bag accepts duplicates. Maybe each item could be paired with a Count property which is set to 1 when an item is added. If you add the same item a second time, you could just increment the Count property of this item.

Then, to remove an item which has a count greater than one, you could just decrease the Count for this item. If the count was one, you would remove the Item-Count pair from the hashtable.
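The counting scheme described above might look roughly like this. CountingBag and its members are hypothetical names, the class is not thread-safe, and it is only a sketch of the idea, not how ConcurrentBag<T> works.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical counting bag: keeps one entry per distinct value plus a count.
// Not thread-safe; a null item would make the dictionary throw.
public sealed class CountingBag<T>
{
    private readonly Dictionary<T, int> _counts;

    // A null comparer means the dictionary uses EqualityComparer<T>.Default.
    public CountingBag(IEqualityComparer<T> comparer = null)
    {
        _counts = new Dictionary<T, int>(comparer);
    }

    // Total number of items, duplicates included.
    public int Count { get; private set; }

    public void Add(T item)
    {
        _counts.TryGetValue(item, out int n); // n stays 0 when the item is new
        _counts[item] = n + 1;
        Count++;
    }

    public bool Remove(T item)
    {
        if (!_counts.TryGetValue(item, out int n)) return false;
        if (n == 1) _counts.Remove(item);     // last occurrence: drop the pair
        else _counts[item] = n - 1;           // otherwise just decrement
        Count--;
        return true;
    }

    public int Occurrences(T item)
    {
        return _counts.TryGetValue(item, out int n) ? n : 0;
    }
}
```

Items are only collapsed into a single entry when the comparer considers them equal. For a class that does not override Equals and GetHashCode, the default comparer uses reference equality, so five separately constructed ExpensiveObject instances would be five entries with a count of 1 each.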


Well, in Smalltalk (where the notion of a Bag came from), the collection is basically the same as a hash, albeit one that allows duplicates. Instead of storing the duplicate object, though, it maintains an "occurrence count", i.e., a refcount for each object. If ConcurrentBag is a faithful implementation, this should give you a starting point.


I believe the concept of a 'Bag' is synonymous with 'Multiset'.

There are a number of open-source "Bag"/"Multiset" implementations (these happen to be in Java) if you are interested in how they are implemented.

These implementations show that a 'Bag' can be implemented in any number of ways depending on your needs. There are examples of TreeMultiset, HashMultiset, LinkedHashMultiset, ConcurrentHashMultiset.

Google Collections
Google has a number of "Multiset" implementations, one being a ConcurrentHashMultiset.

Apache Commons
Apache has a number of "Bag" implementations.


The System.Collections.Concurrent namespace is now open source, and the implementation for ConcurrentBag can now be found here:

https://github.com/dotnet/runtime/blob/main/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentBag.cs

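Below is a listing copied from that repository as of Jan 30, 2022. It is MIT licensed. Note that the pasted file is KeyedCollection<TKey, TItem> from System.Collections.ObjectModel, not ConcurrentBag<T>; use the link above for the actual ConcurrentBag source.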

// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;

namespace System.Collections.ObjectModel
{
    [Serializable]
    [DebuggerTypeProxy(typeof(CollectionDebugView<>))]
    [DebuggerDisplay("Count = {Count}")]
    [System.Runtime.CompilerServices.TypeForwardedFrom("mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089")]
    public abstract class KeyedCollection<TKey, TItem> : Collection<TItem> where TKey: notnull
    {
        private const int DefaultThreshold = 0;

        private readonly IEqualityComparer<TKey> comparer; // Do not rename (binary serialization)
        private Dictionary<TKey, TItem>? dict; // Do not rename (binary serialization)
        private int keyCount; // Do not rename (binary serialization)
        private readonly int threshold; // Do not rename (binary serialization)

        protected KeyedCollection() : this(null, DefaultThreshold)
        {
        }

        protected KeyedCollection(IEqualityComparer<TKey>? comparer) : this(comparer, DefaultThreshold)
        {
        }

        protected KeyedCollection(IEqualityComparer<TKey>? comparer, int dictionaryCreationThreshold)
            : base(new List<TItem>()) // Be explicit about the use of List<T> so we can foreach over
                                      // Items internally without enumerator allocations.
        {
            if (dictionaryCreationThreshold < -1)
            {
                throw new ArgumentOutOfRangeException(nameof(dictionaryCreationThreshold), SR.ArgumentOutOfRange_InvalidThreshold);
            }

            this.comparer = comparer ?? EqualityComparer<TKey>.Default;
            threshold = dictionaryCreationThreshold == -1 ? int.MaxValue : dictionaryCreationThreshold;
        }

        /// <summary>
        /// Enables the use of foreach internally without allocations using <see cref="List{T}"/>'s struct enumerator.
        /// </summary>
        private new List<TItem> Items
        {
            get
            {
                Debug.Assert(base.Items is List<TItem>);
                return (List<TItem>)base.Items;
            }
        }

        public IEqualityComparer<TKey> Comparer => comparer;

        public TItem this[TKey key]
        {
            get
            {
                TItem item;
                if (TryGetValue(key, out item!))
                {
                    return item;
                }

                throw new KeyNotFoundException(SR.Format(SR.Arg_KeyNotFoundWithKey, key.ToString()));
            }
        }

        public bool Contains(TKey key)
        {
            if (key == null)
            {
                throw new ArgumentNullException(nameof(key));
            }

            if (dict != null)
            {
                return dict.ContainsKey(key);
            }

            foreach (TItem item in Items)
            {
                if (comparer.Equals(GetKeyForItem(item), key))
                {
                    return true;
                }
            }

            return false;
        }

        public bool TryGetValue(TKey key, [MaybeNullWhen(false)] out TItem item)
        {
            if (key == null)
            {
                throw new ArgumentNullException(nameof(key));
            }

            if (dict != null)
            {
                return dict.TryGetValue(key, out item!);
            }

            foreach (TItem itemInItems in Items)
            {
                TKey keyInItems = GetKeyForItem(itemInItems);
                if (keyInItems != null && comparer.Equals(key, keyInItems))
                {
                    item = itemInItems;
                    return true;
                }
            }

            item = default;
            return false;
        }

        private bool ContainsItem(TItem item)
        {
            TKey key;
            if ((dict == null) || ((key = GetKeyForItem(item)) == null))
            {
                return Items.Contains(item);
            }

            TItem itemInDict;
            if (dict.TryGetValue(key, out itemInDict!))
            {
                return EqualityComparer<TItem>.Default.Equals(itemInDict, item);
            }

            return false;
        }

        public bool Remove(TKey key)
        {
            if (key == null)
            {
                throw new ArgumentNullException(nameof(key));
            }

            if (dict != null)
            {
                TItem item;
                return dict.TryGetValue(key, out item!) && Remove(item);
            }

            for (int i = 0; i < Items.Count; i++)
            {
                if (comparer.Equals(GetKeyForItem(Items[i]), key))
                {
                    RemoveItem(i);
                    return true;
                }
            }

            return false;
        }

        protected IDictionary<TKey, TItem>? Dictionary => dict;

        protected void ChangeItemKey(TItem item, TKey newKey)
        {
            if (!ContainsItem(item))
            {
                throw new ArgumentException(SR.Argument_ItemNotExist, nameof(item));
            }

            TKey oldKey = GetKeyForItem(item);
            if (!comparer.Equals(oldKey, newKey))
            {
                if (newKey != null)
                {
                    AddKey(newKey, item);
                }
                if (oldKey != null)
                {
                    RemoveKey(oldKey);
                }
            }
        }

        protected override void ClearItems()
        {
            base.ClearItems();
            dict?.Clear();
            keyCount = 0;
        }

        protected abstract TKey GetKeyForItem(TItem item);

        protected override void InsertItem(int index, TItem item)
        {
            TKey key = GetKeyForItem(item);
            if (key != null)
            {
                AddKey(key, item);
            }

            base.InsertItem(index, item);
        }

        protected override void RemoveItem(int index)
        {
            TKey key = GetKeyForItem(Items[index]);
            if (key != null)
            {
                RemoveKey(key);
            }

            base.RemoveItem(index);
        }

        protected override void SetItem(int index, TItem item)
        {
            TKey newKey = GetKeyForItem(item);
            TKey oldKey = GetKeyForItem(Items[index]);

            if (comparer.Equals(oldKey, newKey))
            {
                if (newKey != null && dict != null)
                {
                    dict[newKey] = item;
                }
            }
            else
            {
                if (newKey != null)
                {
                    AddKey(newKey, item);
                }

                if (oldKey != null)
                {
                    RemoveKey(oldKey);
                }
            }

            base.SetItem(index, item);
        }

        private void AddKey(TKey key, TItem item)
        {
            if (dict != null)
            {
                dict.Add(key, item);
            }
            else if (keyCount == threshold)
            {
                CreateDictionary();
                dict!.Add(key, item);
            }
            else
            {
                if (Contains(key))
                {
                    throw new ArgumentException(SR.Format(SR.Argument_AddingDuplicate, key), nameof(key));
                }

                keyCount++;
            }
        }

        private void CreateDictionary()
        {
            dict = new Dictionary<TKey, TItem>(comparer);
            foreach (TItem item in Items)
            {
                TKey key = GetKeyForItem(item);
                if (key != null)
                {
                    dict.Add(key, item);
                }
            }
        }

        private void RemoveKey(TKey key)
        {
            Debug.Assert(key != null, "key shouldn't be null!");
            if (dict != null)
            {
                dict.Remove(key);
            }
            else
            {
                keyCount--;
            }
        }
    }
}