Preload data from a file using a separate thread

Developer https://www.devze.com 2023-03-30 00:40 Source: Internet
I have a small application that processes a large number of (relatively small) files. It runs sequentially: it loads data from a file, performs operations on it, and moves on to the next file. I noticed that at run time the CPU usage is not 100%, and I guess this is due to the time taken by the I/O operations on the hard drive.

So the idea would be to load the next data into memory in parallel with the processing of the current data, using a separate thread (the data in question would simply be a sequence of int, stored in a vector). This seems like a very common problem, but I have a hard time finding a simple, plain C++ example that does this! And now that C++0x is on its way, simple demo code using the new thread facility, with no external library, would be very nice.

Also, although I know this depends on a lot of things, is it possible to make an educated guess about the benefits (or drawbacks) of such an approach, with respect to the size of the data file to load, for example? I guess that with large files, the disk I/O operations are very infrequent anyway, since the data is already buffered (by fstream?).

Olivier


A toy program showing how to use some C++0x threading and synchronization facilities. I have no idea what the performance of this is (I recommend Matt's answer); my focus is on clarity and correctness for the sake of making an example.

The files are read separately, as you requested. They're not converted to a sequence of int however, as I feel this is more related to processing rather than strict I/O. So the files are dumped into a plain std::string.

#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <deque>
#include <future>
#include <mutex>
#include <condition_variable>

int
main()
{
    // this is shared
    std::mutex mutex;
    std::condition_variable condition;
    bool more_to_process = true;
    std::deque<std::string> to_process;

    /* Reading the files is done asynchronously */
    std::vector<std::string> filenames = { /* initialize */ };
    auto process = std::async(std::launch::async, [&](std::vector<std::string> filenames)
    {
        typedef std::lock_guard<std::mutex> lock_type;
        for(auto&& filename: filenames) {
            std::ifstream file(filename);
            if(file) {
                std::ostringstream stream;
                stream << file.rdbuf();
                if(stream) {
                    lock_type lock(mutex);
                    to_process.push_back(stream.str());
                    condition.notify_one();
                }
            }
        }
        lock_type lock(mutex);
        more_to_process = false;
        condition.notify_one();
    }, std::move(filenames));

    /* processing is synchronous */
    for(;;) {
        std::string file;
        {
            std::unique_lock<std::mutex> lock(mutex);
            condition.wait(lock, [&]
            { return !more_to_process || !to_process.empty(); });

            if(!more_to_process && to_process.empty())
                break;
            else if(to_process.empty())
                continue;

            file = std::move(to_process.front());
            to_process.pop_front();
        }

        // use file here
    }

    process.get();
}

Some notes:

  • the mutex, condition variable, stop flag and std::string container are all logically related. You may as well replace them with a thread-safe container/channel
  • I use std::async instead of std::thread because it has better exception-safety characteristics
  • there is no error handling to speak of; if a file can't be read for some reason, it is silently skipped. You have several options: signal that there is no more to process and throw to handle as soon as possible; or use a boost::variant<std::string, std::exception_ptr> to pass the error on to the processing side of things (here the error is passed as an exception but you can use an error_code or anything you fancy). Not an exhaustive list by any means.
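To illustrate the first note, here is a minimal sketch of what such a thread-safe channel could look like. The class name channel and its member functions push, close and pop are hypothetical (nothing like this exists in the standard library), and it assumes C++17 for std::optional. It only bundles the mutex, condition variable, stop flag and container from the program above; it is not meant as a tuned implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical helper: a closable, thread-safe FIFO channel.
template <typename T>
class channel {
public:
    // Producer side: enqueue one item and wake one waiting consumer.
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!closed_);  // pushing after close() is a programming error
            items_.push_back(std::move(value));
        }
        condition_.notify_one();
    }

    // Producer side: signal that no more items will arrive.
    void close()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        condition_.notify_all();
    }

    // Consumer side: block until an item is available, or until the channel
    // is closed and drained. An empty optional means "nothing more to process".
    std::optional<T> pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        condition_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty())
            return std::nullopt;
        T value = std::move(items_.front());
        items_.pop_front();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::deque<T> items_;
    bool closed_ = false;
};
```

With this, the reading side would call push(stream.str()) for each file and close() at the end, and the processing loop would become something like while(auto file = to_process.pop()) { /* use *file */ }.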


Using threads for an I/O-bound problem like this will give you negligible performance gains. You may fill some "gaps" in your attempt to saturate the available I/O resources by opening several files in advance and by overlapping system calls via threads, as you've indicated.

I would recommend that you instead look at giving the kernel hints about how you intend to do I/O, which will improve read-ahead, and at improving the physical read bandwidth, for example by verifying that the file system, kernel, and hard drive (or whatever your storage source is) are as fast as possible.

  • posix_fadvise()
  • posix_madvise()
  • readahead()
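As a rough sketch of the first call in that list, the function below asks the kernel to start pulling a whole file into the page cache before it is actually read. The helper name prefetch_hint is made up for this example, and it assumes a POSIX system that provides posix_fadvise() in <fcntl.h>. The hint is purely advisory, so whether it helps depends on the kernel and the storage.

```cpp
#include <cassert>
#include <fcntl.h>   // open, posix_fadvise, POSIX_FADV_WILLNEED
#include <string>
#include <unistd.h>  // close

// Hypothetical helper: tell the kernel that `path` will be read soon.
// Returns true if the file could be opened and the hint was accepted.
bool prefetch_hint(const std::string& path)
{
    assert(!path.empty());
    int fd = ::open(path.c_str(), O_RDONLY);
    if (fd < 0)
        return false;
    // Offset 0 with length 0 means "from the start to the end of the file".
    // posix_fadvise returns 0 on success and an error number on failure.
    int rc = ::posix_fadvise(fd, 0, 0, POSIX_FADV_WILLNEED);
    ::close(fd);
    return rc == 0;
}
```

One way to use it would be to call prefetch_hint() on the next file name just before starting to process the current file, and then read the next file as usual.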


I would create two threads and two buffers:

  • the first reads data from the files into the buffers
  • the second processes the received data

If a file doesn't fit in one buffer, mark the end of the file with a flag. If the second thread doesn't find that flag at the end of a buffer, it should continue reading from the second buffer.

The number and size of the buffers, and probably the number of threads, are parameters to optimize. The main idea is to keep the disk controller working continuously.
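A simplified variant of this idea can be sketched with std::async: one "buffer" holds the file being processed and the other holds the file being loaded. Note that this swaps hand-managed buffers and end-of-file flags for whole-file strings and a std::future, so it is an illustration of the overlap rather than the exact scheme described above. The names load and process_all are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <fstream>
#include <future>
#include <sstream>
#include <string>
#include <vector>

// Read a whole file into a string (empty if the file cannot be opened).
std::string load(const std::string& filename)
{
    std::ifstream file(filename, std::ios::binary);
    std::ostringstream contents;
    contents << file.rdbuf();
    return contents.str();
}

// Process every file in order. While file i is processed on the calling
// thread, file i + 1 is already being loaded on another thread.
template <typename Process>
void process_all(const std::vector<std::string>& filenames, Process process)
{
    if (filenames.empty())
        return;
    std::future<std::string> next =
        std::async(std::launch::async, load, filenames[0]);
    for (std::size_t i = 0; i < filenames.size(); ++i) {
        assert(next.valid());                // a load is always pending here
        std::string current = next.get();    // wait for the pending load
        if (i + 1 < filenames.size())        // start loading the next file
            next = std::async(std::launch::async, load, filenames[i + 1]);
        process(current);                    // overlaps with that load
    }
}
```

A call could look like process_all(filenames, [](const std::string& data) { /* parse and use data */ });. Only one file is loaded ahead, so memory use stays at roughly two files at a time.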

** EDIT **

In the ideal situation, all of the execution time is spent reading data from the HDD. However, that depends on the ratio of "processing time per chunk of data" to "HDD read time per chunk of data", which can vary.


Since your files are relatively small and you have to deal with a large number of them, a better design would be to create two threads:

1. The first thread reads and processes only the files at even positions in the file listing (ls -l in *nix).
2. The second thread reads and processes the files at odd positions in the listing.

The disadvantage of the method you mentioned, "one thread reading data into a vector and another thread reading from it", is that you have to worry about race conditions between the threads and prevent them with mutexes and condition variables.

This method, on the other hand, requires no locking (I hope there are no dependencies between the data in different files).
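The even/odd split could be sketched roughly as follows. The function name process_interleaved and the handle callback are hypothetical; the sketch assumes the callback reads and processes one file and only touches data belonging to that file (for example a result slot at index i), which is what makes it safe without locks.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

// Call handle(i, filenames[i]) for every file. One thread takes the even
// indices 0, 2, 4, ... and the other takes the odd indices 1, 3, 5, ...
// No locking is used, so handle must not touch data shared between files.
template <typename Handle>
void process_interleaved(const std::vector<std::string>& filenames,
                         Handle handle)
{
    auto worker = [&](std::size_t first) {
        for (std::size_t i = first; i < filenames.size(); i += 2)
            handle(i, filenames[i]);
    };
    std::thread even(worker, 0);
    std::thread odd(worker, 1);
    even.join();
    odd.join();
}
```

An exception escaping handle would terminate the program in this sketch, so real code would need to catch errors inside the callback.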

Also, a faster way to read data from a file is to do a binary read of the file into a buffer of suitable size.
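Such a binary read might look like the sketch below, which sizes the buffer from the file length and fills it with a single read() call. The function name read_binary is made up for the example, and it assumes the file fits comfortably in memory.

```cpp
#include <cassert>
#include <cstddef>
#include <fstream>
#include <string>
#include <vector>

// Read a whole file with one binary read. Returns an empty vector if the
// file cannot be opened, is empty, or cannot be read completely.
std::vector<char> read_binary(const std::string& filename)
{
    // std::ios::ate opens the file positioned at its end, so tellg()
    // reports the file size.
    std::ifstream file(filename, std::ios::binary | std::ios::ate);
    if (!file)
        return {};
    const std::streamsize size = file.tellg();
    if (size <= 0)
        return {};
    std::vector<char> buffer(static_cast<std::size_t>(size));
    file.seekg(0, std::ios::beg);
    if (!file.read(buffer.data(), size))
        return {};
    return buffer;
}
```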

Hope the answer helps you.

**EDIT:**

As per your comment, it seems that you will have to go with one thread reading data into a queue data structure (a queue of char buffers, maybe) and a second thread reading data from the queue and processing it.

As mentioned earlier, the problem is reading from and writing to the same queue, because STL containers are not thread-safe.

So what I can recommend here is either to manage your shared data structure (the queue, in this case) using locks, or else to go for one of these:

1. Boost Lock free: Boost lock free
2. Write your own lock-free implementation: Lock free impl
