Race condition with Python mmap and multiprocessing.Semaphore

开发者 https://www.devze.com 2023-03-17 13:48 Source: web
I am writing a script that processes some mmaps concurrently with multiprocessing.Process and updates a result list stored in an mmap and locked with a mutex.

My function to write to the result list looks like this:

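def update_result(result_mmap, new_value, new_value_index, sema):
    # "with" releases the semaphore even if an exception is raised
    with sema:
        result_mmap.seek(0)
        # mmap returns bytes in Python 3, so decode before splitting on tabs
        old_result = result_mmap.readline().decode().split("\t")
        old_result[new_value_index] = new_value
        new_result = "\t".join(map(str, old_result)).encode()
        # resize this process's mapping to the new length
        result_mmap.resize(len(new_result))
        result_mmap.seek(0)
        result_mmap.write(new_result)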

This works SOMETIMES, but at other times, depending on the order in which the processes run, result_mmap does not seem to resize properly. I am not sure where to look from here: I know a race condition exists, but I don't know why.
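One plausible contributor, offered as a hedged sketch rather than a confirmed diagnosis: every process works with its own mmap object, and resize() changes only the calling process's mapping (plus, for a file-backed map, the file's length). The semaphore serializes the writes, but it cannot update the mmap objects held by the other processes, which keep their old length and may therefore read a truncated line, or stale bytes after a shrink. The sketch below maps one temporary file twice in a single process to stand in for two workers. resize_one_view is a hypothetical name, and the sketch assumes Linux, where mmap.resize() is implemented with mremap().

```python
import mmap
import os
import tempfile


def resize_one_view(initial=b"1\t2\t3", grown=b"10\t20\t30\t40"):
    """Map the same file twice and resize only the first mapping.

    The two mmap objects stand in for the copies of result_mmap held by
    two worker processes. Returns the length of each mapping, the new
    file size, and the bytes the un-resized mapping can still see.
    """
    fd, path = tempfile.mkstemp()
    try:
        os.write(fd, initial)
        view_a = mmap.mmap(fd, len(initial))  # stands in for "process A"
        view_b = mmap.mmap(fd, len(initial))  # stands in for "process B"
        view_a.resize(len(grown))             # grows the file and A's view only
        view_a[:] = grown
        seen_by_b = view_b[:]                 # B is still limited to its old length
        info = (len(view_a), len(view_b), os.fstat(fd).st_size, seen_by_b)
        view_a.close()
        view_b.close()
        return info
    finally:
        os.close(fd)
        os.remove(path)
```

On Linux the resized view reports 11 bytes while the other view still reports 5, even though the file itself is now 11 bytes long, so the second view only sees b"10\t20".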

Edit: This is the function that calls update_result:

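def apply_function(mmapped_files, function, result_mmap, result_index, sema):
    for mf in mmapped_files:
        # the first line of each file seeds the accumulator
        accumulator = int(mf.readline())
        while True:
            line = mf.readline()
            # mmap.readline() returns an empty bytes object at the end, never None
            if not line:
                break
            accumulator = function(int(line), accumulator)
        # pass the value first and the index second, matching update_result
        update_result(result_mmap, accumulator, result_index, sema)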
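Because the reported symptom involves resize(), one way to sidestep it (an editorial sketch, not the asker's design) is to allocate the result buffer once, with a fixed-width slot per result index, so that no process ever needs to resize it. SLOT_WIDTH, make_result_buffer, write_slot, read_slots, sum_worker and run_workers are hypothetical names. The sketch assumes a POSIX system with the "fork" start method, so the anonymous shared mapping and the lock are inherited by the child processes.

```python
import mmap
import multiprocessing as mp

SLOT_WIDTH = 20  # hypothetical: bytes reserved for each result value


def make_result_buffer(n_slots):
    # Anonymous shared mapping, sized once; children created by fork share it.
    return mmap.mmap(-1, n_slots * SLOT_WIDTH)


def write_slot(buf, index, value, lock):
    data = str(value).encode()
    if len(data) > SLOT_WIDTH:
        raise ValueError("value does not fit in its slot")
    start = index * SLOT_WIDTH
    with lock:  # keeps readers from seeing a half-written slot
        buf[start:start + SLOT_WIDTH] = data.ljust(SLOT_WIDTH)


def read_slots(buf, n_slots, lock):
    with lock:
        raw = buf[:n_slots * SLOT_WIDTH]
    # Strip the space padding; slots that were never written are zero bytes.
    return [raw[i * SLOT_WIDTH:(i + 1) * SLOT_WIDTH].strip(b" \x00").decode()
            for i in range(n_slots)]


def sum_worker(buf, index, numbers, lock):
    # Hypothetical stand-in for apply_function: reduce, then store one value.
    write_slot(buf, index, sum(numbers), lock)


def run_workers(jobs):
    ctx = mp.get_context("fork")  # assumes a POSIX platform
    lock = ctx.Lock()
    buf = make_result_buffer(len(jobs))
    procs = [ctx.Process(target=sum_worker, args=(buf, i, nums, lock))
             for i, nums in enumerate(jobs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return read_slots(buf, len(jobs), lock)
```

For example, run_workers([[1, 2, 3], [10, 20], [5]]) returns ['6', '30', '5'] regardless of which process finishes first, because each writer touches only its own byte range. The trade-off is a fixed upper bound on how wide each printed value may be.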


Maybe I'm wrong, but are you sure the semaphore really works across processes (is it a system-wide mutex)? If it isn't, it cannot protect anything, because separate processes do not share the same memory space. You might consider using the threading library instead, so that all the threads use the same semaphore.
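To check the premise of that question: multiprocessing.Semaphore is designed to be shared between processes (on POSIX, CPython builds it on an OS-level named semaphore), provided the same object is handed to every Process. The sketch below increments a shared counter from several processes under one semaphore. bump and count_with_semaphore are hypothetical names, and the sketch assumes a POSIX system with the "fork" start method.

```python
import multiprocessing as mp


def bump(counter, sema, times):
    for _ in range(times):
        with sema:                  # the same OS-level semaphore in every process
            counter.value += 1      # read-modify-write that needs the lock


def count_with_semaphore(n_procs=4, times=1000):
    ctx = mp.get_context("fork")    # assumes a POSIX platform
    sema = ctx.Semaphore(1)         # a count of 1 makes it behave like a mutex
    counter = ctx.Value("i", 0, lock=False)  # raw shared int with no built-in lock
    procs = [ctx.Process(target=bump, args=(counter, sema, times))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value
```

If the semaphore were not shared, concurrent increments would be lost and the total would usually come out below n_procs * times; with it, count_with_semaphore() returns 4000.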
