Tagged: thread

  • Subinkrishna Gopi 3:12 pm on February 19, 2010 Permalink |
    Tags: executor, thread, thread pool, threadpoolexecutor

    ThreadPoolExecutor – basics 

    ThreadPoolExecutor is an implementation of ExecutorService that executes submitted tasks using the threads of a pool. Read the Sun Java API docs for more information.

    The buzz words

    1. Core & max pool size
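    These are the core (minimum) and maximum numbers of threads kept in the thread pool. New threads are created up to the core size as tasks arrive; threads beyond the core size are created only when the work queue is full, up to the maximum. We can also specify a keep-alive time after which idle threads above the core size are terminated (or all idle threads, if allowCoreThreadTimeOut(true) is set).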

    2. Work queue
    The work queue is a BlockingQueue that holds the tasks sent to the executor while all the core threads in the pool are busy executing other tasks. Read more about BlockingQueue in the Sun Java API docs.

    3. RejectedExecutionHandler
    So we have a limited number of threads in the pool to execute the tasks and a fixed-size work queue to hold the additional tasks. But what if the work queue overflows? A task that arrives when the queue is full and the pool is already at its maximum size is handed to the RejectedExecutionHandler. Read the Sun Java API docs for more.
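    The interplay of the three buzz words can be sketched roughly as follows. This is a minimal illustration, not part of the sample below: the class name PoolGrowthSketch and the sizes (core 1, max 2, queue capacity 1) are assumptions chosen to keep the numbers small. Every task blocks on a latch, so the pool cannot drain while the four tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; sizes are chosen for illustration only.
class PoolGrowthSketch {

    /** Returns {pool size, queued tasks, rejected tasks} after submitting four blocking tasks. */
    static int[] submitFour() {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);

        // core = 1, max = 2, keep-alive = 10 s, queue capacity = 1;
        // the handler only counts the rejected tasks
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                (task, pool) -> rejected.incrementAndGet());

        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        executor.execute(blocking); // 1: starts the core thread
        executor.execute(blocking); // 2: core thread busy, so the task is queued
        executor.execute(blocking); // 3: queue full, so a second thread is started
        executor.execute(blocking); // 4: queue full and pool at max, so it is rejected

        int[] snapshot = {
            executor.getPoolSize(), executor.getQueue().size(), rejected.get()
        };

        release.countDown(); // let the accepted tasks finish
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = submitFour();
        // prints "pool=2, queued=1, rejected=1"
        System.out.println("pool=" + s[0] + ", queued=" + s[1] + ", rejected=" + s[2]);
    }
}
```

    Note that the second thread is created only after the queue fills up, not as soon as the core thread is busy.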

    Some sample code

    Initializing the ThreadPoolExecutor and submitting the tasks. Here we’ve defined a worksQueue of size 2, core and max pool sizes of 3, and threads that expire after 10 idle seconds. We also start a daemon thread to monitor the executor status.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    // Wrapper class added so that the snippet compiles; the name is arbitrary
    public class ThreadPoolExecutorDemo
    {
        public static void main(String[] args) throws InterruptedException
        {
            // Bounded queue: holds at most 2 waiting tasks
            BlockingQueue<Runnable> worksQueue = new ArrayBlockingQueue<Runnable>(2);
            RejectedExecutionHandler executionHandler = new MyRejectedExecutionHandelerImpl();
    
            // Create the ThreadPoolExecutor: core = max = 3 threads, 10 s keep-alive
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 10,
                    TimeUnit.SECONDS, worksQueue, executionHandler);
            // Let idle core threads time out as well
            executor.allowCoreThreadTimeOut(true);
    
            // Starting the monitor thread as a daemon
            Thread monitor = new Thread(new MyMonitorThread(executor));
            monitor.setDaemon(true);
            monitor.start();
    
            // Adding the tasks "1" to "8"
            for (int i = 1; i <= 8; i++)
            {
                executor.execute(new MyWork(String.valueOf(i)));
            }
    
            // By now the idle threads have timed out; task "9" starts a new one
            Thread.sleep(30000);
            executor.execute(new MyWork("9"));
        }
    }
    

    RejectedExecutionHandler implementation.

    /**
     * The custom {@link RejectedExecutionHandler} to handle the rejected
     * tasks / {@link Runnable}
     */
    public class MyRejectedExecutionHandelerImpl
    implements RejectedExecutionHandler
    {
        @Override
        public void rejectedExecution(Runnable runnable,
                ThreadPoolExecutor executor)
        {
            System.out.println(runnable.toString() + " : I've been rejected ! ");
        }
    }
    

    My task (implements Runnable).

    /**
     * My {@link Runnable} class. Represents a task that needs to be executed.
     */
    public class MyWork implements Runnable
    {
        String name;
    
        public MyWork(String name)
        {
            this.name = name;
        }
    
        @Override
        public void run()
        {
            System.out.println(this.name + " : I'm running ! ");
    
            try
            {
                Thread.sleep(5000);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
    
            System.out.println(this.name + " : I'm done ! ");
        }
    
        @Override
        public String toString()
        {
            return (this.name);
        }
    }
    

    And the monitor thread.

    /**
     * My monitor thread. Periodically prints the status of the
     * {@link ThreadPoolExecutor}.
     */
    public class MyMonitorThread implements Runnable
    {
        ThreadPoolExecutor executor;
    
        public MyMonitorThread(ThreadPoolExecutor executor)
        {
            this.executor = executor;
        }
    
        @Override
        public void run()
        {
            try
            {
                do
                {
                    System.out.println(
                        String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                            this.executor.getPoolSize(),
                            this.executor.getCorePoolSize(),
                            this.executor.getActiveCount(),
                            this.executor.getCompletedTaskCount(),
                            this.executor.getTaskCount(),
                            this.executor.isShutdown(),
                            this.executor.isTerminated()));
                    Thread.sleep(3000);
                }
                while (true);
            }
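            catch (InterruptedException e)
            {
                // Restore the interrupt flag and let the monitor thread exit
                Thread.currentThread().interrupt();
            }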
        }
    }
    

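    So we have all the necessary code blocks in place. If we execute the above code we may get something like this. With three threads and a queue of size two, only five of the first eight tasks are accepted; in this run tasks 6, 7 and 8 are rejected.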

    1 : I'm running !
    2 : I'm running !
    3 : I'm running !
    6 : I've been rejected !
    7 : I've been rejected !
    8 : I've been rejected !
    [monitor] [0/3] Active: 1, Completed: 0, Task: 1, isShutdown: false, isTerminated: false
    [monitor] [3/3] Active: 3, Completed: 0, Task: 5, isShutdown: false, isTerminated: false
    1 : I'm done !
    4 : I'm running !
    2 : I'm done !
    5 : I'm running !
    3 : I'm done !
    [monitor] [3/3] Active: 2, Completed: 3, Task: 5, isShutdown: false, isTerminated: false
    [monitor] [3/3] Active: 2, Completed: 3, Task: 5, isShutdown: false, isTerminated: false
    4 : I'm done !
    5 : I'm done !
    [monitor] [3/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
    [monitor] [2/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
    [monitor] [2/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
    [monitor] [0/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
    [monitor] [0/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
    [monitor] [0/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
    9 : I'm running !
    [monitor] [1/3] Active: 1, Completed: 5, Task: 6, isShutdown: false, isTerminated: false
    [monitor] [1/3] Active: 1, Completed: 5, Task: 6, isShutdown: false, isTerminated: false
    9 : I'm done !
    [monitor] [1/3] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
    [monitor] [1/3] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
    [monitor] [1/3] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
    

    • nguyenhuuquan 9:28 pm on April 29, 2013 Permalink

      thanks, it’s helpful. I have a question: if a task takes a long time to execute, how do I terminate it?

  • Subinkrishna Gopi 7:18 am on July 12, 2007 Permalink |
    Tags: lock, reentrant lock, synchronization, thread

    Using ReentrantLock for thread synchronization 

    ReentrantLock was introduced in Java 1.5. It can be considered a replacement for the traditional “synchronized” block and, together with Condition, for the “wait-notify” mechanism. The basic concept is that every thread needs to acquire the lock before entering the critical section and should release it after leaving it. That is the most basic concept of synchronization.

    With ReentrantLock, explicit lock() and unlock() calls take the place of the “synchronized” keyword.

    import java.util.concurrent.locks.ReentrantLock;
    
    final ReentrantLock _lock = new ReentrantLock();
    
    private void method()
    {
       // Trying to enter the critical section
       _lock.lock(); // blocks until this thread gets the lock
       try
       {
          // critical section
       }
       finally
       {
          // Always release the lock so that waiting threads can acquire it
          _lock.unlock();
       }
    }
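    To see the lock/try/finally pattern doing real work, here is a small runnable sketch. The LockedCounter class and its runDemo helper are hypothetical names made up for this illustration; the point is only that increments from several threads are not lost when each one happens under the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: a counter guarded by a ReentrantLock.
class LockedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    void increment() {
        lock.lock();
        try {
            count++; // critical section
        } finally {
            lock.unlock();
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    /** Starts the given number of threads, each incrementing perThread times; returns the final count. */
    static int runDemo(int threads, int perThread) {
        LockedCounter counter = new LockedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join(); // wait for every worker to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000)); // prints 4000
    }
}
```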
    

    Using optional “fairness” parameter with ReentrantLock
    ReentrantLock accepts an optional “fairness” parameter in its constructor. By default, whenever a thread releases the lock, any one of the waiting threads may acquire it next; there is no predefined order or priority in the selection of the thread (at least from a programmer’s perspective).

    But if we pass “true” as the fairness parameter when creating the ReentrantLock, the lock favors the longest-waiting thread when it is granted next. Sounds pretty nice, right?
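    As a minimal sketch, this is how the two kinds of lock are created. The class and field names are made up for the illustration; isFair() simply reports which policy a lock was constructed with.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical holder class for the two locks.
class FairLockSketch {
    // Default constructor: non-fair, waiting threads may acquire the lock in any order
    static final ReentrantLock DEFAULT_LOCK = new ReentrantLock();

    // Fairness set to true: the longest-waiting thread is favored
    static final ReentrantLock FAIR_LOCK = new ReentrantLock(true);

    public static void main(String[] args) {
        System.out.println(DEFAULT_LOCK.isFair()); // prints false
        System.out.println(FAIR_LOCK.isFair());    // prints true
    }
}
```

    Two caveats from the ReentrantLock Javadoc: a fair lock usually gives lower overall throughput than the default one, and the untimed tryLock() does not honor the fairness setting.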

    Use of “Condition” in ReentrantLock
    Condition factors the monitor methods (wait(), notify() and notifyAll()) out into separate objects. For each ReentrantLock we can create several conditions, so that different groups of threads can wait for, and be signalled on, different events.

    import java.util.concurrent.locks.Condition;
    final Condition _aCondition = _lock.newCondition();
    private void method1() throws InterruptedException
    {
       _lock.lock();
       try
       {
          while (condition 1)
          {
              // Waiting for the condition to be satisfied
              // Note: await() releases the lock while this thread waits and
              // re-acquires it before returning (after a signal from another thread)
              _aCondition.await();
          }
          // method body
       }
       finally
       {
          _lock.unlock();
       }       
    
    }
    
    private void method2() throws InterruptedException
    {
       _lock.lock();
       try
       {
          doSomething();
          if (condition 2)
          {
              // Signal that the condition is satisfied. Use one of the two:
              // signal() wakes up any one of the waiting threads
              _aCondition.signal();
    
              // signalAll() wakes up all threads waiting for this condition
              _aCondition.signalAll();
          }       
    
          // method body
       }
       finally
       {
          _lock.unlock();
       }
    }
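    The skeleton above can be turned into something runnable along these lines. This is only a sketch: the Mailbox class, its one-slot design and the runDemo helper are assumptions made for the illustration. It uses two conditions on the same lock, one for "slot is free" and one for "slot is filled".

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example: a one-slot mailbox shared by a producer and a consumer.
class Mailbox {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private String message; // null means the slot is empty

    void put(String m) throws InterruptedException {
        lock.lock();
        try {
            while (message != null) {
                notFull.await(); // releases the lock while waiting
            }
            message = m;
            notEmpty.signal(); // wake up a waiting consumer
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            while (message == null) {
                notEmpty.await(); // releases the lock while waiting
            }
            String m = message;
            message = null;
            notFull.signal(); // wake up a waiting producer
            return m;
        } finally {
            lock.unlock();
        }
    }

    /** Sends "a", "b", "c" from a producer thread and returns what the caller received, in order. */
    static String runDemo() {
        Mailbox box = new Mailbox();
        Thread producer = new Thread(() -> {
            try {
                for (String m : new String[] {"a", "b", "c"}) {
                    box.put(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        StringBuilder received = new StringBuilder();
        try {
            for (int i = 0; i < 3; i++) {
                received.append(box.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.toString();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints abc
    }
}
```

    The await() calls sit inside while loops, as in the skeleton, so that a thread re-checks its condition after waking up.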
    

    Update [ Subinkrishna on 19-Feb-2009]
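    The main differences between ReentrantLock and traditional “synchronized” are that a ReentrantLock can be acquired with tryLock() (optionally with a timeout) or interruptibly, and that one lock can have several Condition objects. Waiting releases the lock in both approaches: Object.wait() gives up the object’s monitor just as Condition.await() gives up the lock. Timed lock attempts, which are not available in the traditional way, can help reduce deadlock situations.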
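    One capability of ReentrantLock that “synchronized” does not offer is a lock attempt that gives up after a timeout, which is one way to back out of a potential deadlock. A rough sketch follows; the TryLockSketch class and its helper methods are hypothetical, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper class illustrating tryLock with a timeout.
class TryLockSketch {

    /** Runs the action under the lock if it can be acquired in time; returns true if the action ran. */
    static boolean runIfAvailable(ReentrantLock lock, long timeoutMillis, Runnable action) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // gave up instead of blocking forever
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Holds the lock on the calling thread while a second thread tries to take it with a timeout. */
    static boolean contendedAttempt() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(
                    () -> acquired[0] = runIfAvailable(lock, 50, () -> { }));
            other.start();
            other.join(); // the other thread times out while we still hold the lock
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println(runIfAvailable(new ReentrantLock(), 50, () -> { })); // prints true
        System.out.println(contendedAttempt());                                 // prints false
    }
}
```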

    Some useful links
    http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/locks/ReentrantLock.html

     
    • yuv11 2:53 pm on October 27, 2009 Permalink

      I believe even in a synchronized block/method, a call to wait() does release the object lock. That’s why some other thread can acquire it and notify() on completion.

    • Subinkrishna G 11:39 am on October 28, 2009 Permalink

      yuv11, I am not sure whether a thread can release the locks it holds already (if that thread is using wait-notify), while waiting for another one. I don’t think it can.

    • eniosp 2:15 pm on January 31, 2010 Permalink

      The wait releases the lock. From the Object’s class javadoc (wait method):
      The current thread must own this object’s monitor. The thread releases ownership of this monitor and waits until another thread notifies threads waiting on this object’s monitor to wake up either through a call to the notify method or the notifyAll method. The thread then waits until it can re-obtain ownership of the monitor and resumes execution.

    • Subinkrishna G 10:42 am on February 1, 2010 Permalink

      @eniosp,
      Thanks for the update.

    • javabreadjam 8:21 am on February 9, 2010 Permalink

      so basically _aCondition.await(); in your code
      is the same as obj.wait() since both give up locks on the object. so your example is just another way. Only thread.sleep() gives up the lock

    • Noman Khan 9:45 pm on February 25, 2010 Permalink

      Interesting.

    • eniosp 5:01 pm on February 26, 2010 Permalink

      @javabreadjam,
      thread.sleep DOES NOT release the lock. The object keeps the monitor.
      From Thread.sleep javadoc:
      “Causes the currently executing thread to sleep (temporarily cease execution) for the specified number of milliseconds. The thread does not lose ownership of any monitors.”

    • javarevisited 5:42 am on May 11, 2011 Permalink

      wonderful example and explanation, ideal for any beginner. just to add, by using a re-entrant lock we can build a sophisticated cache which supports the behavior of ConcurrentHashMap, i.e. allowing multiple concurrent reads. this is not possible with the synchronized keyword in Java as it locks both reader and writer. I have blogged some of my experience as How Synchronization works in Java, you may find it interesting.
