Computing map: computing value ahead of time


I have a computing map (with soft values) that I am using to cache the results of an expensive computation.

Now I have a situation where I know that a particular key is likely to be looked up within the next few seconds. That key is also more expensive to compute than most.

I would like to compute the value in advance, in a minimum-priority thread, so that when the value is eventually requested it will already be cached, improving the response time.

What is a good way to do this such that:

  1. I have control over the thread (specifically its priority) in which the computation is performed.
  2. Duplicate work is avoided, i.e. the computation is only done once. If the computation task is already running then the calling thread waits for that task instead of computing the value again (FutureTask implements this. With Guava's computing maps this is true if you only call get but not if you mix it with calls to put.)
  3. The "compute value in advance" method is asynchronous and idempotent. If a computation is already in progress it should return immediately without waiting for that computation to finish.
  4. Avoid priority inversion, e.g. if a high-priority thread requests the value while a medium-priority thread is doing something unrelated but the computation task is queued on a low-priority thread, the high-priority thread must not be starved. Maybe this could be achieved by temporarily boosting the priority of the computing thread(s) and/or running the computation on the calling thread.

How could this be coordinated between all the threads involved?

Additional info
The computations in my application are image filtering operations, which means they are all CPU-bound. These operations include affine transforms (ranging from 50µs to 1ms) and convolutions (up to 10ms.) Of course the effectiveness of varying thread priorities depends on the ability of the OS to preempt the larger tasks.


You can arrange for "once only" execution of the background computation by using a Future with the computing map. The Future represents the task that computes the value. The future is created by the computing map and, at the same time, passed to an ExecutorService for background execution. The executor can be configured with your own ThreadFactory implementation that creates low-priority threads, e.g.

    import java.util.concurrent.ThreadFactory;

    class LowPriorityThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setPriority(Thread.MIN_PRIORITY);  // lowest priority the JVM offers
            return t;
        }
    }
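To show how such a factory plugs into an executor, here is a minimal sketch. The class name LowPriorityPool, the thread name, the daemon flag and the observedPriority() probe are assumptions made for illustration, not part of the original answer; the factory is written inline as a lambda so the example stands alone.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class LowPriorityPool {
    /** Creates a fixed pool whose workers all run at Thread.MIN_PRIORITY. */
    static ExecutorService create() {
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "prefetch-worker"); // hypothetical name
            t.setPriority(Thread.MIN_PRIORITY);
            t.setDaemon(true); // assumption: prefetch work may be abandoned at JVM exit
            return t;
        };
        // One worker per hardware thread, since the work is CPU-bound.
        int workers = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(workers, factory);
    }

    /** Runs a probe task on the pool and reports the priority it ran at. */
    static int observedPriority() {
        ExecutorService pool = create();
        try {
            Callable<Integer> probe = () -> Thread.currentThread().getPriority();
            return pool.submit(probe).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Whether the operating system actually honours the lowered priority depends on the platform and JVM settings, so it is worth measuring before relying on it.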

When the value is needed, your high-priority thread then fetches the future from the map, and calls the get() method to retrieve the result, waiting for it to be computed if necessary. To avoid priority inversion you add some additional code to the task:

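    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;

    // ResultType stands for whatever type your cached values have.
    class HandlePriorityInversionTask extends FutureTask<ResultType> {
        private Integer priority;          // requested priority; null until set
        private Integer originalPriority;  // worker's own priority; null until run() starts
        private Thread thread;             // worker currently running the task, if any

        HandlePriorityInversionTask(Callable<ResultType> callable) {
            super(callable);
        }

        @Override
        public ResultType get() throws InterruptedException, ExecutionException {
            // Lend the caller's priority to the worker while the caller waits.
            if (!isDone())
                setPriority(Thread.currentThread().getPriority());
            return super.get();
        }

        @Override
        public void run() {
            synchronized (this) {
                thread = Thread.currentThread();
                originalPriority = thread.getPriority();
                if (priority != null) setPriority(priority);
            }
            super.run();
        }

        @Override
        protected synchronized void done() {
            // Restore the worker's own priority once the task completes.
            if (originalPriority != null) setPriority(originalPriority);
            thread = null;
        }

        synchronized void setPriority(int priority) {
            this.priority = priority;
            if (thread != null)
                thread.setPriority(priority);
        }
    }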

This takes care of raising the priority of the task to the priority of the thread calling get() if the task has not completed, and returns the priority to the original when the task completes, normally or otherwise. (To keep it brief, the code doesn't check if the priority is indeed greater, but that's easy to add.)

When the high-priority thread calls get(), the future may not yet have begun executing. You might be tempted to avoid this by setting a large upper bound on the number of threads used by the executor service, but this may be a bad idea, since each thread could end up running at high priority, consuming as much CPU as it can before the OS switches it out. The pool should probably have one thread per hardware thread, i.e. size it to Runtime.getRuntime().availableProcessors(). If the task has not started executing, waiting for the executor to schedule it is itself a form of priority inversion, since your high-priority thread is waiting for the low-priority threads to finish their current work. In that case you may choose to cancel the task on the current executor and resubmit it on an executor running only high-priority threads.
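A different option from cancel-and-resubmit, and one the question itself hints at, is to run a not-yet-started task on the calling thread. The sketch below relies on the documented behaviour of FutureTask.run(): it does nothing if the task has already been started or completed by another thread. The class and method names (RunOnCallerDemo, runOrWait) are hypothetical, and wrapping failures in IllegalStateException is just one possible choice.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class RunOnCallerDemo {
    /**
     * Returns the task's result, first trying to execute the task on the
     * calling thread. If another thread has already started or finished
     * the task, run() returns immediately and get() simply waits.
     */
    static <V> V runOrWait(FutureTask<V> task) {
        task.run(); // no-op unless the task has not been started yet
        try {
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("computation failed", e.getCause());
        }
    }
}
```

When the low-priority worker later dequeues the same task, its own call to run() is likewise a no-op, so the work is not repeated. If the worker has already started the task, the caller still has to wait for it, and that is the case where the priority boost applies.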


One common way of coordinating this kind of situation is to have a map whose values are FutureTask objects. Taking as an example some code from a web server of mine, the essential idea is that for a given parameter, we check whether there is already a FutureTask (meaning that the calculation with that parameter has already been scheduled), and if so we wait for it. In this example we otherwise schedule the lookup ourselves, but that could be done elsewhere with a separate call if desired:

    private final ConcurrentMap<WordLookupJob, Future<CharSequence>> cache = ...

    private Future<CharSequence> getOrScheduleLookup(final WordLookupJob word) {
        Future<CharSequence> f = cache.get(word);
        if (f == null) {
            Callable<CharSequence> ex = new Callable<CharSequence>() {
                public CharSequence call() throws Exception {
                    return doCalculation(word);
                }
            };
            Future<CharSequence> ft = executor.submit(ex);
            f = cache.putIfAbsent(word, ft);
            if (f != null) {
                // somebody slipped in with the same word -- cancel the
                // lookup we've just started and return the previous one
                ft.cancel(true);
            } else {
                f = ft;
            }
        }
        return f;
    }
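A variation on the code above is to register the task in the map before handing it to the executor, so that a thread losing the race never starts any work and nothing needs cancelling. The sketch below is one way this might look as an asynchronous, idempotent prefetch. PrefetchingCache, taskFor and prefetch are hypothetical names, and eviction (for example the soft values mentioned in the question) is deliberately left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

final class PrefetchingCache<K, V> {
    private final ConcurrentMap<K, FutureTask<V>> tasks = new ConcurrentHashMap<>();
    private final Function<K, V> compute;
    private final Executor background; // e.g. a pool of low-priority threads

    PrefetchingCache(Function<K, V> compute, Executor background) {
        this.compute = compute;
        this.background = background;
    }

    /** Returns the single task registered for the key, scheduling it if new. */
    FutureTask<V> taskFor(K key) {
        FutureTask<V> existing = tasks.get(key);
        if (existing != null) return existing;
        FutureTask<V> created = new FutureTask<V>(() -> compute.apply(key));
        existing = tasks.putIfAbsent(key, created);
        if (existing != null) return existing; // lost the race; nothing was started
        background.execute(created);           // won the race; schedule exactly once
        return created;
    }

    /** Asynchronous and idempotent: never waits for the computation itself. */
    void prefetch(K key) {
        taskFor(key);
    }
}
```

A caller that needs the value can then call get() on the task returned by taskFor(key); repeated prefetch calls for the same key schedule nothing new.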

In terms of thread priorities: I wonder if this will achieve what you think it will? I don't quite understand your point about raising the priority of the lookup above the waiting thread: if the thread is waiting, then it's waiting, whatever the relative priorities of other threads... (You might want to have a look at some articles I've written on thread priorities and thread scheduling, but to cut a long story short, I'm not sure that changing the priority will necessarily buy you what you're expecting.)


I suspect that you are heading down the wrong path by focusing on thread priorities. Usually the data that a cache holds is expensive to compute because of I/O (fetching out-of-memory data) rather than CPU-bound work (logic computation). If you're prefetching to anticipate a user's future action, such as looking at unread emails, then that suggests to me that your work is likely I/O-bound. This means that as long as thread starvation does not occur (which schedulers generally prevent), playing games with thread priority won't offer much of a performance improvement.

If the cost is an I/O call, then the background thread is blocked waiting for the data to arrive, and processing that data should be fairly cheap (e.g. deserialization). As the change in thread priority won't offer much of a speed-up, performing the work asynchronously on a background thread pool should be sufficient. If the cache miss penalty is too high, then using multiple layers of caching tends to help further reduce the user-perceived latency.


As an alternative to thread priorities, you could perform a low-priority task only if no high-priority tasks are in progress. Here's a simple way to do that:

    AtomicInteger highPriorityCount = new AtomicInteger();

    void highPriorityTask() {
        highPriorityCount.incrementAndGet();
        try {
            highPriorityImpl();
        } finally {
            highPriorityCount.decrementAndGet();
        }
    }

    void lowPriorityTask() {
        if (highPriorityCount.get() == 0) {
            lowPriorityImpl();
        }
    }

In your use case, both Impl() methods would call get() on the computing map, highPriorityImpl() in the same thread and lowPriorityImpl() in a different thread.

You could write a more sophisticated version that defers low-priority tasks until the high-priority tasks complete and limits the number of concurrent low-priority tasks.
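As a rough sketch of what such a version might look like, the class below queues low-priority tasks while any high-priority task is running and uses a Semaphore to cap how many low-priority tasks run at once. All names here (DeferringGate, runHighPriority, submitLowPriority, drain) are hypothetical. The gate is best-effort: a high-priority task that starts just after a low-priority one has been dispatched will run alongside it.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class DeferringGate {
    private final AtomicInteger highPriorityCount = new AtomicInteger();
    private final Queue<Runnable> deferred = new ConcurrentLinkedQueue<>();
    private final Semaphore lowPrioritySlots;
    private final Executor lowPriorityExecutor;

    DeferringGate(int maxConcurrentLow, Executor lowPriorityExecutor) {
        this.lowPrioritySlots = new Semaphore(maxConcurrentLow);
        this.lowPriorityExecutor = lowPriorityExecutor;
    }

    /** Runs the task on the calling thread; low-priority work waits meanwhile. */
    void runHighPriority(Runnable task) {
        highPriorityCount.incrementAndGet();
        try {
            task.run();
        } finally {
            if (highPriorityCount.decrementAndGet() == 0) drain();
        }
    }

    /** Queues the task; it is dispatched once no high-priority task is active. */
    void submitLowPriority(Runnable task) {
        deferred.add(task);
        drain();
    }

    /** Dispatches queued tasks while the gate is open and a slot is free. */
    private void drain() {
        while (highPriorityCount.get() == 0 && lowPrioritySlots.tryAcquire()) {
            Runnable next = deferred.poll();
            if (next == null) {
                lowPrioritySlots.release();
                if (deferred.isEmpty()) return;
                continue; // a task arrived while we held the slot; retry
            }
            lowPriorityExecutor.execute(() -> {
                try {
                    next.run();
                } finally {
                    lowPrioritySlots.release();
                    drain(); // pick up anything queued in the meantime
                }
            });
        }
    }
}
```

In the use case from the question, the low-priority task would call get() on the computing map for the key to prefetch, and the executor passed in would be one backed by low-priority threads.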
