Implementing a buffer to write data from multiple threads?

Developer https://www.devze.com 2023-02-04 21:38 Source: Internet

My program uses an iterator to traverse through a map, and spawns off a number of worker threads to process the points from the read iterator, which is all good. Now, I'd like to write the output for each point, and for that I'm using a memory buffer to ensure data is collected from the threads in the correct order before it is written to the file (via another iterator for writing):

public class MapMain
{
    // Multiple threads used here, each thread starts in Run() 
    // requests and processes map points

    public void Run()
    {
        // Get point from somewhere and process point
        int pointIndex = ...

        bufferWriter.StartPoint(pointIndex);

        // Perform a number of computations.
        // For simplicity, numberOfComputations = 1 in this example   
        bufferWriter.BufferValue(pointIndex, value);

        bufferWriter.EndPoint(pointIndex); 
    }
}

My attempt at implementing a buffer:

public class BufferWriter
{
  private const int BufferSize = 4;

  private readonly IIterator iterator;
  private readonly float?[] bufferArray;
  private readonly bool[] bufferingCompleted;
  private readonly SortedDictionary<long, int> pointIndexToBufferIndexMap;
  private readonly object syncObject = new object();  

  private int bufferCount = 0;
  private int endBufferCount = 0;

  public BufferWriter(....)
  {
      iterator = ...
      bufferArray = new float?[BufferSize];
      bufferingCompleted = new bool[BufferSize];
      pointIndexToBufferIndexMap = new SortedDictionary<long, int>();
  }

  public void StartPoint(long pointIndex)
  {
    lock (syncObject)
    {
        if (bufferCount == BufferSize)
        {
            Monitor.Wait(syncObject);
        }

        pointIndexToBufferIndexMap.Add(pointIndex, bufferCount);   
        bufferCount++;
    }
  }

  public void BufferValue(long pointIndex, float value)
  {
      lock (syncObject)
      {
          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
          bufferArray[bufferIndex] = value;          
      }
  }

  public void EndPoint(long pointIndex)
  {
      lock (syncObject)
      {
          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
          bufferingCompleted[bufferIndex] = true;

          endBufferCount++;
          if (endBufferCount == BufferSize)
          {
              FlushBuffer();
              Monitor.PulseAll(syncObject);
          }
      }
  }

  private void FlushBuffer()
  {
      // Iterate in order of points
      foreach (long pointIndex in pointIndexToBufferIndexMap.Keys)
      {
          // Move iterator 
          iterator.MoveNext();

          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];

          if (bufferArray[bufferIndex].HasValue)
          {                  
              iterator.Current = bufferArray[bufferIndex];

              // Clear to null
              bufferArray[bufferIndex] = null;                  
          }
      }

      bufferCount = 0;
      endBufferCount = 0;
      pointIndexToBufferIndexMap.Clear();
  }        
}

I'm looking for feedback to fix and correct the bugs in my code and resolve any performance issues:

[1] In short: I have a fixed-size buffer that collects data from multiple threads processing points in somewhat random order. When the buffer is completely filled, it has to be flushed. But what if I have collected points 0 to 9 and point 8 is missing? The buffer is already full, and any thread trying to use it will block until a flush is performed, but the flush needs point 8.

[2] The order of values in the buffer does not correspond to the order of the map points the values refer to. If it did, I think flushing would be easier (is array access faster than SortedDictionary retrieval?). In addition, this might allow flushed slots to be reused for incoming data (a circular buffer?).

But I can't think of a working model to achieve this.

[3] The buffer waits until it is completely filled before flushing. There are many instances where a thread invokes EndPoint() and iterator.Current happens to refer to that point. It might make more sense to "write" that point immediately (i.e. set 'iterator.Current' and enumerate once), but how can this be done?

Just to be clear, the writing iterator in BufferWriter has a buffer at its own level to cache values invoked on its Current property before writing to output, but I don't have to worry about it.

I feel like the whole thing needs to be rewritten from scratch!

Any help is appreciated. Thank you.


I wouldn't do the parallelism "by hand"; farm it out to TPL or PLINQ. Since you are talking about a map, you have a fixed set of points that you could enumerate by coordinates, letting PLINQ worry about the parallelism.

Example:

// Requires: using System.Linq;
// First get your map points; this could be just a lazy iterator over every map point
IEnumerable<MapPoint> mapPoints = ...
// Now use PLINQ to compute in parallel while maintaining order
var computedMapPoints = mapPoints.AsParallel()
                        .AsOrdered()
                        .Select(mappoint => ComputeMapPoint(mappoint)).ToList();
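
For a self-contained version of the idea above, here is a minimal sketch. The class name OrderedPlinqSketch, the use of plain int point indices, and the body of ComputeMapPoint are placeholders made up for illustration; only AsParallel, AsOrdered, Select and ToList are actual PLINQ calls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OrderedPlinqSketch
{
    // Hypothetical stand-in for the real per-point computation.
    public static float ComputeMapPoint(int pointIndex)
    {
        return pointIndex * 0.5f;
    }

    // Computes every point in parallel. AsOrdered makes PLINQ return the
    // results in the order of the source sequence, so no manual buffer
    // is needed to restore the order afterwards.
    public static List<float> ComputeAll(IEnumerable<int> pointIndices)
    {
        return pointIndices
            .AsParallel()
            .AsOrdered()
            .Select(p => ComputeMapPoint(p))
            .ToList();
    }
}
```

Calling OrderedPlinqSketch.ComputeAll(Enumerable.Range(0, 100)) would give a list where the result for point i sits at index i, regardless of which thread computed it or when. The results could then be fed to the writing iterator in a plain sequential loop.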


Here is my solution. It should work, although I haven't tested it. Add a new field:

private readonly Queue<AutoResetEvent> waitHandles = new Queue<AutoResetEvent>();

The two if statements (in StartPoint and EndPoint) need to be changed as follows:

Start:

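if (bufferCount == BufferSize)
{
    // Caution: unlike Monitor.Wait, WaitOne does not release syncObject
    // while blocking, so this wait must not run while the lock is held.
    AutoResetEvent ev = new AutoResetEvent( false );
    waitHandles.Enqueue( ev );
    ev.WaitOne();
}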

End:

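if (endBufferCount == BufferSize)
{
   FlushBuffer();
   // Compute the count once: waitHandles.Count shrinks on every Dequeue,
   // so re-evaluating it in the loop condition would release too few waiters.
   int toRelease = Math.Min( waitHandles.Count, BufferSize );
   for ( int i = 0; i < toRelease; ++i )
   {
      waitHandles.Dequeue().Set();
   }
}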
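
As an alternative to flushing only when the buffer is full, the circular-buffer idea from points [2] and [3] of the question could look roughly like the sketch below. It is only a sketch under several assumptions: point indices are contiguous and start at 0, each index is started exactly once, and workers pick up points in increasing order (otherwise every worker could block on a point beyond the window while nobody processes the next point to write). OrderedWindowWriter and the write callback are hypothetical names; the callback stands in for the writing iterator.

```csharp
using System;
using System.Threading;

public class OrderedWindowWriter
{
    private readonly int windowSize;
    private readonly float?[] values;
    private readonly bool[] completed;
    private readonly Action<long, float?> write; // hypothetical sink for (pointIndex, value)
    private readonly object syncObject = new object();
    private long nextToWrite = 0; // lowest point index not yet written

    public OrderedWindowWriter(int windowSize, Action<long, float?> write)
    {
        this.windowSize = windowSize;
        this.write = write;
        values = new float?[windowSize];
        completed = new bool[windowSize];
    }

    public void StartPoint(long pointIndex)
    {
        lock (syncObject)
        {
            // "while", not "if": the condition is re-checked after every wake-up.
            // Only points inside [nextToWrite, nextToWrite + windowSize) own a slot.
            while (pointIndex >= nextToWrite + windowSize)
            {
                Monitor.Wait(syncObject);
            }
        }
    }

    public void BufferValue(long pointIndex, float value)
    {
        lock (syncObject)
        {
            values[(int)(pointIndex % windowSize)] = value;
        }
    }

    public void EndPoint(long pointIndex)
    {
        lock (syncObject)
        {
            completed[(int)(pointIndex % windowSize)] = true;

            // Write out every finished point at the head of the window,
            // freeing its slot for a later point.
            bool advanced = false;
            int slot = (int)(nextToWrite % windowSize);
            while (completed[slot])
            {
                write(nextToWrite, values[slot]);
                values[slot] = null;
                completed[slot] = false;
                nextToWrite++;
                slot = (int)(nextToWrite % windowSize);
                advanced = true;
            }

            if (advanced)
            {
                Monitor.PulseAll(syncObject); // wake threads waiting for a slot
            }
        }
    }
}
```

With this layout the slot index follows directly from the point index, so no SortedDictionary lookup is needed. A missing point 8 no longer blocks on a full buffer either: the thread that owns the head point is always inside the window and can finish, at which point everything behind it is written in order.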