I'm looking for an efficient approach to rate-limiting requests from Google App Engine to a third-party service. The third-party service rate-limits requests on a per-account basis, and on the Google App Engine side, most of the work is carried out inside tasks. Token buckets are a great general algorithm here.
Q: what approach can be used to efficiently rate-limit requests on a per-account rather than per-service basis?
This should not involve setting up rates on GAE task queues, as the number of requests per account and the number of accounts serviced will vary greatly. For performance reasons I'm most interested in memcache-based (incr/decr?) ideas!
I think this boils down to a memcache-based token bucket?
Thoughts?
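For illustration, here is a minimal in-memory token bucket keyed per account. This is only a sketch of the algorithm: the `buckets` dict and the `allow_request` helper are hypothetical stand-ins, and on App Engine the token state would live in memcache (with incr/cas for atomicity) rather than in process memory.

```python
import time


class TokenBucket:
    """Minimal token bucket: refills continuously, capped at capacity."""

    def __init__(self, capacity, refill_rate, now):
        self.capacity = capacity        # maximum tokens (burst size)
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = float(capacity)
        self.updated = now

    def allow(self, now):
        # Refill based on elapsed time, capped at capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


buckets = {}  # one bucket per third-party account (memcache in production)


def allow_request(account_id, capacity=5, rate=1.0, now=None):
    """True if the request for this account may proceed, else False."""
    now = time.time() if now is None else now
    bucket = buckets.get(account_id)
    if bucket is None:
        bucket = buckets[account_id] = TokenBucket(capacity, rate, now)
    return bucket.allow(now)
```

With capacity 5 and a refill rate of 1 token/second, an account can burst 5 requests, is then refused, and regains one request per second; accounts are independent.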
I bookmarked this project a while ago: http://code.google.com/p/gaedjango-ratelimitcache/
It's not really an answer to your specific question, but maybe it can help you get started.
I know this is an old question, but it's a top search result and I thought others might find an alternative I made useful. It's a bit more granular (down to the second), simpler (only a single function), and faster (only one memcache lookup) than the solution above:
import webapp2
from functools import wraps
from google.appengine.api import memcache


def rate_limit(seconds_per_request=1):
    def rate_limiter(function):
        @wraps(function)
        def wrapper(self, *args, **kwargs):
            added = memcache.add('%s:%s' % (self.__class__.__name__, self.request.remote_addr or ''), 1,
                                 time=seconds_per_request, namespace='rate_limiting')
            if not added:
                self.response.write('Rate limit exceeded.')
                self.response.set_status(429)
                return
            return function(self, *args, **kwargs)
        return wrapper
    return rate_limiter


class ExampleHandler(webapp2.RequestHandler):
    @rate_limit(seconds_per_request=2)
    def get(self):
        self.response.write('Hello, webapp2!')
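The decorator above keys on the client IP, but for the per-account case in the question the same `memcache.add` trick works with the account id in the key: `add` succeeds only if the key is absent, so at most one request per account fits in each window. The semantics can be sketched without App Engine (a dict with expiry times stands in for memcache, and `allow` is a hypothetical helper):

```python
import time

_cache = {}  # stands in for memcache: key -> (value, expires_at)


def cache_add(key, value, ttl, now=None):
    """Mimics memcache.add: stores only if the key is absent or expired."""
    now = time.time() if now is None else now
    entry = _cache.get(key)
    if entry is not None and entry[1] > now:
        return False  # key still present: add fails
    _cache[key] = (value, now + ttl)
    return True


def allow(account_id, seconds_per_request=1, now=None):
    """At most one request per account per `seconds_per_request` window."""
    return cache_add('rate:%s' % account_id, 1, seconds_per_request, now=now)
```

The first request for an account succeeds and plants a key that expires after the window; further requests fail `add` until the key ages out, and each account has its own key.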
Here is how I implemented a token bucket with memcache on GAE:
Edit: taking (another) stab at this.
This is borrowed in part from https://github.com/simonw/ratelimitcache/blob/master/ratelimitcache.py
import logging
import time

from google.appengine.api import memcache


def throttle(key, rate_count, rate_seconds, tries=3):
    '''
    Returns True if throttled (not enough tokens available) else False.
    Implements a token bucket with one memcache counter per second.
    '''
    client = memcache.Client(CLIENT_ARGS)
    for _ in range(tries):
        now = int(time.time())
        # One counter key per second of the window, newest first.
        keys = ['%s-%s' % (key, str(now - i)) for i in range(rate_seconds)]
        # Ensure the current second's counter exists (no-op if it does).
        client.add(keys[0], 0, time=rate_seconds + 1)
        tokens = client.get_multi(keys[1:])
        tokens[keys[0]] = client.gets(keys[0])  # gets() records the CAS id
        if sum(tokens.values()) >= rate_count:
            return True
        # Atomically bump the current second's counter; retry on contention.
        if client.cas(keys[0], tokens[keys[0]] + 1, time=rate_seconds + 1):
            return False
    logging.error('cache contention error')
    return True
Here are usage examples:
def test_that_it_throttles_too_many_requests(self):
    burst = 1
    interval = 1
    assert shared.rate_limit.throttle('test', burst, interval) is False
    assert shared.rate_limit.throttle('test', burst, interval) is True


def test_that_it_doesnt_throttle_burst_of_requests(self):
    burst = 16
    interval = 1
    for i in range(burst):
        assert shared.rate_limit.throttle('test', burst, interval) is False
    time.sleep(interval + 1)  # memcache has 1-second granularity
    for i in range(burst):
        assert shared.rate_limit.throttle('test', burst, interval) is False
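The per-second-key scheme in `throttle` can also be sketched without memcache, which makes the sliding-window behavior easier to see. Here a plain dict replaces memcache and no CAS retry is needed since there is no concurrency; `throttled` is a hypothetical stand-in, not the production function:

```python
import time

counters = {}  # per-second counters: (account, second) -> request count


def throttled(account, rate_count, rate_seconds, now=None):
    """Sliding-window limiter: sums per-second counters over the window.

    Returns True if the request should be refused, else False (and
    records the request in the current second's counter).
    """
    now = int(time.time() if now is None else now)
    window = [(account, now - i) for i in range(rate_seconds)]
    if sum(counters.get(k, 0) for k in window) >= rate_count:
        return True  # over the limit: do not record the refused request
    counters[window[0]] = counters.get(window[0], 0) + 1
    return False
```

With `rate_count=2` over a 10-second window, two requests pass, a third inside the window is refused, and once the recorded seconds slide out of the window requests pass again. (In memcache each `(account, second)` pair becomes a key with a TTL of `rate_seconds + 1`, so old counters expire on their own.)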