ThreadPoolExecutor – basics

ThreadPoolExecutor is an implementation of the ExecutorService, which can be used to execute the submitted tasks using the available Thread pool. Read the Sun Java API docs for more information.

The buzz words

1. Core & max pool size
This is the size of the core (minimam) and maximum possible thread that need to be in the thread pool every time. We can even specify the keep-alive time limit for the threads.

2. Work queue
Work queue is a BlockingQueue, which is used to hold the work/task which is being sent to the executor when all the threads in the pool are busy executing the tasks. Read more about the BlockingQueue in Sun Java API docs.

3. RejectedExecutionHandler
So we have a limited number of threads in the pool to execute the tasks and a fixed sized work queue to hold the additional tasks. But what if the work queue overflows! RejectedExecutionHandler can help us in such situations. Read the Sun Java API docs for more.

Some sample code

Initializing the ThreadPoolExecutor and sending the task. Here we’ve defined a worksQueue with size 2, core & max pool size is 3 and the threads will expire after 10 seconds. We are also starting a daemon thread to monitor the executor status.

BlockingQueue<Runnable> worksQueue = new ArrayBlockingQueue<Runnable>(2);
RejectedExecutionHandler executionHandler = new MyRejectedExecutionHandelerImpl();

// Create the ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 10,
        TimeUnit.SECONDS, worksQueue, executionHandler);
executor.allowCoreThreadTimeOut(true);

// Starting the monitor thread as a daemon
Thread monitor = new Thread(new MyMonitorThread(executor));
monitor.setDaemon(true);
monitor.start();

// Adding the tasks
executor.execute(new MyWork("1"));
executor.execute(new MyWork("2"));
executor.execute(new MyWork("3"));
executor.execute(new MyWork("4"));
executor.execute(new MyWork("5"));
executor.execute(new MyWork("6"));
executor.execute(new MyWork("7"));
executor.execute(new MyWork("8"));

try
{
    Thread.sleep(30000);
    executor.execute(new MyWork("9"));
}
catch (Exception e)
{
    e.printStackTrace();
}

RejectedExecutionHandler implementation.

/**
 * The custom {@link RejectedExecutionHandler} to handle the rejected
 * tasks / {@link Runnable}
 */
public class MyRejectedExecutionHandelerImpl
implements RejectedExecutionHandler
{
    @Override
    public void rejectedExecution(Runnable runnable,
            ThreadPoolExecutor executor)
    {
        System.out.println(runnable.toString() + " : I've been rejected ! ");
    }
}

My task (implements Runnable).

/**
 * My {@link Runnable} class. Represents a task which need to be executed.
 */
public class MyWork implements Runnable
{
    String name;

    public MyWork(String name)
    {
        this.name = name;
    }

    @Override
    public void run()
    {
        System.out.println(this.name + " : I'm running ! ");

        try
        {
            Thread.sleep(5000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

        System.out.println(this.name + " : I'm done ! ");
    }

    @Override
    public String toString()
    {
        return (this.name);
    }
}

And the monitor thread.

/**
 * My monitor thread. To monitor the status of {@link ThreadPoolExecutor}
 * and its status.
 */
public class MyMonitorThread implements Runnable
{
    ThreadPoolExecutor executor;

    public MyMonitorThread(ThreadPoolExecutor executor)
    {
        this.executor = executor;
    }

    @Override
    public void run()
    {
        try
        {
            do
            {
                System.out.println(
                    String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                        this.executor.getPoolSize(),
                        this.executor.getCorePoolSize(),
                        this.executor.getActiveCount(),
                        this.executor.getCompletedTaskCount(),
                        this.executor.getTaskCount(),
                        this.executor.isShutdown(),
                        this.executor.isTerminated()));
                Thread.sleep(3000);
            }
            while (true);
        }
        catch (Exception e)
        {
        }
    }
}

So we have all necessary code blocks in place. If we execute the above code we may get something like this.

1 : I'm running !
2 : I'm running !
3 : I'm running !
6 : I've been rejected !
7 : I've been rejected !
8 : I've been rejected !
[monitor] [0/3] Active: 1, Completed: 0, Task: 1, isShutdown: false, isTerminated: false
[monitor] [3/3] Active: 3, Completed: 0, Task: 5, isShutdown: false, isTerminated: false
1 : I'm done !
4 : I'm running !
2 : I'm done !
5 : I'm running !
3 : I'm done !
[monitor] [3/3] Active: 2, Completed: 3, Task: 5, isShutdown: false, isTerminated: false
[monitor] [3/3] Active: 2, Completed: 3, Task: 5, isShutdown: false, isTerminated: false
4 : I'm done !
5 : I'm done !
[monitor] [3/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
[monitor] [2/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
[monitor] [2/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
[monitor] [0/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
[monitor] [0/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
[monitor] [0/3] Active: 0, Completed: 5, Task: 5, isShutdown: false, isTerminated: false
9 : I'm running !
[monitor] [1/3] Active: 1, Completed: 5, Task: 6, isShutdown: false, isTerminated: false
[monitor] [1/3] Active: 1, Completed: 5, Task: 6, isShutdown: false, isTerminated: false
9 : I'm done !
[monitor] [1/3] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [1/3] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [1/3] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false

About these ads