
What's the best way to ping many network devices in parallel?

Developer https://www.devze.com 2023-02-08 13:31 Source: Internet

I poll a lot of devices on the network (more than 300) by pinging them one after another.

The program polls the devices sequentially, so it's slow. I'd like to speed up the polling.

There are some ways to do this in Delphi 7:

  1. Each device has a thread doing ping. Manage threads manually.
  2. Learn and use Indy 10. Need examples.
  3. Use overlapped I/O based on window messages.
  4. Use completion ports based on events.

Which is faster, and which is easier? Please provide some examples or links to examples.


Flooding the network with ICMP is not a good idea.

You might want to consider some kind of thread pool and queue up the ping requests and have a fixed number of threads doing the requests.
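
To make the idea concrete, here is a minimal sketch of a fixed-size pool working through a list of hosts, written for Delphi 7 as a console program. It is only one way to do it: the names TPingWorker, TProbeFunc, DummyProbe, and the 'device-N' host labels are made up for illustration, and DummyProbe merely sleeps where a real blocking ping call would go. The shared list acts as the queue, and each worker claims the next entry with InterlockedIncrement, so no more than WorkerCount pings are ever in flight.

```pascal
program PoolSketch;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils, Classes;

type
  // Hypothetical callback type: returns True if the host answered.
  TProbeFunc = function(const Host: string): Boolean;

  TPingWorker = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Hosts: TStringList;        // work list, only read while the workers run
  Alive: array of Boolean;   // one slot per host, each written by exactly one worker
  NextIndex: Integer = -1;   // shared cursor, advanced atomically
  Probe: TProbeFunc;

procedure TPingWorker.Execute;
var
  I: Integer;
begin
  repeat
    I := InterlockedIncrement(NextIndex);  // claim the next host
    if I >= Hosts.Count then
      Break;                               // list exhausted, worker ends
    Alive[I] := Probe(Hosts[I]);
  until Terminated;
end;

// Placeholder for a real blocking ping; assumption for this sketch only.
function DummyProbe(const Host: string): Boolean;
begin
  Sleep(100);
  Result := True;
end;

const
  WorkerCount = 16;  // upper bound on pings in flight at once
var
  Workers: array[0..WorkerCount - 1] of TPingWorker;
  I, Answered: Integer;
begin
  Probe := DummyProbe;
  Hosts := TStringList.Create;
  try
    for I := 1 to 300 do
      Hosts.Add('device-' + IntToStr(I));  // made-up host labels
    SetLength(Alive, Hosts.Count);

    for I := 0 to WorkerCount - 1 do
      Workers[I] := TPingWorker.Create(False);
    for I := 0 to WorkerCount - 1 do
    begin
      Workers[I].WaitFor;                  // wait until the list is drained
      Workers[I].Free;
    end;

    Answered := 0;
    for I := 0 to Hosts.Count - 1 do
      if Alive[I] then
        Inc(Answered);
    Writeln(Answered, ' of ', Hosts.Count, ' hosts answered');
  finally
    Hosts.Free;
  end;
end.
```

Raising or lowering WorkerCount is the knob that trades polling speed against the amount of ICMP traffic sent at once.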


Personally I would go with IOCP. I'm using that very successfully for the transport implementation in NexusDB.

If you want to perform 300 send/receive cycles using blocking sockets and threads in parallel, you end up needing 300 threads.

With IOCP, after you've associated the sockets with the IOCP, you can issue the 300 send operations, and each call returns immediately, before the operation has completed. As the operations complete, so-called completion packets are queued to the IOCP. You then have a pool of threads waiting on the IOCP, and the OS wakes them up as the completion packets come in. In reaction to completed send operations you can then issue the receive operations. The receive operations also return immediately and, once actually completed, are queued to the IOCP.

The really special thing about an IOCP is that it knows which threads belong to it and are currently processing completion packets. The IOCP only wakes up new threads if the total number of active threads (not in a kernel-mode wait state) is lower than the concurrency value of the IOCP (by default that equals the number of logical cores available on the machine). Also, if there are threads waiting for completion packets on the IOCP (which haven't been started yet, despite completion packets being queued, because the number of active threads was equal to the concurrency value), then the moment one of the threads currently processing a completion packet enters a kernel-mode wait state for any reason, one of the waiting threads is started.

Waiting threads are released in LIFO order, while the completion packets themselves are queued in FIFO order. That is, if a thread returns to the IOCP and completion packets are still waiting, that thread directly picks up the next completion packet, instead of being put into a wait state while the thread that has been waiting longest is woken up.

Under optimal conditions, you will have a number of threads equal to the number of available cores running concurrently (one on each core), picking up the next completion packet, processing it, returning to the IOCP and directly picking up the next completion packet, all without ever entering a kernel-mode wait state or requiring a thread context switch.

If you had 300 threads and blocking operations instead, not only would you waste at least 300 MB of address space (reserved for the stacks), but you would also have constant thread context switches as one thread enters a wait state (waiting for a send or receive to complete) and the next thread with a completed send or receive wakes up. – Thorsten Engler 12 hours ago


Direct access to the ICMP protocol on Windows is restricted. Because of malicious use of ICMP/ping/traceroute-style raw sockets, I believe that on some versions of Windows you will need to use Windows' own API. Windows XP, Vista, and Windows 7, in particular, don't let ordinary user programs access raw sockets.

I have used the canned functionality in ICMP.dll, which is what some Delphi ping components do, but a comment below alerted me to the fact that this is considered "using an undocumented API interface".

Here's a sample of the main Delphi ping component call itself:

function TICMP.ping: pIcmpEchoReply;
begin
  // Get/Set address to ping
  if ResolveAddress then begin
    // Send packet and block till timeout or response
    _NPkts := _IcmpSendEcho(_hICMP, _Address,
                            _pEchoRequestData, _EchoRequestSize,
                            @_IPOptions,
                            _pIPEchoReply, _EchoReplySize,
                            _TimeOut);
    if _NPkts = 0 then begin
      result := nil;
      status := CICMP_NO_RESPONSE;
    end else begin
      result := _pIPEchoReply;
    end;
  end else begin
    status := CICMP_RESOLVE_ERROR;
    result := nil;
  end;
end;

I believe that most modern Ping component implementations are going to be based on a bit of code similar to the one above, and I have used it to run this ping operation in a background thread without any problems. (A demo program is included in the link below.)

Full sample source code for the ICMP.DLL based demo is here.

UPDATE A more modern IPHLPAPI.DLL sample is found at About.com here.
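
For reference, a call through IPHLPAPI.DLL can be sketched roughly as follows. This is not the About.com sample, just a minimal console program assuming 32-bit Delphi 7 on Windows XP or later. The record layouts follow the documented IP_OPTION_INFORMATION and ICMP_ECHO_REPLY structures; PingOnce and the payload text are made-up names for this illustration.

```pascal
program PingSketch;
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils, WinSock;

type
  // Mirrors the documented IP_OPTION_INFORMATION structure.
  TIpOptionInformation = record
    Ttl, Tos, Flags, OptionsSize: Byte;
    OptionsData: Pointer;
  end;

  // Mirrors the documented ICMP_ECHO_REPLY structure.
  TIcmpEchoReply = record
    Address: DWORD;
    Status: DWORD;          // 0 means IP_SUCCESS
    RoundTripTime: DWORD;   // milliseconds
    DataSize: Word;
    Reserved: Word;
    Data: Pointer;
    Options: TIpOptionInformation;
  end;
  PIcmpEchoReply = ^TIcmpEchoReply;

function IcmpCreateFile: THandle; stdcall; external 'iphlpapi.dll';
function IcmpCloseHandle(IcmpHandle: THandle): BOOL; stdcall;
  external 'iphlpapi.dll';
function IcmpSendEcho(IcmpHandle: THandle; DestinationAddress: DWORD;
  RequestData: Pointer; RequestSize: Word; RequestOptions: Pointer;
  ReplyBuffer: Pointer; ReplySize, Timeout: DWORD): DWORD; stdcall;
  external 'iphlpapi.dll';

// Hypothetical helper: returns the round-trip time in ms,
// or -1 if the address is invalid or no successful reply arrived.
function PingOnce(const DottedIp: AnsiString; TimeoutMs: DWORD): Integer;
const
  PayloadSize = 8;
  Payload: array[0..PayloadSize - 1] of AnsiChar = 'pingdata';
var
  H: THandle;
  Addr: u_long;
  // One reply record, the echoed payload, plus 8 bytes for an ICMP error.
  Buf: array[0..SizeOf(TIcmpEchoReply) + PayloadSize + 8 - 1] of Byte;
begin
  Result := -1;
  Addr := inet_addr(PAnsiChar(DottedIp));
  if Addr = u_long(INADDR_NONE) then
    Exit;
  H := IcmpCreateFile;
  if H = INVALID_HANDLE_VALUE then
    Exit;
  try
    // Blocks until a reply arrives or TimeoutMs elapses.
    if (IcmpSendEcho(H, DWORD(Addr), @Payload, PayloadSize, nil,
          @Buf, SizeOf(Buf), TimeoutMs) > 0)
       and (PIcmpEchoReply(@Buf)^.Status = 0) then
      Result := PIcmpEchoReply(@Buf)^.RoundTripTime;
  finally
    IcmpCloseHandle(H);
  end;
end;

begin
  Writeln('127.0.0.1: ', PingOnce('127.0.0.1', 1000), ' ms');
end.
```

Because the call blocks for up to the timeout, it is best run from background threads. This sketch opens a handle per call for simplicity; whether one handle can safely be shared between threads is something I have not verified.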


Here's an article from Delphi3000 showing how to use IOCP to create a thread pool. I am not the author of this code, but the author's information is in the source code.

I'm re-posting the comments and code here:

By now everyone should understand what a thread is, the principles of threads, and so on. For those who need a reminder: the basic purpose of a thread is to split processing across threads so that work can execute concurrently, in parallel. The main principle of threading is just as simple: memory that is shared between threads must be marshalled to ensure safe access. There are a number of other principles, but this is really the one to care about.

And on..

A thread-safe queue allows multiple threads to add and remove (push and pop) values to and from the queue safely on a first-in, first-out basis. With an efficient and well-written queue you have a highly useful component for developing threaded applications, from helping with thread-safe logging to asynchronous processing of requests.

A thread pool is simply a thread or a number of threads, most commonly used to manage a queue of requests. A web server, for example, has a continuous queue of requests to process and uses thread pools to manage the HTTP requests; a COM+ or DCOM server uses a thread pool to handle the RPC requests. This is done so that the processing of one request has less impact on another. Say you ran 3 requests synchronously and the first request took 1 minute to complete: the other two requests would not complete for at least 1 minute, plus their own processing time, and for most clients this is not acceptable.

So how to do this..

Starting with the queue!!

Delphi does provide a TQueue object, but it is unfortunately not thread safe, nor really very efficient; still, people should look at the Contnrs.pas file to see how Borland wrote their stacks and queues. Only two main functions are required for a queue: add and remove (push and pop). Add/push adds a value, pointer, or object to the end of the queue, and remove/pop removes and returns the first value in the queue.

You could derive from the TQueue object, override the protected methods, and add critical sections. This will get you some of the way, but I want my queue to wait until new requests are in the queue, and to put the thread into a state of rest while it waits. This could be done by adding mutexes or signaling events, but there is an easier way. The Windows API provides an I/O completion queue, which gives us thread-safe access to a queue and a state of rest while waiting for a new request in the queue.

Implementing the Thread Pool

The thread pool is going to be very simple: it manages the desired number of threads and passes each queued request to a supplied event handler for processing. There is rarely a need to implement a TThread descendant with your logic encapsulated in its Execute method, so a simple TSimpleThread class can be created that executes any method of any object within the context of another thread. Once people understand this, all you need to concern yourself with is allocated memory.

Here is how it is implemented.

TThreadQueue and TThreadPool implementation

(* Implemented for Delphi3000.com Articles, 11/01/2004
        Chris Baldwin
        Director & Chief Architect
        Alive Technology Limited
        http://www.alivetechnology.com
*)
unit ThreadUtilities;

interface

uses Windows, SysUtils, Classes;

type
    EThreadStackFinalized = class(Exception);
    TSimpleThread = class;

    // Thread Safe Pointer Queue
    TThreadQueue = class
    private
        FFinalized: Boolean;
        FIOQueue: THandle;
    public
        constructor Create;
        destructor Destroy; override;
        procedure Finalize;
        procedure Push(Data: Pointer);
        function Pop(var Data: Pointer): Boolean;
        property Finalized: Boolean read FFinalized;
    end;

    TThreadExecuteEvent = procedure (Thread: TThread) of object;

    TSimpleThread = class(TThread)
    private
        FExecuteEvent: TThreadExecuteEvent;
    protected
        procedure Execute(); override;
    public
        constructor Create(CreateSuspended: Boolean; ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
    end;

    TThreadPoolEvent = procedure (Data: Pointer; AThread: TThread) of Object;

    TThreadPool = class(TObject)
    private
        FThreads: TList;
        FThreadQueue: TThreadQueue;
        FHandlePoolEvent: TThreadPoolEvent;
        procedure DoHandleThreadExecute(Thread: TThread);
    public
        constructor Create( HandlePoolEvent: TThreadPoolEvent; MaxThreads: Integer = 1); virtual;
        destructor Destroy; override;
        procedure Add(const Data: Pointer);
    end;

implementation

{ TThreadQueue }

constructor TThreadQueue.Create;
begin
    //-- Create IO Completion Queue
    FIOQueue := CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    FFinalized := False;
end;

destructor TThreadQueue.Destroy;
begin
    //-- Destroy Completion Queue
    if (FIOQueue <> 0) then
        CloseHandle(FIOQueue);
    inherited;
end;

procedure TThreadQueue.Finalize;
begin
    //-- Post a finalize pointer on to the queue
    PostQueuedCompletionStatus(FIOQueue, 0, 0, Pointer($FFFFFFFF));
    FFinalized := True;
end;

(* Pop will return false if the queue is completed *)
function TThreadQueue.Pop(var Data: Pointer): Boolean;
var
    A: Cardinal;
    OL: POverLapped;
begin
    Result := True;
    if (not FFinalized) then
        //-- Remove/Pop the first pointer from the queue or wait
        GetQueuedCompletionStatus(FIOQueue, A, Cardinal(Data), OL, INFINITE);

    //-- Check if we have finalized the queue for completion
    if FFinalized or (OL = Pointer($FFFFFFFF)) then begin
        Data := nil;
        Result := False;
        Finalize;
    end;
end;

procedure TThreadQueue.Push(Data: Pointer);
begin
    if FFinalized then
        Raise EThreadStackFinalized.Create('Stack is finalized');
    //-- Add/Push a pointer on to the end of the queue
    PostQueuedCompletionStatus(FIOQueue, 0, Cardinal(Data), nil);
end;

{ TSimpleThread }

constructor TSimpleThread.Create(CreateSuspended: Boolean;
  ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
begin
    FreeOnTerminate := AFreeOnTerminate;
    FExecuteEvent := ExecuteEvent;
    inherited Create(CreateSuspended);
end;

procedure TSimpleThread.Execute;
begin
    if Assigned(FExecuteEvent) then
        FExecuteEvent(Self);
end;

{ TThreadPool }

procedure TThreadPool.Add(const Data: Pointer);
begin
    FThreadQueue.Push(Data);
end;

constructor TThreadPool.Create(HandlePoolEvent: TThreadPoolEvent;
  MaxThreads: Integer);
begin
    FHandlePoolEvent := HandlePoolEvent;
    FThreadQueue := TThreadQueue.Create;
    FThreads := TList.Create;
    while FThreads.Count < MaxThreads do
        FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False));
end;

destructor TThreadPool.Destroy;
var
    t: Integer;
begin
    FThreadQueue.Finalize;
    for t := 0 to FThreads.Count-1 do
        TThread(FThreads[t]).Terminate;
    while (FThreads.Count > 0) do begin
        TThread(FThreads[0]).WaitFor;
        TThread(FThreads[0]).Free;
        FThreads.Delete(0);
    end;
    FThreadQueue.Free;
    FThreads.Free;
    inherited;
end;

procedure TThreadPool.DoHandleThreadExecute(Thread: TThread);
var
    Data: Pointer;
begin
    while FThreadQueue.Pop(Data) and (not TSimpleThread(Thread).Terminated) do begin
        try
            FHandlePoolEvent(Data, Thread);
        except
            //-- Deliberately swallow exceptions so one failing request does not end the worker loop
        end;
    end;
end;

end. 

As you can see, it's quite straightforward. With this you can very easily implement any queuing of requests across threads; really any requirement that involves threading can be met using these objects, saving you a lot of time and effort.

You can use this to queue requests from one thread to multiple threads, or queue requests from multiple threads down to one thread which makes this quite a nice solution.

Here are some examples of using these objects.

Thread safe logging

To allow multiple threads to asynchronously write to a log file.

uses Windows, SysUtils, Classes, ThreadUtilities,...;

type
    PLogRequest = ^TLogRequest;
    TLogRequest = record
        LogText: String;
    end;

    TThreadFileLog = class(TObject)
    private
        FFileName: String;
        FThreadPool: TThreadPool;
        procedure HandleLogRequest(Data: Pointer; AThread: TThread);
    public
        constructor Create(const FileName: string);
        destructor Destroy; override;
        procedure Log(const LogText: string);
    end;

implementation

(* Simple reuse of a logtofile function for example *)
procedure LogToFile(const FileName, LogString: String);
var
    F: TextFile;
begin
    AssignFile(F, FileName);
    if not FileExists(FileName) then
        Rewrite(F)
    else
        Append(F);
    try
        Writeln(F, DateTimeToStr(Now) + ': ' + LogString);
    finally
        CloseFile(F);
    end;
end;

constructor TThreadFileLog.Create(const FileName: string);
begin
    FFileName := FileName;
    //-- Pool of one thread to handle queue of logs
    FThreadPool := TThreadPool.Create(HandleLogRequest, 1);
end;

destructor TThreadFileLog.Destroy;
begin
    FThreadPool.Free;
    inherited;
end;

procedure TThreadFileLog.HandleLogRequest(Data: Pointer; AThread: TThread);
var
    Request: PLogRequest;
begin
    Request := Data;
    try
        LogToFile(FFileName, Request^.LogText);
    finally
        Dispose(Request);
    end;
end;

procedure TThreadFileLog.Log(const LogText: string);
var
    Request: PLogRequest;
begin
    New(Request);
    Request^.LogText := LogText;
    FThreadPool.Add(Request);
end;

Because this logs to a file, it funnels all requests down to a single thread, but you could send rich email notifications with a higher thread count or, even better, profile what is going on at each step of your program, which I will demonstrate in another article, as this one has become quite long.

For now I will leave you with this, enjoy.. Leave a comment if there's anything people are stuck with.

Chris


Do you need a response from every machine on the network, or are these 300 machines just a subset of the larger network?

If you need a response from every machine, you could consider using a broadcast address or multicast address for your echo request.


Please try "chknodes", a parallel ping for Linux that sends a single ping to all nodes of your network. It can also do a reverse DNS lookup and request an HTTP response if so specified. It is written entirely in bash, so you can easily inspect it or modify it to your needs. Here is a printout of the help:

chknodes -h

chknodes ---- fast parallel ping

chknodes [-l|--log] [-h|--help] [-H|--http] [-u|--uninstall] [-v|--version] [-V|--verbose]

-l | --log        Log to file
-h | --help       Show this help screen
-H | --http       Check also http response
-n | --names      Get also host names
-u | --uninstall  Remove installation
-v | --version    Show version
-V | --verbose    Show each ip address pinged

You need to give it execute permission (as with any sh/bash script) in order to run it:

chmod +x chknodes

On the first run i.e.

./chknodes

it will offer to install itself to /usr/local/bin/chknodes; after that, typing just

chknodes

will be enough. You can find it here:

www.homelinuxpc.com/download/chknodes
