Tutorial: Deadlock in ThreadPoolExecutor



Question:

I encountered a situation where a thread is parked in ThreadPoolExecutor's execute(Runnable) method while all the thread pool's threads are waiting in getTask and the workQueue is empty.

Does anybody have any ideas?

The ThreadPoolExecutor is created with an ArrayBlockingQueue, and corePoolSize == maximumPoolSize == 4.

[Edit] To be more precise, the thread is blocked in the ThreadPoolExecutor.execute(Runnable command) method. It has a task to execute, but doesn't execute it.

[Edit2] The executor is blocked somewhere inside the work queue (ArrayBlockingQueue).

[Edit3] The callstack:

thread = front_end(224)
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1114)
  at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
  at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
  at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653)
  at net.listenThread.WorkersPool.execute(WorkersPool.java:45)

At the same time, the workQueue is empty (checked with a remote debugger).

[Edit4] Code working with ThreadPoolExecutor:

  public WorkersPool(int size) {
    pool = new ThreadPoolExecutor(size, size, IDLE_WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY),
        new ThreadFactory() {
          @NotNull
          private final AtomicInteger threadsCount = new AtomicInteger(0);

          @NotNull
          public Thread newThread(@NotNull Runnable r) {
            final Thread thread = new Thread(r);
            thread.setName("net_worker_" + threadsCount.incrementAndGet());
            return thread;
          }
        },
        new RejectedExecutionHandler() {
          public void rejectedExecution(@Nullable Runnable r, @Nullable ThreadPoolExecutor executor) {
            Verify.warning("new task " + r + " is discarded");
          }
        });
  }

  public void execute(@NotNull Runnable task) {
    pool.execute(task);
  }

  public void stopWorkers() throws WorkersTerminationFailedException {
    pool.shutdownNow();
    try {
      pool.awaitTermination(THREAD_TERMINATION_WAIT_TIME, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      throw new WorkersTerminationFailedException("Workers-pool termination failed", e);
    }
  }
}


Solution:1

It sounds like this is a bug in JVMs older than 6u21. There was an issue in the compiled native code on some (maybe all) operating systems.

From the link:

The bug is caused by missing memory barriers in various Parker::park() paths that can result in lost wakeups and hangs. (Note that PlatformEvent::park used by built-in synchronization is not vulnerable to the issue). -XX:+UseMembar constitutes a work-around because the membar barrier in the state transition logic hides the problem in Parker::. (that is, there's nothing wrong with the use -UseMembar mechanism, but +UseMembar hides the bug Parker::). This is a day-one bug introduced with the addition of java.util.concurrent in JDK 5.0. I developed a simple C model of the failure and it seems more likely to manifest on modern AMD and Nehalem platforms, likely because of deeper store buffers that take longer to drain. I provided a tentative fix to Doug Lea for Parker::park which appears to eliminate the bug. I'll be delivering this fix to runtime. (I'll also augment the CR with additional test cases and a longer explanation). This is likely a good candidate for back-ports.

Link: JVM Bug

Workarounds are available, but you would probably be best off just getting the most recent copy of Java.


Solution:2

I don't see any locking in the code of ThreadPoolExecutor's execute(Runnable). The only variable there is the workQueue. What sort of BlockingQueue did you provide to your ThreadPoolExecutor?
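To illustrate why a caller parked inside execute() is surprising: as the call stack in the question shows, execute() hands work to the queue through the non-blocking offer(), so a full queue normally ends in the RejectedExecutionHandler being invoked rather than in a blocked caller. A minimal sketch, assuming a hypothetical class ExecuteDoesNotBlock and a deliberately tiny pool (1 thread, 1 queue slot) that is not taken from the question:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and sizes are assumptions for illustration.
class ExecuteDoesNotBlock {

    /** Submits blocking tasks to a 1-thread pool with a 1-slot queue and returns how many were rejected. */
    static int countRejected(int tasks) {
        final AtomicInteger rejected = new AtomicInteger();
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                // The handler runs in the calling thread when the task cannot be accepted.
                (r, executor) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < tasks; i++) {
                // Each task blocks until released, so the worker and the queue stay occupied.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(3));
    }
}
```

With three tasks, the first goes to the worker thread, the second fills the queue, and the third is rejected; execute() returns each time instead of waiting for space.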

On the topic of deadlocks:

You can confirm this is a deadlock by examining the full thread dump, as provided by Ctrl+Break on Windows or kill -QUIT on UNIX systems.

Once you have that data, you can examine the threads. Here is a pertinent excerpt from Sun's article on examining thread dumps (suggested reading):

For hanging, deadlocked or frozen programs: If you think your program is hanging, generate a stack trace and examine the threads in states MW or CW. If the program is deadlocked then some of the system threads will probably show up as the current threads, because there is nothing else for the JVM to do.
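The same check can also be made from inside the JVM. The following is a minimal sketch, assuming a hypothetical helper class DeadlockCheck, built on the standard ThreadMXBean.findDeadlockedThreads() call (available since Java 6). It only reports lock cycles, so a thread parked because of a lost wakeup, as described in Solution 1, would probably not show up here.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper class; the name is an assumption for illustration.
class DeadlockCheck {

    /** Returns a description of deadlocked threads, or an empty string if none are found. */
    static String describeDeadlocks() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // Considers object monitors and ownable synchronizers such as ReentrantLock.
        long[] ids = threads.findDeadlockedThreads();
        if (ids == null) {
            return "";
        }
        StringBuilder report = new StringBuilder();
        // Include locked monitors and locked synchronizers in each ThreadInfo.
        for (ThreadInfo info : threads.getThreadInfo(ids, true, true)) {
            if (info != null) {
                report.append(info);
            }
        }
        return report.toString();
    }

    public static void main(String[] args) {
        String report = describeDeadlocks();
        System.out.println(report.isEmpty() ? "no deadlock detected" : report);
    }
}
```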

On a lighter note: if you are running in an IDE, can you make sure that no breakpoints are enabled in these methods?


Solution:3

This deadlock probably occurs because you run tasks from within the executor itself. For example, you submit one task, and that task submits another 4 tasks. If your pool size is 4, you fill the pool completely, and the last task has to wait until one of the running tasks returns. But the first task is waiting for all of the forked tasks to complete. This becomes a real deadlock when every worker thread is blocked waiting on tasks that are still sitting in the queue.
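The mechanism can be reproduced in miniature. The sketch below assumes a hypothetical class NestedSubmitDemo and uses a fixed pool from Executors rather than the question's setup; the outer task waits with a timeout so that the demo reports starvation instead of hanging.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names and the 500 ms timeout are assumptions for illustration.
class NestedSubmitDemo {

    /** Returns true if the outer task timed out waiting for a task it submitted to its own pool. */
    static boolean outerTaskStarves(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<Boolean> outer = pool.submit(() -> {
                // The inner task needs a free worker thread in the same pool.
                Future<String> inner = pool.submit(() -> "done");
                try {
                    inner.get(500, TimeUnit.MILLISECONDS);
                    return false; // inner task ran: no starvation
                } catch (TimeoutException e) {
                    return true;  // no worker was free to run the inner task
                }
            });
            return outer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread starves:  " + outerTaskStarves(1));
        System.out.println("2 threads starve:  " + outerTaskStarves(2));
    }
}
```

With a single worker the outer task holds the only thread, so the inner task stays queued until the timeout; with two workers the inner task runs immediately. Without the timeout, the one-thread case would wait forever.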


Solution:4

As someone already mentioned, this sounds like normal behaviour: the ThreadPoolExecutor is just waiting for work to do. If you want to stop it, you need to call:

executor.shutdown()

to get it to terminate, usually followed by a call to executor.awaitTermination.
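A minimal sketch of that sequence, assuming a hypothetical helper class ShutdownDemo and a caller-chosen timeout; it falls back to shutdownNow() if the pool does not drain in time:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; the name and the timeout handling are assumptions for illustration.
class ShutdownDemo {

    /** Shuts the executor down and returns true if it terminated within the allowed time. */
    static boolean shutdownAndWait(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown(); // no new tasks are accepted; already submitted tasks still run
        try {
            if (executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            executor.shutdownNow(); // interrupt running tasks and drop queued ones
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        executor.execute(() -> System.out.println("working"));
        System.out.println("terminated: " + shutdownAndWait(executor, 5));
    }
}
```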


Solution:5

The library source code is below (it is in fact a class from http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip). It is a bit complicated: if I'm not mistaken, it adds protection against running the FutureTask more than once. But it doesn't seem deadlock-prone, and the ThreadPool usage is very simple:

package net.spy.memcached.transcoders;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import net.spy.memcached.CachedData;
import net.spy.memcached.compat.SpyObject;

/**
 * Asynchronous transcoder.
 */
public class TranscodeService extends SpyObject {

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L,
            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.DiscardPolicy());

    /**
     * Perform a decode.
     */
    public <T> Future<T> decode(final Transcoder<T> tc,
            final CachedData cachedData) {

        assert !pool.isShutdown() : "Pool has already shut down.";

        TranscodeService.Task<T> task = new TranscodeService.Task<T>(
                new Callable<T>() {
                    public T call() {
                        return tc.decode(cachedData);
                    }
                });

        if (tc.asyncDecode(cachedData)) {
            this.pool.execute(task);
        }
        return task;
    }

    /**
     * Shut down the pool.
     */
    public void shutdown() {
        pool.shutdown();
    }

    /**
     * Ask whether this service has been shut down.
     */
    public boolean isShutdown() {
        return pool.isShutdown();
    }

    private static class Task<T> extends FutureTask<T> {
        private final AtomicBoolean isRunning = new AtomicBoolean(false);

        public Task(Callable<T> callable) {
            super(callable);
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            this.run();
            return super.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            this.run();
            return super.get(timeout, unit);
        }

        @Override
        public void run() {
            if (this.isRunning.compareAndSet(false, true)) {
                super.run();
            }
        }
    }
}


Solution:6

Definitely strange.

But before writing your own TPE try:

  • another BlockingQueue implementation, e.g. LinkedBlockingQueue

  • specify fairness=true in ArrayBlockingQueue, i.e. use new ArrayBlockingQueue(n, true)

Of those two options I would choose the second one, because it is very strange for offer() to be blocked. One reason that comes to mind is the thread scheduling policy on your Linux. This is just an assumption.
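The two options above could look roughly like this. It is a sketch under assumptions: the class name QueueChoices, the factory method names, and the 60-second keep-alive are hypothetical and not taken from the question's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory class; names and the keep-alive value are assumptions for illustration.
class QueueChoices {

    /** Option 1: a bounded LinkedBlockingQueue instead of ArrayBlockingQueue. */
    static ThreadPoolExecutor withLinkedQueue(int threads, int capacity) {
        return new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(capacity));
    }

    /** Option 2: an ArrayBlockingQueue whose internal lock uses the fair ordering policy. */
    static ThreadPoolExecutor withFairArrayQueue(int threads, int capacity) {
        return new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(capacity, true)); // true = fair
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = withFairArrayQueue(4, 100);
        pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

Fair ordering grants the queue's lock to waiting threads in FIFO order, which generally costs some throughput compared to the default non-fair mode.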

