开发者

efficient way to do multi threaded calls to sql server?

开发者 https://www.devze.com 2023-01-31 12:38 出处:网络
i have an sql stored procedure that will call to TOP 1000 records from a table that function like a queue-in this table there will be more or less 30,000-40,000 records.the call to the SP takes ~4 sec

i have an sql stored procedure that will call to TOP 1000 records from a table that function like a queue-in this table there will be more or less 30,000-40,000 records.the call to the SP takes ~4 seconds (there's an xml column) so to finish the calls it will take ~2 minutes. i th开发者_开发技巧ought to use multi threaded calls and to insert the records to a sync dictionary\list. did someone did that before? any efficient way to end the calls as soon as possible? Thanks...


Consider optimizing the query before resorting to threads.

In my experience, when beginners at multi-threading implement threads, it usually does not improve performance. Worse, it usually introduces subtle errors which can be difficult to debug.

Optimize the query first, and you may find that you don't need threads.

Even if you implemented them, eventually you'll have SQL Server doing too much work, and the threaded requests will simply have to wait.


Basic mistake is wanting to insert into the database from multiple threads and overload server with connections, locks, and eventually bring it to its knees.

If you are READING the data, you will do much better if you find a query that will perform faster and fetch as much data as you can at once.

To me, it seems like your problem is not solvable on its level - maybe if you elaborate what you want to do you'll get better advice.

EDIT:

I did use SQL as a queue once - and I just remembered - to dequeue, you'll have to use result from the first query to get input to the second, so threads are out of the question. Or, you'll have to MARK your queued data 'done' in the database, and your READ will become UPDATE -> resulting to locking.

If you are reading, and you want to react as soon as possible, you can use DataReader, then read ALL of the data, and chunk your processing into threads - read 100 records, fork a thread and pass it to it... then next records and so on. That way you'll be able to balance your resource usage.


Try reading the data asynchronously using DataReader; fetch the columns that can uniquely identify the row in the database .Populate the Queue to hold the returned data value (Custom Object) and run work threads to perform the task against the queue.

You have to decide how many worker threads should be implemented to perform the task as threads have their own overheads and if not implemented correctly could be a nightmare.


If you really have to you can start BGWorkers that individually make connections to the server and report back with their progress.

I did the same thing for an elaborate export/import application to move roughly 50GB of data (4GB deflatestream'ed) except I only used the BGWorker to do the work consecutively, not concurrently, without locking up the UI-thread..


It isn't clear if you're selecting the 1000 most recently added rows, or the 1000 rows with the highest value in a particular column, nor is it clear whether your rows are mutable -- i.e. a row might qualify for the top 1000 yesterday but then get updated so that it no longer qualifies today. But if the individual rows are not mutable, you could have a separate table for the TOP1000, and when the 1001st row is inserted into it, an after-insert trigger would move the 1001st row (however you determine that row) to a HISTORY table. That would make the selection virtually instantaneous: select * from TOP1000. You'd simply combine the two tables with a UNION when you need to query the TOP1000 and HISTORY as though they were one table. Or instead of trigger you could wrap the insert and 1001st-row delete in a transaction.

Different can of worms, though, if the rows mutate, and can move in and out of the top 1000.


public struct BillingData
{
    public int CustomerTrackID, CustomerID;
    public DateTime BillingDate;
}

public Queue<BillingData> customerQueue = new Queue<BillingData>();
volatile static int ThreadProcessCount = 0;
readonly static object threadprocesslock = new object();
readonly static object queuelock = new object();
readonly static object countlock = new object();
AsyncCallback asyncCallback

// Pulling the Data Aync from the Database
private void StartProcess()
{
  SqlCommand command = SQLHelper.GetCommand("GetRecordsByBillingTrackID");
  command.Connection = SQLHelper.GetConnection("Con");SQLHelper.DeriveParameters(command);
  command.Parameters["@TrackID"].Value = trackid;
  asyncCallback = new AsyncCallback(FetchData);
  command.BeginExecuteXmlReader(asyncCallback, command);
}

public void FetchData(IAsyncResult c1)
    {
        SqlCommand comm1 = (SqlCommand)c1.AsyncState;
        System.Xml.XmlReader xr = comm1.EndExecuteXmlReader(c1);
        xr.Read();
        string data = "";
        while (!xr.EOF)
        {
        data = xr.ReadOuterXml();
        XmlDocument dom = new XmlDocument();
        dom.LoadXml("<data>" + data + "</data>");
        BillingData billingData;
        billingData.CustomerTrackID = Convert.ToInt32(dom.FirstChild.ChildNodes[0].Attributes["CustomerTrackID"].Value);
        billingData.CustomerID = Convert.ToInt32(dom.FirstChild.ChildNodes[0].Attributes["CustomerID"].Value);
        billingData.BillingDate = Convert.ToDateTime(dom.FirstChild.ChildNodes[0].Attributes["BillingDate"].Value);
lock (queuelock)
{
   if (!customerQueue.Contains(billingData))
   {
     customerQueue.Enqueue(billingData);
   }
}
   AssignThreadProcessToTheCustomer();


}



xr.Close();



}



// Assign the Threads based on the data pulled
private void AssignThreadProcessToTheCustomer()
{
int TotalProcessThreads =  5;
int TotalCustomersPerThread = 5;
if (ThreadProcessCount < TotalProcessThreads)
{
int ThreadsNeeded = (customerQueue.Count % TotalCustomersPerThread == 0) ?    (customerQueue.Count / TotalCustomersPerThread) : (customerQueue.Count / TotalCustomersPerThread + 1);
int count = 0;
 if (ThreadsNeeded > ThreadProcessCount)
{
 count = ThreadsNeeded - ThreadProcessCount;
 if ((count + ThreadProcessCount) > TotalProcessThreads)
    count = TotalProcessThreads - ThreadProcessCount;
}
for (int i = 0; i < count; i++)
{
ThreadProcess objThreadProcess = new ThreadProcess(this);
ThreadPool.QueueUserWorkItem(objThreadProcess.BillingEngineThreadPoolCallBack, count);
lock (threadprocesslock)
{
 ThreadProcessCount++;
}
}

public void BillingEngineThreadPoolCallBack(object threadContext)
{
BillingData? billingData = null;
while (true)
{
   lock (queuelock)
   {
      billingData = ProcessCustomerQueue();
   }
 if (billingData != null)
{
 StartBilling(billingData.Value);
}
else
 break;

More....
}
0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号