开发者

Thread & Queue vs Serial performance

开发者 https://www.devze.com 2022-12-11 13:45 出处:网络
I though it\'ll be interesting to look at threads and queues, so I\'ve written 2 scripts, one will break a file up and encrypt each chunk in a thread, the other will do it serially. I\'m still very ne

I though it'll be interesting to look at threads and queues, so I've written 2 scripts, one will break a file up and encrypt each chunk in a thread, the other will do it serially. I'm still very new to python and don't really know why the treading script takes so much longer.

Threaded Script:

#!/usr/bin/env python

from Crypto.Cipher import AES
from optparse import OptionParser
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue


BLOCK_SIZE = 32 #32 = 256-bit | 16 = 128-bit
TFILE = 'mytestfile.bin'
CHUNK_SIZE = 2048 * 2048
KEY = os.urandom(32)

class DataSplit():
    def __init__(self,fileObj, chunkSize):

        self.fileObj = fileObj
        self.chunkSize = chunkSize

    def split(self):
        while True:
            data = self.fileObj.read(self.chunkSize)
            if not data:
                break
            yield data

class encThread(threading.Thread):
    def __init__(self, seg_queue,result_queue, cipher):
        threading.Thread.__init__(self)
        self.seg_queue = seg_queue
        self.result_queue = result_queue
        self.cipher = cipher

    def run(self):
        while True:
            #Grab a data segment from the queue
            data = self.seg_queue.get()
            encSegment = []           
            for lines in data:
            encSegment.append(self.cipher.encrypt(lines))
            self.result_queue.put(encSegment)
            print "Segment Encrypted"
            self.seg_queue.task_done()

start = time.time()
def main():
    seg_queue = Queue.Queue()
    result_queue = Queue.Queue()
    estSegCount = (os.path.getsize(TFILE)/CHUNK_SIZE)+1
    cipher = AES.new(KEY, AES.MODE_CFB)
    #Spawn threads (one for each segment at the moment)
    for i in range(estSegCount):
        eT = encThread(seg_queue, result_queue, cipher)
        eT.setDaemon(True)
        eT.start()
        print ("thread spawned")

    fileObj = open(TFILE, "rb")
    splitter = DataSplit(fileObj, CHUNK_SIZE)
    for data in splitter.split():
        seg_queue.put(data)
        print ("Data sent to thread")

    seg_queue.join()
    #result_queue.join()
    print ("Seg Q: {0}".format(seg_queue.qsize()))
    print ("Res Q: {0}".format(result_queue.qsize()))



main()
print ("Elapsed Time: {0}".format(time.time()-start))

Serial Script:

#!/usr/bin/env python

from Crypto.Cipher import AES
from optparse import OptionParser
import os, base64, time, sys, hashlib, pickle开发者_如何学Python, threading, timeit, Queue

TFILE = 'mytestfile.bin'
CHUNK_SIZE = 2048 * 2048

class EncSeries():
    def __init(self):
        pass

    def loadFile(self,path):
        openFile = open(path, "rb")
        #fileData = openFile.readlines()
        fileData = openFile.read(CHUNK_SIZE)
        openFile.close()
        return fileData

    def encryptData(self,key, data):
        cipher = AES.new(key, AES.MODE_CFB)
        newData = []
        for lines in data:
            newData.append(cipher.encrypt(lines))
        return newData


start = time.time()
def main():
    print ("Start")
    key = os.urandom(32)
    run = EncSeries()
    fileData = run.loadFile(TFILE)

    encFileData=run.encryptData(key, fileData)
    print("Finish")

main()
print ("Elapsed Time: {0}".format(time.time()-start))

using readlines() instead of read seems to speed things up considerably on the serial version too, but it's already much fast than the threaded version.


  1. It seems like your second version only reads one chunk, while the first version reads the whole file - this would explain the big speedup. Edit: Another issue: I just noticed that you run for lines in data for no reason - this would actually encrypt the characters individually, which is much slower. Instead, just pass the data to encrypt directly.

  2. There is no point in starting more CPU-heavy threads than you have processor cores.

  3. The threads can only work in parallel if they call an extension module which unlocks the GIL while running. I don't think PyCrypto does this, so you won't get any parallel work done here.

  4. If the bottleneck was disk performance, you wouldn't see much of an improvement here anyway - in that case it would be better to have one thread that does disk I/O and another to do the encryption. GIL wouldn't be an issue since it is released while doing disk I/O.


Threads are not a magical way to speed up programs - splitting work into threads will usually slow it down unless the program is spending a significant part of its time waiting for I/O. Each new thread adds more overhead to the code in splitting the work up, and more overhead in the OS in switching between threads.

In theory if you are running on a multi-processor CPU then the threads could be run on different processors so the work is done in parallel, but even then there is no point in having more threads than processors.

In practice it is quite different, at least for the C version of Python. The GIL does not work well at all with multiple processors. See this presentation by David Beazley for the reasons why. IronPython and Jython do not have this problem.

If you really want to parallelize the work then it is better to spawn multiple processes and farm the work out to them, but there is the possibility that the inter-process communication overhead of passing around large blocks of data will negate any benefit of parallelism.


I watched the presentation that Dave Kirby linked to and tried the example counter which takes more that twice as long to run in two threads:

import time
from threading import Thread

countmax=100000000

def count(n):
    while n>0:
        n-=1

def main1():
    count(countmax)
    count(countmax)

def main2():
    t1=Thread(target=count,args=(countmax,))
    t2=Thread(target=count,args=(countmax,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

def timeit(func):
    start = time.time()
    func()
    end=time.time()-start
    print ("Elapsed Time: {0}".format(end))

if __name__ == '__main__':
    timeit(main1)
    timeit(main2)

Outputs:

Elapsed Time: 21.5470001698
Elapsed Time: 55.3279998302

However, if I change Thread for Process:

from multiprocessing import Process

and

t1=Process(target ....

etc. I get this output:

Elapsed Time: 20.5
Elapsed Time: 10.4059998989

Now its as if my Pentium CPU has two cores, I bet its the hyperthreading. Can anyone try this on their two or four core machine and run 2 or 4 threads?

See the python 2.6.4 documentation for multiprocessing


Threads have a couple different uses:

  1. They only provide speedup if they allow you to get multiple pieces of hardware working at the same time on your problem, whether that hardware is CPU cores or disk heads.

  2. They allow you to keep track of multiple sequences of I/O events that would be much more complicated without them, such as simultaneous conversations with multiple users.

The latter is not done for performance, but for clarity of code.


Just a quick note to update this thread: python 3.2 has a new implementation of the GIL which relieves a lot of the overheads associated with multithreading, but does not eliminate the locking. (i.e. it does not allow you to use more than one core, but it allows you to use multiple threads on that core efficiently).

0

精彩评论

暂无评论...
验证码 换一张
取 消