Pattern for concurrent cache sharing

Ok, I was a little unsure how best to name this problem :) But assume this scenario: you're going out and fetching some webpage (with various URLs) and caching it locally. The cache part is pretty easy to solve, even with multiple threads.

However, imagine that one thread starts fetching a URL, and a couple of milliseconds later another wants to get the same URL. Is there any good pattern for making the second thread's method wait on the first one to fetch the page, insert it into the cache, and return it, so you don't have to do multiple requests? Ideally with little enough overhead that it's worth doing even for requests that take about 300-700 ms, and without blocking requests for other URLs.

Basically, when requests for identical URLs come in tightly after each other, I want the second request to "piggyback" on the first request.

I had a loose idea of having a dictionary where you insert an object keyed by the URL when you start fetching a page, and lock on it. If an entry matching the key already exists, the second thread gets that object, locks on it, and then tries to fetch the URL from the actual cache.

I'm a little unsure of the particulars of making it really thread-safe, however; using ConcurrentDictionary might be one part of it...
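
Something like the following is roughly what I have in mind, sketched with ConcurrentDictionary and Lazy&lt;T&gt; (FetchContentFromTheWeb is just a placeholder for the actual web request):

using System;
using System.Collections.Concurrent;
using System.Net;

public static class PageCache
{
    // One Lazy<string> per URL; GetOrAdd guarantees every thread gets the same instance.
    private static readonly ConcurrentDictionary<string, Lazy<string>> _cache =
        new ConcurrentDictionary<string, Lazy<string>>();

    public static string GetPage(string url)
    {
        // The first thread to ask for a URL creates the Lazy; later threads get the
        // same Lazy back. Reading .Value makes the first thread run the fetch while
        // the others block until it completes -- the "piggyback" behaviour.
        Lazy<string> lazy = _cache.GetOrAdd(
            url,
            u => new Lazy<string>(() => FetchContentFromTheWeb(u)));
        return lazy.Value;
    }

    private static string FetchContentFromTheWeb(string url)
    {
        using (var client = new WebClient())    // placeholder fetch
            return client.DownloadString(url);
    }
}

(One caveat I'm aware of: with the default LazyThreadSafetyMode, an exception thrown by the fetch is cached too, so a failed fetch would need to be evicted.)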

Are there any common patterns and solutions for scenarios like this?

Breakdown of the wrong behavior:

Thread 1: Checks the cache; it doesn't exist, so it starts fetching the URL

Thread 2: Starts fetching the same URL, since it still doesn't exist in the cache

Thread 1: Finishes and inserts the page into the cache, returns the page

Thread 2: Finishes and also inserts it into the cache (or discards it), returns the page

Breakdown of the correct behavior:

Thread 1: Checks the cache; it doesn't exist, so it starts fetching the URL

Thread 2: Wants the same URL, but sees it's currently being fetched, so it waits on thread 1

Thread 1: Finishes and inserts the page into the cache, returns the page

Thread 2: Notices that thread 1 is finished and returns the page that thread 1 fetched

EDIT

Most solutions so far seem to misunderstand the problem and only address the caching. As I said, that isn't the problem; the problem is that when an external web fetch is already in progress, a second fetch of the same URL started before the first one has cached its result should use the result from the first fetch rather than performing a second one.


You could use a ConcurrentDictionary<K,V> and a variant of double-checked locking:

public static string GetUrlContent(string url)
{
    object value1 = _cache.GetOrAdd(url, new object());

    if (value1 == null)    // null check only required if content
        return null;       // could legitimately be a null string

    var urlContent = value1 as string;
    if (urlContent != null)
        return urlContent;    // got the content

    // value1 isn't a string which means that it's an object to lock against
    lock (value1)
    {
        object value2 = _cache[url];

        // at this point value2 will *either* be the url content
        // *or* the object that we already hold a lock against
        if (value2 != value1)
            return (string)value2;    // got the content

        urlContent = FetchContentFromTheWeb(url);    // todo
        _cache[url] = urlContent;
        return urlContent;
    }
}

private static readonly ConcurrentDictionary<string, object> _cache =
                                  new ConcurrentDictionary<string, object>();


EDIT: My code is quite a bit uglier now, but uses a separate lock per URL. This allows different URLs to be fetched asynchronously, however each URL will only be fetched once.

public class UrlFetcher
{
    static Hashtable cache = Hashtable.Synchronized(new Hashtable());

    public static String GetCachedUrl(String url)
    {
        // exactly 1 fetcher is created per URL
        InternalFetcher fetcher = (InternalFetcher)cache[url];
        if( fetcher == null )
        {
            lock( cache.SyncRoot )
            {
                fetcher = (InternalFetcher)cache[url];
                if( fetcher == null )
                {
                    fetcher = new InternalFetcher(url);
                    cache[url] = fetcher;
                }
            }
        }
        // blocks all threads requesting the same URL
        return fetcher.Contents;
    }

    /// <summary>Each fetcher locks on itself and is initialized with null contents.
    /// The first thread to call fetcher.Contents will cause the fetch to occur, and
    /// block until completion.</summary>
    private class InternalFetcher
    {
        private String url;
        private String contents;

        public InternalFetcher(String url)
        {
            this.url = url;
            this.contents = null;
        }

        public String Contents
        {
            get
            {
                if( contents == null )
                {
                    lock( this ) // "this" is an instance of InternalFetcher...
                    {
                        if( contents == null )
                        {
                            contents = FetchFromWeb(url);
                        }
                    }
                }
                return contents;
            }
        }
    }
}


Will the Semaphore please stand up! stand up! stand up!

Use a semaphore; you can easily synchronize your threads with it, in both of the cases where:

  1. you are trying to load a page that is currently being cached
  2. you are saving the cache to a file while a page is being loaded from it

In both scenarios you will run into trouble.

It is just like the readers-writers problem, a classic race-condition problem in operating systems. When a thread wants to rebuild the cache or start caching a page, no other thread should read from it; if a thread is currently reading it, the writer should wait until the read is finished before replacing the cache, and no two threads should cache the same page into the same file. Conversely, all readers can read from the cache at any time, as long as no writer is writing to it.

You should read some semaphore usage samples on MSDN; it is very easy to use. A thread that wants to do something calls into the semaphore: if the resource can be granted, it does its work; otherwise it sleeps and waits to be woken up when the resource is ready.
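
As a rough sketch of that idea in C# (using one SemaphoreSlim per URL so that different URLs never block each other; all names here are illustrative, not from any particular library):

using System.Collections.Concurrent;
using System.Net;
using System.Threading;

public static class SemaphoreCache
{
    private static readonly ConcurrentDictionary<string, string> _pages =
        new ConcurrentDictionary<string, string>();
    private static readonly ConcurrentDictionary<string, SemaphoreSlim> _gates =
        new ConcurrentDictionary<string, SemaphoreSlim>();

    public static string GetPage(string url)
    {
        if (_pages.TryGetValue(url, out var cached))
            return cached;                        // fast path: already cached

        // one semaphore per URL, so fetches of different URLs run concurrently
        var gate = _gates.GetOrAdd(url, _ => new SemaphoreSlim(1, 1));
        gate.Wait();                              // later threads park here
        try
        {
            // re-check: the thread that held the semaphore may have filled the cache
            if (_pages.TryGetValue(url, out cached))
                return cached;

            string content = Fetch(url);          // placeholder for the real request
            _pages[url] = content;
            return content;
        }
        finally
        {
            gate.Release();
        }
    }

    private static string Fetch(string url)
    {
        using (var client = new WebClient())
            return client.DownloadString(url);
    }
}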


Disclaimer: This might be a n00bish answer. Please pardon me if it is.

I'd recommend using a shared dictionary object with locks to keep track of the URLs that are currently being fetched or have already been fetched.

  • At every request, check the URL against this object.

  • If an entry for the URL is present, check the cache. (This means one of the threads has either already fetched it or is currently fetching it.)

  • If it's available in the cache, use it; otherwise put the current thread to sleep for a while and check back again. (If it's not in the cache, some thread is still fetching it, so wait until it's done.)

  • If the entry is not found in the dictionary object, add the URL to it and send the request. Once a response is obtained, add it to the cache.

This logic should work; however, you would also need to take care of cache expiration and removal of entries from the dictionary object.
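
A rough sketch of the above in C# (names are illustrative; expiration and error handling are left out, as noted):

using System.Collections.Concurrent;
using System.Net;
using System.Threading;

public static class PollingCache
{
    // tracks URLs that have been requested (already fetched or being fetched)
    private static readonly ConcurrentDictionary<string, bool> _requested =
        new ConcurrentDictionary<string, bool>();
    private static readonly ConcurrentDictionary<string, string> _cache =
        new ConcurrentDictionary<string, string>();

    public static string GetPage(string url)
    {
        // TryAdd succeeds for exactly one thread: that thread performs the fetch
        if (_requested.TryAdd(url, true))
        {
            string content = Fetch(url);   // placeholder for the real request
            _cache[url] = content;
            return content;
        }

        // another thread is (or was) fetching: poll the cache until it appears
        string result;
        while (!_cache.TryGetValue(url, out result))
            Thread.Sleep(50);              // sleep for a while and check back again

        return result;
    }

    private static string Fetch(string url)
    {
        using (var client = new WebClient())
            return client.DownloadString(url);
    }
}

Note that if the fetch throws, the waiting threads here would poll forever, which is one reason the lock- and event-based answers above are more robust.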


My solution is to use an AtomicBoolean to control access to the database when the cache has timed out or the entry does not exist.

At any given moment, only one thread (I call it the read thread) can access the database; the other threads spin until the read thread returns the data and writes it into the cache.

Here is the code, implemented in Java:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class CacheBreakDownDefender<K, R> {

    /**
     * false = do not write null to the cache when the database returns null
     */
    private final boolean writeNullToCache;

    /**
     * one flag per query key: true while some thread is reading the database
     */
    private final ConcurrentHashMap<K, AtomicBoolean> selectingDBTagMap = new ConcurrentHashMap<>();

    public static <K, R> CacheBreakDownDefender<K, R> getInstance(Class<K> keyType, Class<R> resultType) {
        return Singleton.get(keyType.getName() + resultType.getName(), () -> new CacheBreakDownDefender<>(false));
    }

    public static <K, R> CacheBreakDownDefender<K, R> getInstance(Class<K> keyType, Class<R> resultType, boolean writeNullToCache) {
        return Singleton.get(keyType.getName() + resultType.getName(), () -> new CacheBreakDownDefender<>(writeNullToCache));
    }

    private CacheBreakDownDefender(boolean writeNullToCache) {
        this.writeNullToCache = writeNullToCache;
    }

    public R readFromCache(K key, Function<K, ? extends R> getFromCache, Function<K, ? extends R> getFromDB, BiConsumer<K, R> writeCache) throws InterruptedException {
        R result = getFromCache.apply(key);
        if (result == null) {
            final AtomicBoolean selectingDB = selectingDBTagMap.computeIfAbsent(key, x -> new AtomicBoolean(false));
            if (selectingDB.compareAndSet(false, true)) {
                // this thread won the race: it alone reads the database
                try {
                    result = getFromDB.apply(key);
                    if (result != null || writeNullToCache) {
                        writeCache.accept(key, result);
                    }
                } finally {
                    selectingDB.getAndSet(false);
                    selectingDBTagMap.remove(key);
                }
            } else {
                // another thread is reading the database: spin until it finishes,
                // then read the freshly written value from the cache
                while (selectingDB.get()) {
                    TimeUnit.MILLISECONDS.sleep(0L);
                }
                return getFromCache.apply(key);
            }
        }
        return result;
    }

    /**
     * Minimal stand-in for the author's Singleton helper, which is not shown
     * in the original: caches one instance per name.
     */
    private static final class Singleton {
        private static final ConcurrentHashMap<String, Object> INSTANCES = new ConcurrentHashMap<>();

        @SuppressWarnings("unchecked")
        static <T> T get(String name, Supplier<T> factory) {
            return (T) INSTANCES.computeIfAbsent(name, k -> factory.get());
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Map<String, String> map = new ConcurrentHashMap<>();
        CacheBreakDownDefender<String, String> instance = CacheBreakDownDefender.getInstance(String.class, String.class, true);

        for (int i = 0; i < 9; i++) {
            int finalI = i;
            new Thread(() -> {
                String kele = null;
                try {
                    if (finalI == 6) {
                        kele = instance.readFromCache("kele2", map::get, key -> "helloword2", map::put);
                    } else
                        kele = instance.readFromCache("kele", map::get, key -> "helloword", map::put);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("result = " + kele); // the original used log.info
            }).start();
        }
        TimeUnit.SECONDS.sleep(2L);
    }
}


This is not exactly for concurrent caches but for all caches:

"A cache with a bad policy is another name for a memory leak" (Raymond Chen)

