
How to most efficiently read a list of files as one stream and hash pieces from it?

Developer https://www.devze.com 2023-02-12 02:58 Source: Internet
I have a list of files that need to be read, in chunks, into a byte[], which is then passed to a hashing function. The tricky part is this: if I reach the end of a file, I need to continue reading the next file until the buffer is full, like so:

read 16 bits as an example:

File 1: 00101010

File 2: 01101010111111111

would need to be read as 0010101001101010

The point is: these files can be as large as several gigabytes, and I don't want to completely load them into memory. Loading pieces into a buffer of, like, 30 MB would be perfectly fine.
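The read pattern described above can be sketched as a small reader that keeps filling one buffer across file boundaries. This is only a minimal sketch: `ConcatenatedReader` and `Fill` are hypothetical names, not part of the code in this question, and error handling is left out.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: reads a list of files as if they were one continuous stream.
public sealed class ConcatenatedReader : IDisposable
{
    private readonly IEnumerator<string> _paths;
    private FileStream _current;

    public ConcatenatedReader(IEnumerable<string> paths)
    {
        _paths = paths.GetEnumerator();
    }

    // Fills buffer as far as possible, moving on to the next file at each
    // end of file. Returns the number of bytes read; a value smaller than
    // buffer.Length means the last file has been exhausted.
    public int Fill(byte[] buffer)
    {
        int filled = 0;
        while (filled < buffer.Length)
        {
            if (_current == null)
            {
                if (!_paths.MoveNext())
                    break;                      // no files left
                _current = File.OpenRead(_paths.Current);
            }

            int read = _current.Read(buffer, filled, buffer.Length - filled);
            if (read == 0)
            {
                _current.Dispose();             // end of this file, try the next one
                _current = null;
            }
            filled += read;
        }
        return filled;
    }

    public void Dispose()
    {
        if (_current != null)
            _current.Dispose();
        _paths.Dispose();
    }
}
```

A caller would allocate one piece-sized buffer, call `Fill` in a loop until it returns 0, and hash only the first `n` bytes of the buffer each time, so memory use stays at one buffer regardless of file size.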

I want to use threading, but would it be efficient to thread reading a file? I don't know if disk I/O is such a large bottleneck that this would be worth it. Would the hashing be sped up sufficiently if I only thread that part, and lock on the read of each chunk? It is important that the hashes get saved in the correct order.

The second thing I need to do is generate the MD5 sum of each file as well. Is there any way to do this more efficiently than as a separate step?
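One way to avoid a second pass, sketched under assumptions: `HashAlgorithm.TransformBlock` and `TransformFinalBlock` let a hash be fed incrementally, so each block read from disk can go to that file's MD5 and into the piece buffer at the same time. `SinglePassHasher` and its parameters are hypothetical names chosen for illustration, and the sketch is single-threaded.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Security.Cryptography;

public static class SinglePassHasher
{
    // Hypothetical helper. Reads every file once, feeding each block of bytes
    // to that file's MD5 and to a piece buffer that spans file boundaries.
    public static void Run(IList<string> files, int pieceLength,
                           IDictionary<string, byte[]> md5Sums, IList<byte[]> pieceHashes)
    {
        byte[] piece = new byte[pieceLength];
        int filled = 0;     // bytes currently held in the piece buffer

        using (SHA1 sha1 = SHA1.Create())
        {
            foreach (string path in files)
            {
                using (MD5 md5 = MD5.Create())
                using (FileStream fs = File.OpenRead(path))
                {
                    int read;
                    while ((read = fs.Read(piece, filled, pieceLength - filled)) > 0)
                    {
                        // Feed the bytes just read to the per-file MD5.
                        md5.TransformBlock(piece, filled, read, null, 0);
                        filled += read;

                        if (filled == pieceLength)
                        {
                            // The piece is full: hash it and start the next one.
                            pieceHashes.Add(sha1.ComputeHash(piece, 0, filled));
                            filled = 0;
                        }
                    }
                    // End of this file: finish its MD5. The piece buffer keeps
                    // its partial content and continues with the next file.
                    md5.TransformFinalBlock(piece, 0, 0);
                    md5Sums[path] = md5.Hash;
                }
            }

            // Hash whatever is left as a final, shorter piece.
            if (filled > 0)
                pieceHashes.Add(sha1.ComputeHash(piece, 0, filled));
        }
    }
}
```

Because pieces are hashed in the order they are read, the piece hashes come out in file order without any extra bookkeeping.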

(This question has some overlap with Is there a built-in way to handle multiple files as one stream?, but I thought this differed enough)

I am really stumped what approach to take, as I am fairly new to C#, as well as to threading. I already tried the approaches listed below, but they do not suffice for me.

As I am new to C# I value every kind of input on any aspect of my code.


This piece of code was threaded, but does not 'append' the streams, and as such generates invalid hashes:

public void DoHashing()
{
    ParallelOptions options = new ParallelOptions();
    options.MaxDegreeOfParallelism = numThreads;
    options.CancellationToken = cancelToken.Token;
    Parallel.ForEach(files, options, (string f, ParallelLoopState loopState) =>
        {
            options.CancellationToken.ThrowIfCancellationRequested();

            using (BufferedStream fileStream = new BufferedStream(File.OpenRead(f), bufferSize))
            {

                // Get the MD5sum first:
                using (MD5CryptoServiceProvider md5 = new MD5CryptoServiceProvider())
                {
                    md5.Initialize();
                    md5Sums[f] = BitConverter.ToString(md5.ComputeHash(fileStream)).Replace("-", "");
                }

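                // ComputeHash(Stream) reads to the end of the stream, so rewind before reading pieces.
                fileStream.Position = 0;

                //setup for reading:
                byte[] buffer = new byte[(int)pieceLength];
                // FileInfo.Length is the size on disk; the BufferedStream does not change it.
                long remaining = (new FileInfo(f)).Length;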
                int done = 0;
                while (remaining > 0)
                {
                    while (done < pieceLength)
                    {
                        options.CancellationToken.ThrowIfCancellationRequested();
                        //either try to read the piecelength, or the remaining length of the file.
                        int toRead = (int)Math.Min(pieceLength - done, remaining);
                        int read = fileStream.Read(buffer, done, toRead);

                        //if read == 0, EOF reached
                        if (read == 0)
                        {
                            remaining = 0;
                            break;
                        }

                        //offsets
                        done += read;
                        remaining -= read;
                    }
                    // Hash the piece (only the bytes actually read; the last piece may be short)
                    using (SHA1CryptoServiceProvider sha1 = new SHA1CryptoServiceProvider())
                    {
                        sha1.Initialize();
                        byte[] hash = sha1.ComputeHash(buffer, 0, done);
                        hashes[f].AddRange(hash);
                    }
                    done = 0;
                    buffer = new byte[(int)pieceLength];
                }
            }
        }
    );


}

This other piece of code isn't threaded (and doesn't calculate MD5):

void Hash()
{
    //examples, these got handled by other methods
    List<string> files = new List<string>();
    files.Add("a.txt");
    files.Add("b.blob");
    //....
    long totalFileLength = 0;
    int pieceLength = 1 << 20; // 2^20 bytes = 1 MiB
    foreach (string file in files)
    {
        totalFileLength += (new FileInfo(file)).Length;
    }

    //Reading the files:
    long remaining = totalFileLength;
    byte[] buffer = new byte[(int)Math.Min(remaining, pieceLength)];
    int index = 0;
    FileStream fin = File.OpenRead(files[index]);
    int done = 0;
    while (remaining > 0)
    {
        while (done < buffer.Length)
        {
            int read = fin.Read(buffer, done, buffer.Length - done);

            //if read == 0, EOF reached
            if (read == 0)
            {
                fin.Dispose();
                index++;
                //if that was the last file:
                if (index >= files.Count)
                {
                    remaining = 0;
                    break;
                }
                //continue filling the same buffer from the next file:
                fin = File.OpenRead(files[index]);
                continue;
            }
            done += read;
            remaining -= read;
        }
        //Doing the piece hash:
        HashPiece(buffer);

        //reset for next piece:
        done = 0;
        buffer = new byte[(int)Math.Min(remaining, pieceLength)];
    }
    fin.Dispose();
}
void HashPiece(byte[] piece)
{
    using (SHA1CryptoServiceProvider sha1 = new SHA1CryptoServiceProvider())
    {
        sha1.Initialize();
        //hashes is a List
        hashes.Add(sha1.ComputeHash(piece));
    }
}

Thank you very much for your time and effort.

I'm not looking for completely coded solutions, any pointer and idea where to go with this would be excellent.



Questions & remarks to yodaj007's answer:


Why if (currentChunk.Length >= Constants.CHUNK_SIZE_IN_BYTES)? Why not ==? If the chunk is larger than the chunk size, my SHA1 hash gets a different value.


            currentChunk.Sources.Add(new ChunkSource()
            {
                Filename = fi.FullName,
                StartPosition = 0,
                Length = (int)Math.Min(fi.Length, (long)needed)
            });

is a really interesting idea: postpone reading until you need it. Nice!


    chunks.Add(currentChunk = new Chunk());

Why do this in the if (currentChunk != null) block and in the for (int i = 0; i < (fi.Length - offset) / Constants.CHUNK_SIZE_IN_BYTES; i++) block? Isn't the first a bit redundant?



Here is my complete answer. I tested it on one of my anime folders. It processes 14 files totaling 3.64GiB in roughly 16 seconds. In my opinion, using any sort of parallelism is more trouble than it is worth here. You're being limited by disc I/O, so multithreading will only get you so far. My solution can be easily parallelized though.

It starts by reading "chunk" source information: source file, offset, and length. All of this is gathered very quickly. From here, you can process the "chunks" using threading however you wish. Code follows:

public static class Constants
{
    public const int CHUNK_SIZE_IN_BYTES = 32 * 1024 * 1024;        // 32MiB
}

public class ChunkSource
{
    public string Filename { get; set; }
    public int StartPosition { get; set; }
    public int Length { get; set; }
}

public class Chunk
{
    private List<ChunkSource> _sources = new List<ChunkSource>();

    public IList<ChunkSource> Sources { get { return _sources; } }
    public byte[] Hash { get; set; }
    public int Length
    {
        get { return Sources.Select(s => s.Length).Sum(); }
    }
}

static class Program
{
    static void Main()
    {
        DirectoryInfo di = new DirectoryInfo(@"C:\Stuff\Anime\Shikabane Hime Aka");

        string[] filenames = di.GetFiles().Select(fi=> fi.FullName).OrderBy(n => n).ToArray();
        var chunks = ChunkFiles(filenames);
        ComputeHashes(chunks);
    }

    private static List<Chunk> ChunkFiles(string[] filenames)
    {
        List<Chunk> chunks = new List<Chunk>();
        Chunk currentChunk = null;
        int offset = 0;

        foreach (string filename in filenames)
        {
            FileInfo fi = new FileInfo(filename);
            if (!fi.Exists)
                throw new FileNotFoundException(filename);

            Debug.WriteLine(String.Format("File: {0}", filename));
            //
            // First, start off by either starting a new chunk or 
            // by finishing a leftover chunk from a previous file.
            //
            if (currentChunk != null)
            {
                //
                // We get here if the previous file had leftover bytes that left us with an incomplete chunk
                //
                int needed = Constants.CHUNK_SIZE_IN_BYTES - currentChunk.Length;
                if (needed == 0)
                    throw new InvalidOperationException("Something went wonky, shouldn't be here");

                offset = needed;
                currentChunk.Sources.Add(new ChunkSource()
                {
                    Filename = fi.FullName,
                    StartPosition = 0,
                    Length = (int)Math.Min(fi.Length, (long)needed)
                });

                if (currentChunk.Length >= Constants.CHUNK_SIZE_IN_BYTES)
                {
                    chunks.Add(currentChunk = new Chunk());
                }
            }
            else
            {
                offset = 0;
            }

            //
            // Note: Using integer division here
            //
            for (int i = 0; i < (fi.Length - offset) / Constants.CHUNK_SIZE_IN_BYTES; i++)
            {
                chunks.Add(currentChunk = new Chunk());
                currentChunk.Sources.Add(new ChunkSource()
                {
                    Filename = fi.FullName,
                    StartPosition = i * Constants.CHUNK_SIZE_IN_BYTES + offset,
                    Length = Constants.CHUNK_SIZE_IN_BYTES
                });

                Debug.WriteLine(String.Format("Chunk source created: Offset = {0,10}, Length = {1,10}", currentChunk.Sources[0].StartPosition, currentChunk.Sources[0].Length));
            }

            // Take the remainder in long arithmetic first, then narrow to int.
            int leftover = (int)((fi.Length - offset) % Constants.CHUNK_SIZE_IN_BYTES);
            if (leftover > 0)
            {
                chunks.Add(currentChunk = new Chunk());
                currentChunk.Sources.Add(new ChunkSource()
                {
                    Filename = fi.FullName,
                    StartPosition = (int)(fi.Length - leftover),
                    Length = leftover
                });
            }
            else
            {
                currentChunk = null;
            }

        }

        return chunks;
    }

    private static void ComputeHashes(IList<Chunk> chunks)
    {
        if (chunks == null || chunks.Count == 0)
            return;

        Dictionary<string, MemoryMappedFile> files = new Dictionary<string, MemoryMappedFile>();

        foreach (var chunk in chunks)
        {
            MemoryMappedFile mms = null;
            byte[] buffer = new byte[Constants.CHUNK_SIZE_IN_BYTES];
            int filled = 0;     // bytes of this chunk already copied into buffer

            Stopwatch sw = Stopwatch.StartNew();
            foreach (var source in chunk.Sources)
            {
                lock (files)
                {
                    if (!files.TryGetValue(source.Filename, out mms))
                    {
                        Debug.WriteLine(String.Format("Opening {0}", source.Filename));
                        files.Add(source.Filename, mms = MemoryMappedFile.CreateFromFile(source.Filename, FileMode.Open));
                    }
                }

                // Append each source after the previous one instead of overwriting offset 0.
                using (var view = mms.CreateViewStream(source.StartPosition, source.Length))
                {
                    filled += view.Read(buffer, filled, source.Length);
                }
            }

            Debug.WriteLine("Done reading sources in {0}ms", sw.Elapsed.TotalMilliseconds);
            sw.Restart();
            // Hash only the bytes that belong to this chunk; the last chunk may be short.
            using (MD5 md5 = MD5.Create())
            {
                chunk.Hash = md5.ComputeHash(buffer, 0, filled);
            }
            sw.Stop();

            Debug.WriteLine(String.Format("Computed hash: {0} in {1}ms", String.Join("-", chunk.Hash.Select(h=> h.ToString("X2")).ToArray()), sw.Elapsed.TotalMilliseconds));
        }

        foreach (var x in files.Values)
        {
            x.Dispose();
        }
    }
}

I don't guarantee everything is spotlessly free of bugs. But I did have fun working on it. Look at the output window in Visual Studio for the debug information. It looks like this:

File: C:\Stuff\Anime\Shikabane Hime Aka\Episode 02.mkv
Chunk source created: Offset = 26966010, Length = 33554432
Chunk source created: Offset = 60520442, Length = 33554432
Chunk source created: Offset = 94074874, Length = 33554432
Chunk source created: Offset = 127629306, Length = 33554432
Chunk source created: Offset = 161183738, Length = 33554432
Chunk source created: Offset = 194738170, Length = 33554432
Chunk source created: Offset = 228292602, Length = 33554432

...

Opening C:\Stuff\Anime\Shikabane Hime Aka\Episode 02.mkv
Done reading sources in 42.9362ms
The thread '' (0xc10) has exited with code 0 (0x0).
Computed hash: 3C-81-A5-2C-90-02-24-23-42-5B-19-A2-15-56-AB-3F in 94.2481ms
Done reading sources in 0.0053ms
Computed hash: 58-F0-6D-D5-88-D8-FF-B3-BE-B4-6A-DA-63-09-43-6B in 98.9263ms
Done reading sources in 29.4805ms
Computed hash: F7-19-8D-A8-FE-9C-07-6E-DB-D5-74-A6-E1-E7-A6-26 in 85.0061ms
Done reading sources in 28.4971ms
Computed hash: 49-F2-CB-75-89-9A-BC-FA-94-A7-DF-E0-DB-02-8A-99 in 84.2799ms
Done reading sources in 31.106ms
Computed hash: 29-7B-18-BD-ED-E9-0C-68-4B-47-C6-5F-D0-16-8A-44 in 84.1444ms
Done reading sources in 31.2204ms
Computed hash: F8-91-F1-90-CF-9C-37-4E-82-68-C2-44-0D-A7-6E-F8 in 84.2592ms
Done reading sources in 31.0031ms
Computed hash: 65-97-ED-95-07-31-BF-C8-3A-BA-2B-DA-03-37-FD-00 in 95.331ms
Done reading sources in 33.0072ms
Computed hash: 9B-F2-83-E6-A8-DF-FD-8D-6C-5C-9E-F4-20-0A-38-4B in 85.9561ms
Done reading sources in 31.6232ms
Computed hash: B6-7C-6B-95-69-BC-9C-B2-1A-07-B3-13-28-A8-10-BC in 84.1866ms
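For comparison, the planning step can also be written as one loop over a running position in each file, creating a chunk only when there are bytes to put into it. That sidesteps the question of empty chunks raised in the remarks and keeps file offsets as long values, so files above 2 GiB stay addressable. This is a minimal sketch, not the code that produced the output above: `Segment` and `ChunkPlanner` are hypothetical names, and file lengths are passed in directly instead of being read from disk.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type for illustration; it plays the role of ChunkSource.
public class Segment
{
    public string Filename;
    public long Start;      // offset within the file
    public int Length;      // number of bytes taken from the file
}

public static class ChunkPlanner
{
    // fileLengths: (file name, length in bytes) pairs in hashing order.
    // Returns one list of segments per chunk; every chunk except possibly
    // the last covers exactly chunkSize bytes.
    public static List<List<Segment>> Plan(IList<KeyValuePair<string, long>> fileLengths, int chunkSize)
    {
        var chunks = new List<List<Segment>>();
        List<Segment> current = null;
        int room = 0;   // bytes still free in the current chunk

        foreach (var file in fileLengths)
        {
            long position = 0;
            while (position < file.Value)
            {
                if (room == 0)
                {
                    // Only start a chunk when there are bytes to put in it.
                    current = new List<Segment>();
                    chunks.Add(current);
                    room = chunkSize;
                }

                int take = (int)Math.Min(room, file.Value - position);
                current.Add(new Segment { Filename = file.Key, Start = position, Length = take });
                position += take;
                room -= take;
            }
        }
        return chunks;
    }
}
```

With files of 3 and 5 bytes and a chunk size of 4, this plans two chunks: the first takes all of the first file plus one byte of the second, and the second takes the remaining four bytes.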

Here is the parallel version. It's basically the same really. Using parallelism = 3 cut the processing time down to 9 seconds.

    private static void ComputeHashes(IList<Chunk> chunks)
    {
        if (chunks == null || chunks.Count == 0)
            return;

        Dictionary<string, MemoryMappedFile> files = new Dictionary<string, MemoryMappedFile>();
        Parallel.ForEach(chunks, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (chunk, state, index) =>
            {
                MemoryMappedFile mms = null;
                byte[] buffer = new byte[Constants.CHUNK_SIZE_IN_BYTES];
                int filled = 0;     // bytes of this chunk already copied into buffer

                Stopwatch sw = Stopwatch.StartNew();
                foreach (var source in chunk.Sources)
                {
                    lock (files)
                    {
                        if (!files.TryGetValue(source.Filename, out mms))
                        {
                            Debug.WriteLine(String.Format("Opening {0}", source.Filename));
                            files.Add(source.Filename, mms = MemoryMappedFile.CreateFromFile(source.Filename, FileMode.Open));
                        }
                    }

                    // Append each source after the previous one instead of overwriting offset 0.
                    using (var view = mms.CreateViewStream(source.StartPosition, source.Length))
                    {
                        filled += view.Read(buffer, filled, source.Length);
                    }
                }

                Debug.WriteLine("Done reading sources in {0}ms", sw.Elapsed.TotalMilliseconds);
                sw.Restart();
                // Hash only the bytes that belong to this chunk; the last chunk may be short.
                using (MD5 md5 = MD5.Create())
                {
                    chunk.Hash = md5.ComputeHash(buffer, 0, filled);
                }
                sw.Stop();

                Debug.WriteLine(String.Format("Computed hash: {0} in {1}ms", String.Join("-", chunk.Hash.Select(h => h.ToString("X2")).ToArray()), sw.Elapsed.TotalMilliseconds));
            });

        foreach (var x in files.Values)
        {
            x.Dispose();
        }
    }

EDIT

I found a bug, or what I think is a bug. Need to set the read offset to 0 if we're starting a new file.

EDIT 2 based on feedback

This processes the hashes in a separate thread. It's necessary to throttle the I/O. I was running into OutOfMemoryException without doing so. It doesn't really perform that much better, though. Beyond this... I'm not sure how it can be improved any further. Perhaps by reusing the buffers, maybe.

public class QueueItem
{
    public Chunk Chunk { get; set; }
    public byte[] buffer { get; set; }
}

private static void ComputeHashes(IList<Chunk> chunks)
{
    if (chunks == null || chunks.Count == 0)
        return;

    Dictionary<string, MemoryMappedFile> files = new Dictionary<string, MemoryMappedFile>();
    foreach (var filename in chunks.SelectMany(c => c.Sources).Select(c => c.Filename).Distinct())
    {
        files.Add(filename, MemoryMappedFile.CreateFromFile(filename, FileMode.Open));
    }
        
    AutoResetEvent monitor = new AutoResetEvent(false);
    ConcurrentQueue<QueueItem> hashQueue = new ConcurrentQueue<QueueItem>();
    CancellationToken token = new CancellationToken();

    Task.Factory.StartNew(() =>
        {
            int processCount = 0;
            QueueItem item = null;
            while (!token.IsCancellationRequested)
            {
                if (hashQueue.TryDequeue(out item))
                {
                    MD5 md5 = MD5.Create();
                    item.Chunk.Hash = md5.ComputeHash(item.buffer);

                    if (processCount++ > 1000)
                    {
                        processCount = 0;
                        monitor.Set();
                    }
                }
            }
        }, token);

    foreach (var chunk in chunks)
    {
        if (hashQueue.Count > 10000)
        {
            monitor.WaitOne();
        }

        QueueItem item = new QueueItem()
        {
            buffer = new byte[Constants.CHUNK_SIZE_IN_BYTES],
            Chunk = chunk
        };

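        Stopwatch sw = Stopwatch.StartNew();
        int filled = 0;     // bytes of this chunk already copied into item.buffer
        foreach (var source in chunk.Sources)
        {
            MemoryMappedFile mms = files[source.Filename];
            // Append each source after the previous one instead of overwriting offset 0.
            using (var view = mms.CreateViewStream(source.StartPosition, source.Length))
            {
                filled += view.Read(item.buffer, filled, source.Length);
            }
        }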

        sw.Restart();
        sw.Stop();
        hashQueue.Enqueue(item);
    }

    foreach (var x in files.Values)
    {
        x.Dispose();
    }
}
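
The throttling idea from EDIT 2 can also be expressed with a bounded `BlockingCollection<T>`, which blocks the reader once a fixed number of buffers are waiting and lets the hashing task finish cleanly when the reader is done. The following is a sketch under assumptions, not a drop-in replacement: `BoundedHashPipeline` and `HashAll` are hypothetical names, the buffers are supplied by the caller, and SHA1 is used as in the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Threading.Tasks;

public static class BoundedHashPipeline
{
    // Hypothetical helper: 'pieces' yields buffers in order (the reader side).
    // Hashes come back in the same order because a single consumer drains a
    // first-in, first-out queue.
    public static List<byte[]> HashAll(IEnumerable<byte[]> pieces, int maxQueued)
    {
        var hashes = new List<byte[]>();

        using (var queue = new BlockingCollection<byte[]>(maxQueued))
        {
            Task consumer = Task.Factory.StartNew(() =>
            {
                using (SHA1 sha1 = SHA1.Create())
                {
                    // Blocks while the queue is empty; ends after CompleteAdding.
                    foreach (byte[] piece in queue.GetConsumingEnumerable())
                        hashes.Add(sha1.ComputeHash(piece));
                }
            });

            try
            {
                foreach (byte[] piece in pieces)
                    queue.Add(piece);   // blocks while maxQueued buffers are waiting
            }
            finally
            {
                queue.CompleteAdding(); // lets the consumer loop end
            }

            consumer.Wait();            // all hashes are computed after this point
        }
        return hashes;
    }
}
```

With 32 MiB buffers, a small bound such as 2 or 3 keeps memory use predictable while still letting reading and hashing overlap.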


I'm new to C# too, but I think what you are looking for is the System.IO.MemoryMappedFiles namespace, available since .NET 4.0.

With these API functions, the operating system itself takes care of managing the current file region in memory.

Instead of copying and pasting code here, I'll point you to this article: http://www.developer.com/net/article.php/3828586/Using-Memory-Mapped-Files-in-NET-40.htm

For the MD5, use the System.Security.Cryptography.MD5CryptoServiceProvider class. Maybe it's faster.

In your case, where you have to read across the boundary of one file into the next, just do it. Let the operating system handle how the memory-mapped files are represented in memory, and work as you would with small buffers.


In .NET 4 you now have System.IO.MemoryMappedFiles.

You can create a ViewAccessor of a particular chunk size to match your hash function, and then keep filling your hash function's buffer from the current ViewAccessor. When you run out of file, start chunking the next file, using the current offset within the hash chunk as the position in your buffer at which to continue.
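
As a rough illustration of the view accessor approach, the sketch below copies a region of a file into a caller-supplied buffer at a given buffer offset, so two calls with different files can fill one hash buffer. `MappedRead` and `ReadRegion` are hypothetical names; the mapping is opened read-only, and the caller is assumed to keep the region within the file (an empty file cannot be mapped).

```csharp
using System;
using System.IO;
using System.IO.MemoryMappedFiles;

public static class MappedRead
{
    // Hypothetical helper: copies 'count' bytes starting at 'fileOffset' in the
    // file into buffer, beginning at buffer[bufferOffset].
    public static void ReadRegion(string path, long fileOffset, byte[] buffer, int bufferOffset, int count)
    {
        // Capacity 0 maps the file at its current size; null means no shared map name.
        using (var mmf = MemoryMappedFile.CreateFromFile(
                   path, FileMode.Open, null, 0, MemoryMappedFileAccess.Read))
        using (var view = mmf.CreateViewAccessor(fileOffset, count, MemoryMappedFileAccess.Read))
        {
            // Position 0 is relative to the start of the view, not the file.
            view.ReadArray(0, buffer, bufferOffset, count);
        }
    }
}
```

Opening the mapping on every call keeps the sketch short; real code would more likely keep one mapping per file open while its chunks are processed.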
