开发者

C# Wait until all threads terminated in ThreadPool

开发者 https://www.devze.com 2022-12-20 07:40 出处:网络
I have one main thread, and many other background threads. The main usage of those background threads is to query data (many queries from the web, which is why I create multiple threads: to avoid the

I have one main thread, and many other background threads.

The main usage of those background threads is to query data (many queries from the web, which is why I create multiple threads: to avoid the lagging the user interface).

When it comes to exporting the data in the main thread (the user interface), I need to wait until all the other threads are finished.

My code is:

//...code to open save file dialog...

//this loop is to wait for all the threads finish their query
//QueryThread.threadCount is the count of the background threads
while (QueryThread.threadCount != 0)
{
    Thread.CurrentThread.Join(1000);
    Console.WriteLine(QueryThread.threadCount);
}

//...code to export data...

If I comment the while loop out, the program will run smoothly, but some of my exported data will have possibility of showing some "unwanted" material since some of the background threads haven't finished their work.

However, the above while loop is infinite, the threadCount never changes, which means during the "Join()" method, no background thread is running.

Why are the background threads blocked and how can I solve开发者_Go百科 the problem?

Thanks a lot!


You are calling the Join method on the current thread which doesn't make much sense. You should call it on your worker threads:

foreach (Thread thread in workerThreads)
{
    thread.Join(1000);
}

Unfortunately this kind of defeats the purpose of using threads because it will block the calling until all other threads are finished.

The RunWorkerCompleted event of a BackgroundWorker could be used to notify the completion of a background task and perform updates on the form.


Your implementation is wrong, and you shouldn't use Join as a synchronization primitive between your threads.

What you should do is implement the producer-consumer pattern. This will allow you to have the threads waiting to do work, and then come alive to do that work when you place it in the queue.

However, the change that I would make is that from the UI thread, don't add the data directly to the queue that the producer and consumer share. Rather, make a copy of that data and then put that in the queue.

For more information on how to implement the producer-consumer pattern in .NET, I suggest you read the MSDN documentation titled "How to: Synchronize a Producer and a Consumer Thread (C# Programming Guide)"


I think you want to look into signalling. Have signals (ManualResetEvent/AutoResetEvent) for your threads. Set() the associated signal handle in the worker thread when it´s done. In the main thread do a `WaitAll(signal1, signal2,signal3)´ to wait for the completion of your worker threads.

Hope this helps,


I couldn´t resist trying some myself. I´m sure there´s room for improvement but i think it shows how to deal with some multi threading issues, including the original question.

Form.cs


namespace STAFormWithThreadPoolSync
{
    internal delegate void WorkerEvent(WorkerEventInfo info);

    public partial class Form1 : Form
    {
        // We'll create a state object for each worker process
        List<WorkerState> workerStates = new List<WorkerState>();

        public Form1()
        {
            InitializeComponent();
        }

        // Executed in the main thread
        private void button1_Click(object sender, EventArgs e)
        {
            workersList.Items.Clear();

            // Read the amount of thread we should start from the form
            int threadCountToUse = (int)ThreadCount.Value;

            WorkerEvent woEvent = new WorkerEvent(this.workerEventOccured);

            // Start up all threads
            for (int counter = 0; counter < threadCountToUse; ++counter)
            {
                // An object we can pass values into for the worker process to use.
                WorkerState workerState = new WorkerState();

                workerState.OnStarted += woEvent;
                workerState.OnFinished += woEvent;

                // Register for the signal (and store its registered wait handle in the stateObj, which we also pass into the parameters!)
                workerState.registeredWaitHandle = ThreadPool.RegisterWaitForSingleObject(workerState.finishSignal, this.ItemHasFinished, workerState, -1, true); 

                // Store the state object for later use.
                workerStates.Add(workerState);
            }

            WorkersProgress.Minimum = 0;
            WorkersProgress.Maximum = workerStates.Count;

            workerStates.ForEach(workerState =>
                {
                    // Fire of the worker thread (with the state object)
                    ThreadPool.QueueUserWorkItem(this.ProcessItem, workerState);
                }
            );



            button1.Enabled = false;
            CurrentResult.Value = 0;
            CurrentResultLabel.Text = "Current value";
            ProgressTimer.Start();
        }

        // event is run on the callers thread, so carefull accessing our controls on our form.
        internal void workerEventOccured(WorkerEventInfo info)
        {
            if (this.workersList.InvokeRequired)
            {
                WorkerEvent workerEvent = new WorkerEvent(workerEventOccured);
                this.Invoke(workerEvent, new object[] { info });
            }
            else
            {
                switch (info.eventType)
                {
                    case EventType.WorkerStarted:
                        this.workersList.Items.Add(String.Format("Worker started on thread : {0}", info.workerState.threadId));
                        break;

                    case EventType.WorkerEnded:
                        this.workersList.Items.Add(String.Format("Worker finished on thread : {0}", info.workerState.threadId));
                        break;
                    case EventType.AllWorkersFinished:
                        this.workersList.Items.Add("ALL workers finished");
                        ProgressTimer.Stop();
                        button1.Enabled = true;
                        CurrentResultLabel.Text = "Final value";
                        break;
                }
            }
        }

        // Executed in threadpool thread.
        private void ProcessItem(object state)
        {
            WorkerState workerState = state as WorkerState;
            int threadId = Thread.CurrentThread.ManagedThreadId;
            workerState.threadId = threadId.ToString();

            WorkerEventInfo weInfo = new WorkerEventInfo();
            weInfo.eventType = EventType.WorkerStarted;
            weInfo.workerState = workerState;
            workerState.Started(weInfo);

            // Simulate work for ((threadid / 2) seconds.
            Thread.Sleep((threadId * 500));

            // Set the result in the state object to the threadId; 
            workerState.result = threadId;

            // Signal that this thread is done.
            workerState.finishSignal.Set();
        }

        // Executed in threadpool thread
        private void ItemHasFinished(object state, bool timedOut)
        {
            // get our state object
            WorkerState workerState = state as WorkerState;

            WorkerEventInfo weInfo = new WorkerEventInfo();
            weInfo.eventType = EventType.WorkerEnded;
            weInfo.workerState = workerState;

            workerState.Finished(weInfo);
        }

        private void ProgressTimer_Tick(object sender, EventArgs e)
        {
            List<WorkerState> removeStates = new List<WorkerState>();
            workerStates.ForEach(workerState =>
                {
                    if (workerState.finishSignal.WaitOne(0))
                    {
                        CurrentResult.Value += workerState.result;
                        removeStates.Add(workerState);
                    }
                }
            );

            removeStates.ForEach(workerState =>
                {
                    workerState.registeredWaitHandle.Unregister(workerState.finishSignal);
                    workerStates.Remove(workerState);
                }
            );


            WorkersProgress.Value = workerStates.Count;
            if (workerStates.Count == 0)
            {
                WorkerEventInfo weInfo = new WorkerEventInfo();
                weInfo.eventType = EventType.AllWorkersFinished;
                weInfo.workerState = null;
                this.workerEventOccured(weInfo);
            }

        }
    }

    internal class WorkerState
    {
        internal string threadId = "";
        internal int    result = 0;
        internal RegisteredWaitHandle registeredWaitHandle = null;
        internal AutoResetEvent finishSignal = new AutoResetEvent(false);
        internal event WorkerEvent OnStarted = new WorkerEvent( (info) => {});
        internal event WorkerEvent OnFinished = new WorkerEvent((info) => { });

        internal void Started(WorkerEventInfo info)
        {
            OnStarted(info);
        }

        internal void Finished(WorkerEventInfo info)
        {
            OnFinished(info);
            this.finishSignal.Set();
        }
    }

    internal enum EventType
    {
        WorkerStarted,
        WorkerEnded,
        AllWorkersFinished
    }

    internal class WorkerEventInfo
    {
        internal EventType eventType;
        internal WorkerState workerState;
    }
}


Form.Designer.cs


namespace STAFormWithThreadPoolSync
{
    partial class Form1
    {
        /// 
        /// Required designer variable.
        /// 
        private System.ComponentModel.IContainer components = null;

        /// 
        /// Clean up any resources being used.
        /// 
        /// true if managed resources should be disposed; otherwise, false.
        protected override void Dispose(bool disposing)
        {
            if (disposing && (components != null))
            {
                components.Dispose();
            }
            base.Dispose(disposing);
        }

        #region Windows Form Designer generated code

        /// 
        /// Required method for Designer support - do not modify
        /// the contents of this method with the code editor.
        /// 
        private void InitializeComponent()
        {
            this.components = new System.ComponentModel.Container();
            this.button1 = new System.Windows.Forms.Button();
            this.ThreadCount = new System.Windows.Forms.NumericUpDown();
            this.workersList = new System.Windows.Forms.ListView();
            this.WorkerProcessColumn = new System.Windows.Forms.ColumnHeader();
            this.ProgressTimer = new System.Windows.Forms.Timer(this.components);
            this.WorkersProgress = new System.Windows.Forms.ProgressBar();
            this.CurrentResultLabel = new System.Windows.Forms.Label();
            this.CurrentResult = new System.Windows.Forms.NumericUpDown();
            this.label2 = new System.Windows.Forms.Label();
            ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).BeginInit();
            ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).BeginInit();
            this.SuspendLayout();
            // 
            // button1
            // 
            this.button1.Location = new System.Drawing.Point(212, 19);
            this.button1.Name = "button1";
            this.button1.Size = new System.Drawing.Size(93, 23);
            this.button1.TabIndex = 0;
            this.button1.Text = "Start threads";
            this.button1.UseVisualStyleBackColor = true;
            this.button1.Click += new System.EventHandler(this.button1_Click);
            // 
            // ThreadCount
            // 
            this.ThreadCount.Location = new System.Drawing.Point(23, 21);
            this.ThreadCount.Minimum = new decimal(new int[] {
            2,
            0,
            0,
            0});
            this.ThreadCount.Name = "ThreadCount";
            this.ThreadCount.Size = new System.Drawing.Size(183, 20);
            this.ThreadCount.TabIndex = 1;
            this.ThreadCount.Value = new decimal(new int[] {
            4,
            0,
            0,
            0});
            // 
            // workersList
            // 
            this.workersList.Columns.AddRange(new System.Windows.Forms.ColumnHeader[] {
            this.WorkerProcessColumn});
            this.workersList.Location = new System.Drawing.Point(23, 80);
            this.workersList.Name = "workersList";
            this.workersList.Size = new System.Drawing.Size(486, 255);
            this.workersList.TabIndex = 3;
            this.workersList.UseCompatibleStateImageBehavior = false;
            this.workersList.View = System.Windows.Forms.View.Details;
            // 
            // WorkerProcessColumn
            // 
            this.WorkerProcessColumn.Text = "Worker process";
            this.WorkerProcessColumn.Width = 482;
            // 
            // ProgressTimer
            // 
            this.ProgressTimer.Interval = 200;
            this.ProgressTimer.Tick += new System.EventHandler(this.ProgressTimer_Tick);
            // 
            // WorkersProgress
            // 
            this.WorkersProgress.Location = new System.Drawing.Point(112, 341);
            this.WorkersProgress.Name = "WorkersProgress";
            this.WorkersProgress.Size = new System.Drawing.Size(397, 24);
            this.WorkersProgress.TabIndex = 4;
            // 
            // CurrentResultLabel
            // 
            this.CurrentResultLabel.AutoSize = true;
            this.CurrentResultLabel.Location = new System.Drawing.Point(578, 266);
            this.CurrentResultLabel.Name = "CurrentResultLabel";
            this.CurrentResultLabel.Size = new System.Drawing.Size(74, 13);
            this.CurrentResultLabel.TabIndex = 5;
            this.CurrentResultLabel.Text = "Current Result";
            // 
            // CurrentResult
            // 
            this.CurrentResult.Location = new System.Drawing.Point(581, 282);
            this.CurrentResult.Maximum = new decimal(new int[] {
            -1593835520,
            466537709,
            54210,
            0});
            this.CurrentResult.Name = "CurrentResult";
            this.CurrentResult.ReadOnly = true;
            this.CurrentResult.Size = new System.Drawing.Size(169, 20);
            this.CurrentResult.TabIndex = 6;
            // 
            // label2
            // 
            this.label2.AutoSize = true;
            this.label2.Location = new System.Drawing.Point(25, 352);
            this.label2.Name = "label2";
            this.label2.Size = new System.Drawing.Size(81, 13);
            this.label2.TabIndex = 7;
            this.label2.Text = "processing load";
            // 
            // Form1
            // 
            this.AutoScaleDimensions = new System.Drawing.SizeF(6F, 13F);
            this.AutoScaleMode = System.Windows.Forms.AutoScaleMode.Font;
            this.ClientSize = new System.Drawing.Size(762, 377);
            this.Controls.Add(this.label2);
            this.Controls.Add(this.CurrentResult);
            this.Controls.Add(this.CurrentResultLabel);
            this.Controls.Add(this.WorkersProgress);
            this.Controls.Add(this.workersList);
            this.Controls.Add(this.ThreadCount);
            this.Controls.Add(this.button1);
            this.Name = "Form1";
            this.Text = "Form1";
            ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).EndInit();
            ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).EndInit();
            this.ResumeLayout(false);
            this.PerformLayout();

        }

        #endregion

        private System.Windows.Forms.Button button1;
        private System.Windows.Forms.NumericUpDown ThreadCount;
        private System.Windows.Forms.ListView workersList;
        private System.Windows.Forms.ColumnHeader WorkerProcessColumn;
        private System.Windows.Forms.Timer ProgressTimer;
        private System.Windows.Forms.ProgressBar WorkersProgress;
        private System.Windows.Forms.Label CurrentResultLabel;
        private System.Windows.Forms.NumericUpDown CurrentResult;
        private System.Windows.Forms.Label label2;
    }
}

Hope this helps,


I solved the problem by changing my approach to Producer-Consumer model.

Thanks all. Please look at this link (provided by casperOne above), but be caution don't follow the implementation of microsoft....

Go here instead will give you a better answer.

Of course I've made some changes, the type of the Queue is Delegate in my case.

public static class QueryThread
{
    private static SyncEvents _syncEvents = new SyncEvents();
    private static Queue<Delegate> _queryQueue = new Queue<Delegate>();

    static Producer queryProducer;
    static Consumer queryConsumer;

    public static void init()
    {
        queryProducer = new Producer(_queryQueue, _syncEvents);
        queryConsumer = new Consumer(_queryQueue, _syncEvents);

        Thread producerThread = new Thread(queryProducer.ThreadRun);
        Thread consumerThread = new Thread(queryConsumer.ThreadRun);

        producerThread.IsBackground = true;
        consumerThread.IsBackground = true;

        producerThread.Start();
        consumerThread.Start();
    }

    public static void Enqueue(Delegate item)
    {
        queryQueue.Enqueue(item);
    }
}

When a query is needed in the main thread, enqueue a delegate points to the function that makes query by calling the Enqueue(Delegate item). This add a delegate to the "private" queue of the Producer.

The producer will add the items in its own queue to the shared queue on the suitable occasion (like generating random number and put it into the shared queue in the msdn example).

The consumer dequeue the delegates and run them.

Thanks all for helping. =]

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号