Tutorial: Advice for efficient blocking queries


I would like to store tuple objects in a concurrent Java collection and then have an efficient, blocking query method that returns the first element matching a pattern. If no such element is available, it should block until one is present.

For instance if I have a class:

    public class Pair {
        public final String first;
        public final String second;

        public Pair( String first, String second ) {
            this.first = first;
            this.second = second;
        }
    }

And a collection like:

    public class FunkyCollection {
        public void add( Pair p ) { /* ... */ }
        public Pair get( Pair p ) { /* ... */ }
    }

I would like to query it like:

myFunkyCollection.get( new Pair( null, "foo" ) );  

which returns the first available pair whose second field equals "foo", or blocks until such an element is added. Another query example:

myFunkyCollection.get( new Pair( null, null ) );  

should return the first available pair whatever its values.

Does a solution already exist? If not, how would you suggest implementing the get( Pair p ) method?

Clarification: The method get( Pair p) must also remove the element. The name choice was not very smart. A better name would be take( ... ).


Here's some source code. It is basically the same as what cb160 said, but having the source code might help to clear up any questions you may still have. In particular, the methods on the FunkyCollection must be synchronized.

As meriton pointed out, the get method performs an O(n) scan for every blocked get every time a new object is added. It also performs an O(n) operation to remove objects. This could be improved by using a data structure similar to a linked list where you can keep an iterator to the last item checked. I haven't provided source code for this optimization, but it shouldn't be too difficult to implement if you need the extra performance.

    import java.util.*;

    public class BlockingQueries
    {
        public class Pair
        {
            public final String first;
            public final String second;

            public Pair(String first, String second)
            {
                this.first = first;
                this.second = second;
            }
        }

        public class FunkyCollection
        {
            final ArrayList<Pair> pairs = new ArrayList<Pair>();

            public synchronized void add( Pair p )
            {
                pairs.add(p);
                notifyAll();
            }

            public synchronized Pair get( Pair p ) throws InterruptedException
            {
                while (true)
                {
                    for (Iterator<Pair> i = pairs.iterator(); i.hasNext(); )
                    {
                        Pair pair = i.next();
                        boolean firstOk = p.first == null || p.first.equals(pair.first);
                        boolean secondOk = p.second == null || p.second.equals(pair.second);
                        if (firstOk && secondOk)
                        {
                            i.remove();
                            return pair;
                        }
                    }
                    wait();
                }
            }
        }

        class Producer implements Runnable
        {
            private FunkyCollection funkyCollection;

            public Producer(FunkyCollection funkyCollection)
            {
                this.funkyCollection = funkyCollection;
            }

            public void run()
            {
                try
                {
                    for (int i = 0; i < 10; ++i)
                    {
                        System.out.println("Adding item " + i);
                        funkyCollection.add(new Pair("foo" + i, "bar" + i));
                        Thread.sleep(1000);
                    }
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }
            }
        }

        public void go() throws InterruptedException
        {
            FunkyCollection funkyCollection = new FunkyCollection();
            new Thread(new Producer(funkyCollection)).start();
            System.out.println("Fetching bar5.");
            funkyCollection.get(new Pair(null, "bar5"));
            System.out.println("Fetching foo2.");
            funkyCollection.get(new Pair("foo2", null));
            System.out.println("Fetching foo8, bar8");
            funkyCollection.get(new Pair("foo8", "bar8"));
            System.out.println("Finished.");
        }

        public static void main(String[] args) throws InterruptedException
        {
            new BlockingQueries().go();
        }
    }


Output:

    Fetching bar5.
    Adding item 0
    Adding item 1
    Adding item 2
    Adding item 3
    Adding item 4
    Adding item 5
    Fetching foo2.
    Fetching foo8, bar8
    Adding item 6
    Adding item 7
    Adding item 8
    Finished.
    Adding item 9

Note that I put everything into one source file to make it easier to run.
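The optimization mentioned above (not rescanning items that a blocked call has already rejected) could be sketched roughly as follows. This is not the linked-list-with-iterator design itself but a substitute that is simpler to get right when other threads remove items: each pair is stored in a TreeMap under an increasing sequence number, and every blocked call remembers the highest sequence number it has already checked. The names ResumableScanCollection, take, nextSeq and lastChecked are made up for this illustration.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

class ResumableScanCollection {
    public static final class Pair {
        public final String first;
        public final String second;

        public Pair(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    // Pairs keyed by an increasing sequence number, so iteration follows insertion order.
    private final TreeMap<Long, Pair> pairs = new TreeMap<>();
    private long nextSeq = 0;

    public synchronized void add(Pair p) {
        pairs.put(nextSeq++, p);
        notifyAll(); // wake blocked take() calls so they can inspect the new entry
    }

    public synchronized Pair take(Pair pattern) throws InterruptedException {
        long lastChecked = -1L; // highest sequence number this call has already rejected
        while (true) {
            // Only entries added after the previous scan are visited.
            Iterator<Map.Entry<Long, Pair>> it =
                    pairs.tailMap(lastChecked, false).entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Pair> entry = it.next();
                Pair candidate = entry.getValue(); // read before remove(); the entry may change afterwards
                if (matches(pattern, candidate)) {
                    it.remove();
                    return candidate;
                }
                lastChecked = entry.getKey();
            }
            wait(); // releases the monitor until add() calls notifyAll()
        }
    }

    // A null field in the pattern acts as a wildcard.
    private static boolean matches(Pair pattern, Pair candidate) {
        boolean firstOk = pattern.first == null || pattern.first.equals(candidate.first);
        boolean secondOk = pattern.second == null || pattern.second.equals(candidate.second);
        return firstOk && secondOk;
    }
}
```

With this layout, removals by other threads cannot invalidate the remembered position, because the position is a key rather than a live iterator. Each wake-up costs O(log n) to find the resume point plus the number of newly added entries.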


I know of no existing container that will provide this behavior. One problem you face is the case where no existing entry matches the query. In that case, you'll have to wait for new entries to arrive, and those new entries are supposed to arrive at the tail of the sequence. Given that you're blocking, you don't want to have to examine all the entries that precede the latest addition, because you've already inspected them and determined that they don't match. Hence, you need some way to record your current position, and be able to search forward from there whenever a new entry arrives.

This waiting is a job for a Condition. As suggested in cb160's answer, you should allocate a Condition instance inside your collection, and block on it via Condition#await(). You should also expose a companion overload to your get() method to allow timed waiting:

    public Pair get(Pair p) throws InterruptedException;
    public Pair get(Pair p, long time, TimeUnit unit) throws InterruptedException;

Upon each call to add(), call Condition#signalAll() to unblock the threads waiting on unsatisfied get() queries, allowing them to scan the recent additions.

You haven't mentioned how or if items are ever removed from this container. If the container only grows, that simplifies how threads can scan its contents without worrying about contention from other threads mutating the container. Each thread can begin its query with confidence as to the minimum number of entries available to inspect. However, if you allow removal of items, there are many more challenges to confront.
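For illustration, the Condition-based approach described above might look something like the sketch below. It assumes that get() also removes the matched pair (as the question's clarification asks) and that the timed overload returns null on timeout; the latter is a choice made for this example, not something stated above. The class name ConditionFunkyCollection and the helper removeFirstMatch are hypothetical, and for brevity each wake-up rescans the whole list rather than resuming from a recorded position.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionFunkyCollection {
    public static final class Pair {
        public final String first;
        public final String second;

        public Pair(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition added = lock.newCondition(); // signalled on every add()
    private final List<Pair> pairs = new ArrayList<>();

    public void add(Pair p) {
        lock.lock();
        try {
            pairs.add(p);
            added.signalAll(); // let every waiting query re-check the contents
        } finally {
            lock.unlock();
        }
    }

    public Pair get(Pair pattern) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (true) {
                Pair found = removeFirstMatch(pattern);
                if (found != null) {
                    return found;
                }
                added.await(); // releases the lock while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    // Assumption for this sketch: returns null if no match arrives within the timeout.
    public Pair get(Pair pattern, long time, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            while (true) {
                Pair found = removeFirstMatch(pattern);
                if (found != null) {
                    return found;
                }
                if (nanos <= 0L) {
                    return null;
                }
                nanos = added.awaitNanos(nanos); // remaining wait time, <= 0 once elapsed
            }
        } finally {
            lock.unlock();
        }
    }

    // Caller must hold the lock. A null field in the pattern acts as a wildcard.
    private Pair removeFirstMatch(Pair pattern) {
        for (Iterator<Pair> it = pairs.iterator(); it.hasNext(); ) {
            Pair candidate = it.next();
            boolean firstOk = pattern.first == null || pattern.first.equals(candidate.first);
            boolean secondOk = pattern.second == null || pattern.second.equals(candidate.second);
            if (firstOk && secondOk) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }
}
```

A call such as get(new Pair(null, "foo"), 500, TimeUnit.MILLISECONDS) would then wait at most about half a second for a matching pair. Both loops re-check the contents after every wake-up, so spurious wake-ups and matches taken by a competing thread are handled the same way.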


In your FunkyCollection add method you could call notifyAll on the collection itself every time you add an element.

In the get method, if the underlying container (any suitable container is fine) doesn't contain the value you need, wait on the FunkyCollection. When the wait is notified, check whether the underlying container now contains the result you need. If it does, return the value; otherwise, wait again.


It appears you are looking for an implementation of Tuple Spaces. The Wikipedia article about them lists a few implementations for Java; perhaps you can use one of those. Failing that, you might find an open source implementation to imitate, or relevant research papers.
