Java ThreadPoolExecutor

In this section, let's discuss how to run multiple threads and manage them using Java's ThreadPoolExecutor.

First of all, let's see how to run a simple thread in Java.

Simple Thread

package in.techdive.java.examples;

public class MyThread implements Runnable
{
    public void run()
    {
        System.out.println("Thread execution is completed....");
    }
}

public class MainThread
{
    public MainThread()
    {
    }

    public static void main(String[] args)
    {
        Thread tD = new Thread(new MyThread());
        tD.start();
    }
}

The above code needs little explanation, as it is a straightforward way of creating a thread and starting it.

Consider a scenario where you need to run a set of tasks (for example, 100). It would be a nightmare to initialize and start 100 such threads by hand.

Java's concurrency API comes in handy to solve this problem. Let's look at ThreadPoolExecutor in detail with an example.

java.util.concurrent.ThreadPoolExecutor runs each job submitted to it on one of a pool of reusable threads. Jobs (Runnable objects) that cannot start immediately wait in a queue such as an ArrayBlockingQueue, and the pooled threads pick them up one by one as they become free. The core number of threads to keep in the pool, the maximum number of threads, the keep-alive time for idle threads, and other such details are configured on the ThreadPoolExecutor, so the user is relieved from managing individual threads.

Let's simulate how a manager in an IT company allocates tasks to employees. For a project, the manager creates a set of tasks and assigns them to the employees using a project scheduler. The employees pick up their assigned tasks and execute them within the given time frame. This scenario can be simulated using ThreadPoolExecutor. First, let's create an interface to represent a task.
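As a rough sketch of the idea, the following hands 100 small jobs to a pool instead of starting 100 threads by hand. The class name PoolSketch, the pool sizes, and the job count are assumptions chosen for illustration only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSketch
{
    // Runs taskCount trivial jobs on a small pool and returns how many completed.
    static int runTasks(int taskCount)
    {
        final AtomicInteger completed = new AtomicInteger();

        // 2 core threads, at most 4 threads, idle extra threads retire after 10 s.
        // The queue is sized to hold every pending job, so none is rejected.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(taskCount));

        for (int i = 0; i < taskCount; i++)
        {
            pool.execute(new Runnable()
            {
                public void run()
                {
                    completed.incrementAndGet();
                }
            });
        }

        pool.shutdown(); // accept no new jobs; queued jobs still run
        try
        {
            pool.awaitTermination(30, TimeUnit.SECONDS); // wait for queued jobs
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return completed.get();
    }

    public static void main(String[] args)
    {
        System.out.println("Completed jobs: " + runTasks(100));
    }
}
```

The caller only describes the jobs; creating, reusing, and retiring threads is left to the pool.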

Task Interface

package in.techdive.java.examples;

public interface Task extends Runnable
{
    public void executeTask();
}

ProjectTask Class

package in.techdive.java.examples;

public class ProjectTask implements Task
{
    // The employee who will carry out this task.
    Employee e = null;

    public ProjectTask(Employee e)
    {
        this.e = e;
    }

    public void executeTask()
    {
        e.executeProjTask();
    }

    public void run()
    {
        executeTask();
    }
}

The ProjectTask class, which implements Task (an interface that extends Runnable), represents a task to be executed by an Employee. When a thread runs it, run() calls executeTask(), which in turn calls the executeProjTask() method of the Employee object.

The Employee class is as follows.

Employee Class

package in.techdive.java.examples;

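import java.util.HashMap;
import java.util.Map;

public class Employee
{
    // Cache of employee id -> Employee, so each id maps to exactly one object.
    private static final Map<String, Employee> empObjMap = new HashMap<String, Employee>();
    private final String empId;

    private Employee(String employeeId)
    {
        this.empId = employeeId;
    }

    // Returns the cached Employee for eId, creating it on the first request.
    // synchronized so that concurrent callers cannot create duplicates.
    public static synchronized Employee getInstance(String eId)
    {
        Employee emp = empObjMap.get(eId);
        if (emp == null)
        {
            emp = new Employee(eId);
            empObjMap.put(eId, emp);
        }
        return emp;
    }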

    public void executeProjTask()
    {
        System.out.println("Employee " + this.empId
                + " completed the project task");
    }
}

The Employee class is written so that only one object is created per employee id. It has a private constructor, so objects are obtained by calling the static getInstance() method with an employee id. The class maintains a map of (employee id, Employee object) and returns the existing object from the map if one has already been created. A new object is created only when the id is not yet in the map, and it is added to the map right after creation.

With the ProjectTask and Employee classes in place, it is time for the Manager to assign tasks to the employees using the ProjectScheduler.
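On Java 8 or later, the same one-object-per-id idea can also be sketched with ConcurrentHashMap.computeIfAbsent, which creates the entry atomically. The EmployeeRegistry and Worker names below are hypothetical stand-ins, not part of the example project.

```java
import java.util.concurrent.ConcurrentHashMap;

class EmployeeRegistry
{
    // Hypothetical stand-in for the Employee class.
    static final class Worker
    {
        final String id;

        Worker(String id)
        {
            this.id = id;
        }
    }

    // id -> Worker; ConcurrentHashMap is safe to use from several threads.
    private static final ConcurrentHashMap<String, Worker> CACHE =
            new ConcurrentHashMap<String, Worker>();

    static Worker getInstance(String id)
    {
        // Creates the Worker at most once per id, even under concurrent calls.
        return CACHE.computeIfAbsent(id, Worker::new);
    }

    public static void main(String[] args)
    {
        Worker first = getInstance("100");
        Worker second = getInstance("100");
        System.out.println("Same object for the same id: " + (first == second));
    }
}
```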

Take a look at the ProjectScheduler class below.

ProjectScheduler Class

/**
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

package in.techdive.java.examples;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

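public class ProjectScheduler
{
    // Bounded, fair FIFO queue (capacity 10); it is also the executor's own work queue.
    final ArrayBlockingQueue<Runnable> queue         = new ArrayBlockingQueue<Runnable>(10, true);
    int                                poolSize      = 2;   // core threads
    int                                maxPoolSize   = 2;
    long                               keepAliveTime = 10;  // seconds
    ThreadPoolExecutor                 threadPool    = null;

    public ProjectScheduler()
    {
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, queue);
        // Start the core threads up front; they begin waiting on the queue at once.
        threadPool.prestartAllCoreThreads();
    }

    public void executeProjTask()
    {
        Runnable task;
        // poll() returns null once the queue is empty (a worker thread may already
        // have taken the last task), so test its result rather than size().
        while ((task = queue.poll()) != null)
        {
            threadPool.execute(task);
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
        threadPool.shutdown();
    }

    public void addTaskToQueue(Task t)
    {
        // add() throws IllegalStateException if the queue is already full.
        queue.add(t);
    }
}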

In the constructor of ProjectScheduler we initialize the ThreadPoolExecutor with its core pool size, maximum pool size, keepAliveTime, and the queue that holds tasks before they are executed. An ArrayBlockingQueue is used for that purpose. It is a bounded data structure backed by an array, and its capacity must be specified when it is created. The Manager fills the queue with Runnable tasks using the addTaskToQueue(Task) method. In the executeProjTask() method, queue.poll() retrieves (and removes) the next element from the queue in FIFO order, and threadPool.execute() runs the retrieved task on the internally pooled threads. Finally, shutdown() is called to shut down the ThreadPoolExecutor and its pooled threads.

Note that this queue is also the executor's own work queue, and the core threads are already started by prestartAllCoreThreads(). Tasks added to the queue can therefore be taken and run by those worker threads directly, even before executeProjTask() is called.

The ProjectScheduler is set up. Now the KingMaker (the Manager) is going to keep the employees busy.
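A ThreadPoolExecutor normally manages its work queue itself: callers pass tasks to execute(), and the pool queues whatever its threads cannot start right away. Here is a minimal sketch of that more conventional arrangement, assuming a hypothetical DirectScheduler class and plain Runnable tasks in place of ProjectTask.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectScheduler
{
    // Same settings as ProjectScheduler: 2 threads and a fair queue of capacity 10.
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2,
            10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10, true));

    // Hands the task to the pool; the pool queues it if both threads are busy.
    // With this bounded queue, execute() throws RejectedExecutionException
    // when both threads are busy and 10 tasks are already waiting.
    void submit(Runnable task)
    {
        threadPool.execute(task);
    }

    // Stops accepting tasks, then waits for the submitted ones to finish.
    void shutdownAndWait(long timeoutSeconds)
    {
        threadPool.shutdown();
        try
        {
            threadPool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    // Submits ten tasks (ids 100..109) and returns the ids in completion order.
    static List<String> runDemo()
    {
        final List<String> done = Collections.synchronizedList(new ArrayList<String>());
        DirectScheduler scheduler = new DirectScheduler();
        for (int i = 100; i < 110; i++)
        {
            final String empId = String.valueOf(i);
            scheduler.submit(new Runnable()
            {
                public void run()
                {
                    done.add(empId);
                }
            });
        }
        scheduler.shutdownAndWait(30);
        return done;
    }

    public static void main(String[] args)
    {
        System.out.println("Completed: " + runDemo());
    }
}
```

With this arrangement there is no need to poll the queue or sleep between submissions; shutdown() followed by awaitTermination() waits for the submitted tasks to finish.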

Take a look at the Manager class below.

Manager Class

package in.techdive.java.examples;

public class Manager
{
    public Manager()
    {
    }

    public static void main(String[] args)
    {
        ProjectScheduler pS = new ProjectScheduler();
        for (int i = 100; i < 110; i++)
        {
            ProjectTask pT = new ProjectTask(Employee.getInstance(i + ""));
            pS.addTaskToQueue(pT);
        }
        pS.executeProjTask();
    }
}

The Manager initializes the ProjectScheduler, which internally starts up the ThreadPoolExecutor. It then creates one ProjectTask per employee and adds each task to the queue in ProjectScheduler using the addTaskToQueue() method. Finally, it calls executeProjTask() to run the tasks on the pool and shut the pool down.

Let's see the output of a sample run below. Because the tasks run on pooled threads, the order of the lines can vary from run to run.

Output:

Employee 100 completed the project task
Employee 101 completed the project task
Employee 102 completed the project task
Employee 103 completed the project task
Employee 104 completed the project task
Employee 106 completed the project task
Employee 107 completed the project task
Employee 108 completed the project task
Employee 109 completed the project task
Employee 105 completed the project task

Use a ThreadPoolExecutor when you have a large number of tasks to run concurrently, instead of creating a separate thread for each one.
