I'm using collectiveidea's version of delayed_job: https://github.com/collectiveidea/delayed_job
Can anyone point me to the actual criteria used when a worker goes to pick the next job to work on? I'm assuming it's something like "SELECT id FROM delayed_jobs WHERE run_at > NOW() ORDER BY priority ASC, run_at ASC LIMIT 1" (picks by priority first, run_at time second), but I haven't been able to find exactly what's considered. I've done some poking about in the code on GitHub, but haven't found the actual query for the next job. Curious on a couple things, including 开发者_如何学运维whether 'created_at', 'attempts' or 'failed_at' factor into prioritization at all. (and I realize I'm unlikely to find actual SQL, just an easy way to represent what I assume the query does).
Secondarily, any good sources of actual documentation for this gem? For something that's used so commonly in Rails, the documentation I've seen is pretty darn sparse.
A bit of digging through the source showed this in backend/active_record.rb:
scope :ready_to_run, lambda {|worker_name, max_run_time|
where(['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name])
}
scope :by_priority, order('priority ASC, run_at ASC')
# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
::ActiveRecord::Base.silence do
scope.by_priority.all(:limit => limit)
end
end
Also, this bit in backend/base.rb is of interest:
def reserve(worker, max_run_time = Worker.max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(worker.name, 5, max_run_time).detect do |job|
job.lock_exclusively!(max_run_time, worker.name)
end
end
reserve
is called by a worker to choose the next job.
delayed_job/lib/delayed/backend/active_record.rb
self.find_available
calls ready_to_run
then scopes it based on the current workers min/max priority
then orders it by priority ASC, run_at ASC
the scope :ready_to_run
calls
where(['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name])
so it generates something like
SELECT * FROM jobs where run at <= ? AND locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL AND priority >= ? AND priority <= ? ORDER BY priority ASC, run_at ASC
精彩评论