开发者

Thread locking issue with FileHelpers between calling engine.ReadNext() method and readign engine.LineNumber property

开发者 https://www.devze.com 2022-12-26 07:22 出处:网络
I use producer/consumer pattern with FileHelpers library to import data from one file (which can be huge) using multiple threads. Each thread is supposed to import a chunk of that file and I would lik

I use producer/consumer pattern with FileHelpers library to import data from one file (which can be huge) using multiple threads. Each thread is supposed to import a chunk of that file and I would like to use LineNumber property of the FileHelperAsyncEngine instance that is reading the file as primary key for imported rows. FileHelperAsyncEngine internally has an IEnumerator IEnumerable.GetEnumerator(); which is iterated over using engine.ReadNext() method. That internally sets LineNumber property (which seems is not thread safe).

Consumers will have Producers assiciated with them that will supply DataTables to Consumers which will consume them via SqlBulkLoad class which will use IDataReader implementation which will iterate over a collection of DataTables which are internal to a Consumer instance. Each instance of will have one SqlBulkCopy instance associate with it.

I have thread locking issue. Below is how I create multiple Producer threads. I start each thread afterwords. Produce method on a producer instance will be called determining which chunk of input file will be proce开发者_开发问答ssed. It seems that engine.LineNumber is not thread safe and I doesn't import a proper LineNumber in the database. It seems that by the time engine.LineNumber is read some other thread called engine.ReadNext() and changed engine.LineNumber property. I don't want to lock the loop that is supposed to process a chunk of input file because I loose parallelism. How to reorganize the code to solve this threading issue?

Thanks Rad

            for (int i = 0; i < numberOfProducerThreads; i++)
            DataConsumer consumer = dataConsumers[i];

            //create a new producer
            DataProducer producer = new DataProducer();

            //consumer has already being created
            consumer.Subscribe(producer);

            FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
            orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
            orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;

            int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;

            Thread newThread = new Thread(() =>
            {
                producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
                consumer.SetEndOfData(producer);
            }); 
            producerThreads.Add(newThread); thread.Start();}

    public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
    {
        lock (this)
        {
            engine.Options.IgnoreFirstLines = skipLines;
            engine.BeginReadFile(inputFilePath);
        }

        int rowCount = 1;

        DataTable buffer = consumer.BufferDataTable;
        while (engine.ReadNext() != null)
        {
            lock (this)
            {
                dict[lineNumberFieldName] = engine.LineNumber;
                buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
                if (rowCount % DataBuffer.MaxBufferRowCount == 0)
                {
                    consumer.AddBufferDataTable(buffer);
                    buffer = consumer.BufferDataTable;
                }
                if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
                {
                    break;
                }
                rowCount++;
            }
        }
        if (buffer.Rows.Count > 0)
        {
            consumer.AddBufferDataTable(buffer);
        }
        engine.Close();
    }


Dictionary<> is not thread safe. Is the dictionary in the above code being properly locked or is it only used in your lock(this)?

As an aside I would avoid the lock(this) paradigm and use generic objects to lock your code. You may be running into other locking issues not related to specific resources. I detail that issue on my blog (Smart Resource Locking in C# .Net for Thread Safe Code). HTH


You are right the LineNumber is not thread safe :(

I just investigation the code and found that we read the line from our internal reader and later update the LineNumber so not thread safety at all.

The problem is that if we add some sincronization code inside we can make things really slower, maybe we need to create a thread safe version of the internal code to avoid that overhead.

Anyway I think that from the performance perspective the slower part of the code are the file operations so you get no speed up using multiple threads to read. Maybe you need only one thread to read the file to a working queue and has multiple threads that read it and work with each record, in that case you get the thread safety that you need

Cheers


I think I corrected the issue. It was Dictionary<> that needed lock

lock (dict) { dict[lineNumberFieldName] = engine.LineNumber; buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer)); } Thank you OmegaMan for a good clue.

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号