I've got a Python script running Django for database and memcache, but it's notably runnning as a standalone daemon (i.e. not responding to webserver requests). The daemon checks a Django model Requisition for objects with a status=STATUS_NEW
, then marks them STATUS_WORKING and puts them into a queue.
A number of processes (created using the multiprocess package) will pull things out of the Queue and do work on the Requisition with the pr.id
that was passed to the Queue. I believe the memory leak is probably in the following code (but it could be in the 'Worker' code on the other side of the Queue though this is unlikely because because the memory size is growing even when no Requisitions are coming up -- i.e. when the workers are all blocking on Queue.get()).
from requisitions.models import Requisition # our Django model
from multiprocessing import Queue
while True:
# Wait for "N"ew requisitions, then pop them into the queue.
for pr in Requisition.objec开发者_如何学Pythonts.all().filter(status=Requisition.STATUS_NEW):
pr.set_status(pr.STATUS_WORKING)
pr.save()
queue.put(pr.id)
time.sleep(settings.DAEMON_POLL_WAIT)
Where settings.DAEMON_POLL_WAIT=0.01
.
It seems if I leave this running for a period of time (i.e. a couple days) the Python process will grow to infinite size and eventually the system will run out of memory.
What's going on here (or how can I find out), and more importantly - how can you run a daemon that does this?
My first thought is to change the dynamic of the function, notably by putting the check for new Requisition objects into a django.core.cache cache
, i.e.
from django.core.cache import cache
while True:
time.sleep(settings.DAEMON_POLL_WAIT)
if cache.get('new_requisitions'):
# Possible race condition
cache.clear()
process_new_requisitions(queue)
def process_new_requisitions(queue):
for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
pr.set_status(pr.STATUS_WORKING)
pr.save()
queue.put(pr.id)
The process that's creating Requisitions with status=STATUS_NEW
can do a cache.set('new_requisitions', 1)
(or alternatively we could catch a signal or Requisition.save() event where a new Requisition is being created and then set the flag in the cache from there).
However I'm not sure that the solution I've proposed here addresses the memory issues (which are probably related to garbage collection - so the scoping by way of the process_new_requisitions
may solve the problem).
I'm grateful for any thoughts and feedback.
You need to regularly reset a list of queries that Django keeps for debugging purposes. Normally it is cleared after every request, but since your application is not request based, you need to do this manually:
from django import db
db.reset_queries()
See also:
"Debugging Django memory leak with TrackRefs and Guppy" by Mikko Ohtamaa:
Django keeps track of all queries for debugging purposes (connection.queries). This list is reseted at the end of HTTP request. But in standalone mode, there are no requests. So you need to manually reset to queries list after each working cycle
"Why is Django leaking memory?" in Django FAQ - it talks both about setting
DEBUG
toFalse
, which is always important, and about clearing the list of queries usingdb.reset_queries()
, important in applications like yours.
Does the settings.py file for the daemon process have DEBUG = True
? If so, Django keeps in memory a record of all the SQL it has run so far, which can lead to a memory leak.
I had a lot of data crunching to do, so, my solution to this issue was using multiprocessing, and using pools to counteract whatever memory bloat was happening.
To keep it simple, I just defined some "global" (top-level, whatever the term is in Python) functions instead of trying to make things pickle-able.
Here it is in abstract form:
import multiprocessing as mp
WORKERS = 16 # I had 7 cores, allocated 16 because processing was I/O bound
# this is a global function
def worker(params):
# do stuff
return something_for_the_callback_to_analyze
# this is a global function
def worker_callback(worker_return_value):
# report stuff, or pass
# My multiprocess_launch was inside of a class
def multiprocess_launcher(params):
# somehow define a collection
while True:
if len(collection) == 0:
break
# Take a slice
pool_sub_batch = []
for _ in range(WORKERS):
if collection: # as long as there's still something in the collection
pool_sub_batch.append( collection.pop() )
# Start a pool, limited to the slice
pool_size = WORKERS
if len(pool_sub_batch) < WORKERS:
pool_size = len(pool_sub_batch)
pool = mp.Pool(processes=pool_size)
for sub_batch in pool_sub_batch:
pool.apply_async(worker, args = (sub_batch), callback = worker_callback)
pool.close()
pool.join()
# Loop, more slices
Apart from db.reset_queries() and DEBUG = False tricks, here is another approach: Just spawn another process that performs the django query and feeds the queue. This process will work in its own memory context, and after performing your task it will release back your memory.
I believe that sometimes (if not always) it's inevitable to control memory issues with a long running process that performs heavy django transactions.
精彩评论