I am writing a script that processes some mmaps concurrently with multiprocessing.Process and updates a result list stored in an mmap and locked with a mutex.
My function to write to the result list looks like this:
def update_result(result_mmap, new_value, new_value_index, sema):
    sema.acquire()
    result_mmap.seek(0)
    old_result = result_mmap.readline().split("\t")
    old_result[new_value_index] = new_value
    new_result = "\t".join(map(str, old_result))
    result_mmap.resize(len(new_result))
    result_mmap.seek(0)
    result_mmap.write(new_result)
    sema.release()
This works sometimes, but at other times, depending on the order in which the processes execute, result_mmap does not seem to resize properly. I am not sure where to look from here. I know that a race condition exists, but I don't know why.
Edit: This is the function that calls update_result:
def apply_function(mmapped_files, function, result_mmap, result_index, sema):
    for mf in mmapped_files:
        accumulator = int(mf.readline())
        while True:
            line = mf.readline()
            if line is None or line == '':
                break
            num = int(line)
            accumulator = function(num, accumulator)
        update_result(result_mmap, result_index, inc, sema)
Maybe I'm wrong, but are you sure that the semaphore really works across processes (is it a system-level mutex)? If it is not, each process ends up with its own copy, because processes do not share the same memory space. You might want to consider using the threading library instead, so that all the threads use the same semaphore.
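One way to check whether the lock really is shared is a small experiment like the one below. It is only a sketch under some assumptions that are not in the question: a POSIX system (it asks for the "fork" start method so the children inherit the anonymous mapping), a fixed-size mmap holding 8-byte integer slots instead of a tab-separated line, and the hypothetical helper names add_to_slot, worker and run_demo. The lock comes from multiprocessing rather than threading, so every child process contends on the same primitive. Because the mapping has a fixed size, nothing ever calls resize.

```python
import mmap
import multiprocessing as mp
import struct

# Hypothetical layout: each result is one 8-byte signed integer slot.
SLOT = struct.Struct("<q")


def add_to_slot(result_mmap, index, amount, lock):
    # Read-modify-write under the lock, so no other process can interleave.
    with lock:
        (old,) = SLOT.unpack_from(result_mmap, index * SLOT.size)
        SLOT.pack_into(result_mmap, index * SLOT.size, old + amount)


def worker(result_mmap, lock, repeats):
    for _ in range(repeats):
        add_to_slot(result_mmap, 0, 1, lock)


def run_demo(n_workers=4, repeats=1000):
    # Assumption: POSIX. With "fork", children inherit the anonymous shared mapping.
    ctx = mp.get_context("fork")
    lock = ctx.Lock()
    # Anonymous, zero-filled, fixed-size mapping; it is never resized.
    result_mmap = mmap.mmap(-1, SLOT.size)
    procs = [
        ctx.Process(target=worker, args=(result_mmap, lock, repeats))
        for _ in range(n_workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return SLOT.unpack_from(result_mmap, 0)[0]


if __name__ == "__main__":
    print(run_demo())
```

If the lock is shared, the final count equals n_workers * repeats. Creating a fresh lock inside each worker instead would leave the increments unprotected, so updates could be lost.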