Best approach for thread synchronized queue

开发者 https://www.devze.com 2022-12-13 21:10 Source: Internet
I have a queue in which I can enqueue different threads, so that I can ensure two things:

  1. Requests are processed one by one.
  2. Requests are processed in order of arrival.

The second point is important. Otherwise a simple critical section would be enough. I have different groups of requests, and these two points must hold only within a single group. Requests from different groups can run concurrently.

It looks like this:

FTaskQueue.Enqueue('MyGroup');
try
  // do something (runs in the context of some thread)
finally
  FTaskQueue.Dequeue('MyGroup');
end;

EDIT: I have removed the actual implementation because it hides the problem I want to solve.

I need this because I have an Indy-based web server that accepts HTTP requests. First I find the corresponding session for the request. Then the request (code) is executed for that session. I can get multiple requests for the same session (that is, new requests can arrive while the first is still being processed), and they must execute one by one in the correct order of arrival. So I am looking for a generic synchronization queue that can be used in such situations to queue requests. I have no control over the threads, and each request may be executed in a different thread.

What is the best (usual) approach to this sort of problem? The problem is that Enqueue and Dequeue must be atomic operations so that the correct order is preserved. My current implementation has a substantial bottleneck, but it works.

EDIT: Below is the problem with atomic Enqueue / Dequeue operations.

You would normally do something like this:

procedure Enqueue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoEnqueue;
  finally 
    LeaveCriticalSection(FCritSec);
  end;

  BlockTheCurrentThread; // here the thread blocks itself
end;

procedure Dequeue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoDequeue;
    UnblockTheNextThread; // here the thread unblocks another thread
  finally 
    LeaveCriticalSection(FCritSec);
  end;
end;

The problem is that this is not atomic. Suppose one thread is already in the queue and a second thread calls Enqueue. The second thread may leave the critical section and be preempted just before it blocks itself. The scheduler then resumes the first thread, which calls Dequeue and tries to unblock the next (second) thread. But the second thread is not blocked yet, so nothing happens. When the second thread continues, it blocks itself, which is wrong because nobody will ever unblock it. If the blocking is moved inside the critical section, then the critical section is never left and we have a deadlock.


Another approach:

Let each request thread have a manual reset event that is initially unset. The queue manager is a simple object which maintains a thread-safe list of such events. The Enqueue() and Dequeue() methods both take the event of the request thread as a parameter.

type
  TRequestManager = class(TObject)
  strict private
    fCritSect: TCriticalSection;
    fEvents: TList<TEvent>;
  public
    constructor Create;
    destructor Destroy; override;

    procedure Enqueue(ARequestEvent: TEvent);
    procedure Dequeue(ARequestEvent: TEvent);
  end;

{ TRequestManager }

constructor TRequestManager.Create;
begin
  inherited Create;
  fCritSect := TCriticalSection.Create;
  fEvents := TList<TEvent>.Create;
end;

destructor TRequestManager.Destroy;
begin
  Assert((fEvents = nil) or (fEvents.Count = 0));
  FreeAndNil(fEvents);
  FreeAndNil(fCritSect);
  inherited;
end;

procedure TRequestManager.Dequeue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(fEvents.Count > 0);
    Assert(fEvents[0] = ARequestEvent);
    fEvents.Delete(0);
    if fEvents.Count > 0 then
      fEvents[0].SetEvent;
  finally
    fCritSect.Release;
  end;
end;

procedure TRequestManager.Enqueue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(ARequestEvent <> nil);
    if fEvents.Count = 0 then
      ARequestEvent.SetEvent
    else
      ARequestEvent.ResetEvent;
    fEvents.Add(ARequestEvent);
  finally
    fCritSect.Release;
  end;
end;

Each request thread calls Enqueue() on the queue manager and afterwards waits for its own event to become signalled. Then it processes the request and calls Dequeue():

{ TRequestThread }

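type
  TRequestThread = class(TThread)
  strict private
    fEvent: TEvent;
    fManager: TRequestManager;
  protected
    procedure Execute; override;
  public
    constructor Create(AManager: TRequestManager);
    destructor Destroy; override;
  end;

constructor TRequestThread.Create(AManager: TRequestManager);
begin
  Assert(AManager <> nil);
  inherited Create(TRUE);
  // Manual-reset event, initially unset; the queue manager sets it
  // when it is this thread's turn.
  fEvent := TEvent.Create(nil, TRUE, FALSE, '');
  fManager := AManager;
  // The demo keeps no reference to the thread, so let it free itself.
  FreeOnTerminate := TRUE;
  Resume;
end;

destructor TRequestThread.Destroy;
begin
  inherited;
  // Safe here: Execute has already removed the event from the manager.
  FreeAndNil(fEvent);
end;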

procedure TRequestThread.Execute;
begin
  fManager.Enqueue(fEvent);
  try
    fEvent.WaitFor(INFINITE);
    OutputDebugString('Processing request');
    Sleep(1000);
    OutputDebugString('Request processed');
  finally
    fManager.Dequeue(fEvent);
  end;
end;

{ TForm1 }

procedure TForm1.Button1Click(Sender: TObject);
var
  i: integer;
begin
  for i := 1 to 10 do
    TRequestThread.Create(fRequestManager);
end;

The queue manager locks the list of events both in Enqueue() and in Dequeue(). If the list is empty in Enqueue(), it sets the event passed as the parameter; otherwise it resets the event. Then it appends the event to the list. Thus the first thread can continue with its request, and all others will block. In Dequeue() the event is removed from the head of the list, and the next event is set (if there is one).

That way each finishing request thread unblocks the next one, without suspending or resuming any threads. Because the event stays set until it is reset, it does not matter whether the next thread has already started waiting when the event is set. This solution also needs no additional threads or windows; a single event object per request thread is all that is required.
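
To cover the request groups from the question, the same idea could be applied per group. The following is only a minimal sketch, not a tested implementation: TGroupedRequestManager and TEventList are hypothetical names, and it assumes a Delphi version with generics (on newer versions the units are System.SysUtils, System.SyncObjs and System.Generics.Collections). The demo runs on one thread and only checks which events are set.

```delphi
program GroupedQueueSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, SyncObjs, Generics.Collections;

type
  TEventList = TList<TEvent>;

  // Hypothetical variant of TRequestManager: one FIFO of events per group.
  TGroupedRequestManager = class(TObject)
  strict private
    fCritSect: TCriticalSection;
    fGroups: TObjectDictionary<string, TEventList>;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const AGroup: string; ARequestEvent: TEvent);
    procedure Dequeue(const AGroup: string; ARequestEvent: TEvent);
  end;

constructor TGroupedRequestManager.Create;
begin
  inherited Create;
  fCritSect := TCriticalSection.Create;
  // doOwnsValues: the dictionary frees the per-group lists (not the events).
  fGroups := TObjectDictionary<string, TEventList>.Create([doOwnsValues]);
end;

destructor TGroupedRequestManager.Destroy;
begin
  fGroups.Free;
  fCritSect.Free;
  inherited;
end;

procedure TGroupedRequestManager.Enqueue(const AGroup: string;
  ARequestEvent: TEvent);
var
  Events: TEventList;
begin
  fCritSect.Enter;
  try
    if not fGroups.TryGetValue(AGroup, Events) then
    begin
      Events := TEventList.Create;
      fGroups.Add(AGroup, Events);
    end;
    // Only the first request of a group may run immediately.
    if Events.Count = 0 then
      ARequestEvent.SetEvent
    else
      ARequestEvent.ResetEvent;
    Events.Add(ARequestEvent);
  finally
    fCritSect.Leave;
  end;
end;

procedure TGroupedRequestManager.Dequeue(const AGroup: string;
  ARequestEvent: TEvent);
var
  Events: TEventList;
begin
  fCritSect.Enter;
  try
    Events := fGroups[AGroup]; // raises if the group was never enqueued
    Assert((Events.Count > 0) and (Events[0] = ARequestEvent));
    Events.Delete(0);
    if Events.Count > 0 then
      Events[0].SetEvent        // wake the next request of this group
    else
      fGroups.Remove(AGroup);   // drop empty groups
  finally
    fCritSect.Leave;
  end;
end;

var
  Manager: TGroupedRequestManager;
  A1, A2, B1: TEvent;
begin
  Manager := TGroupedRequestManager.Create;
  A1 := TEvent.Create(nil, True, False, '');
  A2 := TEvent.Create(nil, True, False, '');
  B1 := TEvent.Create(nil, True, False, '');
  Manager.Enqueue('A', A1);
  Manager.Enqueue('A', A2);
  Manager.Enqueue('B', B1);
  Writeln(A2.WaitFor(0) = wrSignaled); // FALSE: A2 waits behind A1
  Writeln(B1.WaitFor(0) = wrSignaled); // TRUE: group B is independent
  Manager.Dequeue('A', A1);
  Writeln(A2.WaitFor(0) = wrSignaled); // TRUE: A2 is now at the head
  Manager.Dequeue('A', A2);
  Manager.Dequeue('B', B1);
  A1.Free; A2.Free; B1.Free;
  Manager.Free;
end.
```

A single critical section guards all groups here, which keeps the sketch short. It is held only while the lists are updated, never while a request runs, so requests from different groups still execute concurrently.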


I'll answer with the additional information from your comment taken into consideration.

If you have a number of threads that need to be serialized then you could make use of the serialization mechanism Windows provides for free. Let each queue be a thread with its own window and a standard message loop. Use SendMessage() instead of PostThreadMessage(), and Windows will take care of blocking the sending threads until the message has been processed, and of making sure that the correct execution order is maintained. By using a thread with its own window for each request group you make sure that multiple groups are still processed concurrently.

This is a simple solution that will work only if the request itself can be handled in a different thread context than it originated in, which shouldn't be a problem in many cases.
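
A rough sketch of what such a group thread might look like follows. It is Windows-only and untested under load. TGroupThread, WM_REQUEST, Process and Stop are names made up for this illustration, and the "request" is just an integer that gets doubled. Note that AllocateHWnd is not guaranteed to be thread-safe in every Delphi version, so this assumes the group threads are created one at a time.

```delphi
program SendMessageQueueSketch;
{$APPTYPE CONSOLE}

uses
  Windows, Messages, SysUtils, Classes, SyncObjs;

const
  WM_REQUEST = WM_USER + 1; // hypothetical "process one request" message

type
  // Hypothetical: one such thread (with its own window) per request group.
  TGroupThread = class(TThread)
  strict private
    fWnd: HWND;
    fReady: TEvent; // set once the window exists
    procedure WndProc(var Msg: TMessage);
  protected
    procedure Execute; override;
  public
    constructor Create;
    destructor Destroy; override; // call Stop before freeing
    function Process(ARequest: Integer): Integer;
    procedure Stop;
  end;

constructor TGroupThread.Create;
begin
  fReady := TEvent.Create(nil, True, False, '');
  inherited Create(False);
end;

destructor TGroupThread.Destroy;
begin
  inherited;
  fReady.Free;
end;

procedure TGroupThread.WndProc(var Msg: TMessage);
begin
  if Msg.Msg = WM_REQUEST then
    // Runs in the group thread, one request at a time.
    Msg.Result := Msg.WParam * 2 // placeholder for the real work
  else
    Msg.Result := DefWindowProc(fWnd, Msg.Msg, Msg.WParam, Msg.LParam);
end;

procedure TGroupThread.Execute;
var
  Msg: TMsg;
begin
  fWnd := AllocateHWnd(WndProc); // the window belongs to this thread
  try
    fReady.SetEvent;
    // Sent messages are delivered to WndProc from inside GetMessage.
    while GetMessage(Msg, 0, 0, 0) do
      DispatchMessage(Msg);
  finally
    DeallocateHWnd(fWnd);
  end;
end;

function TGroupThread.Process(ARequest: Integer): Integer;
begin
  fReady.WaitFor(INFINITE);
  // Blocks the calling thread until WndProc has handled the request.
  Result := SendMessage(fWnd, WM_REQUEST, ARequest, 0);
end;

procedure TGroupThread.Stop;
begin
  fReady.WaitFor(INFINITE);
  PostThreadMessage(ThreadID, WM_QUIT, 0, 0); // ends the message loop
  WaitFor;
end;

var
  Group: TGroupThread;
begin
  Group := TGroupThread.Create;
  try
    Writeln(Group.Process(21)); // 42, computed in the group thread
    Group.Stop;
  finally
    Group.Free;
  end;
end.
```

In the web server case each request thread would call Process on the TGroupThread that belongs to its session. The actual work then runs in the group thread, which is exactly the restriction mentioned above.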


Have you tried the TThreadList class provided by Delphi?

It is thread-safe and manages the locking for you. You manage the list "outside" the worker threads, from your main thread.

As requests come in, you add a task for each one to the list. When a thread finishes, you can use its OnTerminate event to start the next task in the list.
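
Something along these lines, as a sketch only: TPendingRequest, AddTask and TakeNextTask are hypothetical names, and the loop at the end stands in for what an OnTerminate handler would do when it picks up the next task. On newer Delphi versions the units are System.SysUtils and System.Classes.

```delphi
program ThreadListSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

type
  // Hypothetical task record; a real one would carry the request data.
  TPendingRequest = class(TObject)
  public
    Name: string;
    constructor Create(const AName: string);
  end;

var
  Pending: TThreadList;

constructor TPendingRequest.Create(const AName: string);
begin
  inherited Create;
  Name := AName;
end;

procedure AddTask(ATask: TPendingRequest);
begin
  Pending.Add(ATask); // TThreadList.Add takes the lock internally
end;

// Removes and returns the oldest task, or nil if the list is empty.
function TakeNextTask: TPendingRequest;
var
  List: TList;
begin
  Result := nil;
  List := Pending.LockList;
  try
    if List.Count > 0 then
    begin
      Result := TPendingRequest(List[0]);
      List.Delete(0);
    end;
  finally
    Pending.UnlockList;
  end;
end;

var
  Req: TPendingRequest;
begin
  Pending := TThreadList.Create;
  try
    AddTask(TPendingRequest.Create('first'));
    AddTask(TPendingRequest.Create('second'));
    // Stand-in for OnTerminate: take the next task until none is left.
    Req := TakeNextTask;
    while Req <> nil do
    begin
      Writeln(Req.Name); // prints "first", then "second"
      Req.Free;
      Req := TakeNextTask;
    end;
  finally
    Pending.Free;
  end;
end.
```

Keep in mind that TThreadList only protects the list itself. Making sure that at most one task per group is running at any time is still up to the code that takes tasks from it.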
