Saturday 8 April 2017

Multithreading (Java Concurrent API - II)

This article is a continuation of Java Concurrent API - I; please read that article for the other features of the API. The Java concurrent API is very useful if you are writing multithreaded code. Its built-in classes help programmers handle many complex scenarios with ease. It is generally recommended to use these API classes instead of writing your own implementation.



1. What is CountDownLatch?

Answer: The java.util.concurrent.CountDownLatch class lets one or more threads wait until other threads finish their tasks. Once all of those threads have finished, the waiting thread(s) leave the waiting state.
We initialize a CountDownLatch with a count. Each thread doing a task calls countDown(), which decreases the count by one. Each thread that needs to wait calls await(). Once the count reaches zero, all threads blocked in await() are released.
There are two important methods:
1. countDown() : Every call to countDown() decreases the current count of the CountDownLatch by one.
2. await() : Threads that need to wait for other threads to complete call await(). These waiting threads resume once the count reaches zero through calls to countDown().
Example: A well-known use of CountDownLatch is dividing a task into N subtasks and having the parent thread wait for all N subtasks to finish.

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
 public static void main(String[] args) throws InterruptedException {
  System.out.println(Thread.currentThread().getName()+ " started");
  CountDownLatch countDownLatch = new CountDownLatch(4); //A CountDownLatch with count 4
  IncrementTask incrementTask = new IncrementTask();
  incrementTask.countDownLatch = countDownLatch;
  Thread th1 = new Thread(incrementTask);
  Thread th2 = new Thread(incrementTask);
  Thread th3 = new Thread(incrementTask);
  Thread th4 = new Thread(incrementTask);
  
  th1.start();
  th2.start();
  th3.start();
  th4.start();
  
  countDownLatch.await(); //Main thread will wait till count reaches to zero.
  
  System.out.println(Thread.currentThread().getName()+ " Finished");
 }
}


class IncrementTask implements Runnable{
 CountDownLatch countDownLatch;
 @Override
 public void run() {
  System.out.println(Thread.currentThread().getName()+ " started");
  try {
   Thread.sleep(1000); // Just to replicate that some task is being done
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  System.out.println(Thread.currentThread().getName()+ " Finished");
  countDownLatch.countDown(); // Every invocation will decrease the count by 1.
 }
 
} 

In the above code we start 4 worker threads and initialize a CountDownLatch with a count of 4. Each thread calls countDown() once it finishes its task, which decreases the count.
The main thread calls await(), which puts it into a waiting state until the count reaches zero.
If you run the code you will find that the main thread finishes last, after all 4 threads complete their tasks.
But if you comment out the countDownLatch.await() call in main (line 19 of the listing), the main thread no longer waits, and you will notice that it finishes first, before the 4 worker threads.
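The example above blocks forever if a worker never calls countDown(). A minimal sketch of two common safeguards follows: calling countDown() in a finally block, and using the timed variant await(long, TimeUnit), which returns false if the count did not reach zero in time. The class name LatchTimeoutSketch, the method runAndWait and its parameters are made up for illustration and are not part of the original example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchTimeoutSketch {

    // Hypothetical helper: starts `workers` threads that each sleep for `workMillis`,
    // then waits at most `timeoutMillis` for all of them.
    // Returns true if the count reached zero before the timeout elapsed.
    static boolean runAndWait(int workers, long workMillis, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(workMillis); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // always count down, even if the work failed
                }
            });
            t.setDaemon(true); // a slow worker must not keep the JVM alive in this demo
            t.start();
        }
        // Timed await: gives up after the timeout instead of blocking forever.
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fast workers finished in time: " + runAndWait(4, 100, 2000));
        System.out.println("slow workers finished in time: " + runAndWait(4, 2000, 100));
    }
}
```

The daemon flag is only there so that the demo exits promptly after a timeout; real code would normally decide explicitly what to do with workers that are still running.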


2. What is CyclicBarrier?

Answer: The java.util.concurrent.CyclicBarrier class provides a barrier. A barrier is nothing but a point in your code where threads wait for each other until all of them have reached it.
It is called cyclic because the barrier can be reset and re-used once the waiting threads are released.
Example: In the example below we have 3 child services (threads) and 1 master service (the main thread). The master service starts once all three child services have started.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
 public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
  CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
  Thread threadService1 = new Thread(new Service1(cyclicBarrier));
  Thread threadService2 = new Thread(new Service2(cyclicBarrier));
  Thread threadService3 = new Thread(new Service3(cyclicBarrier));
  System.out.println("Main service is waiting for all other dependent services to start");
  threadService1.start();
  threadService2.start();
  threadService3.start();
  cyclicBarrier.await();
  System.out.println("Main service is started");
 }
}

class Service1 implements Runnable{
 CyclicBarrier cyclicBarrier;
 public Service1(CyclicBarrier cyclicBarrier){
  this.cyclicBarrier = cyclicBarrier;
 }
 @Override
 public void run() {
  try {
   System.out.println("Service1 is started");
   cyclicBarrier.await(); // Wait here until all 4 parties have arrived
  } catch (InterruptedException | BrokenBarrierException e) {
   e.printStackTrace();
  }
 }
 
}

class Service2 implements Runnable{
 CyclicBarrier cyclicBarrier;
 public Service2(CyclicBarrier cyclicBarrier){
  this.cyclicBarrier = cyclicBarrier;
 }
 @Override
 public void run() {
  try {
   System.out.println("Service2 is started");
   cyclicBarrier.await();
  } catch (InterruptedException | BrokenBarrierException e) {
   e.printStackTrace();
  }  
 }
 
}

class Service3 implements Runnable{
 CyclicBarrier cyclicBarrier;
 public Service3(CyclicBarrier cyclicBarrier){
  this.cyclicBarrier = cyclicBarrier;
 }
 @Override
 public void run() {
  try {
   System.out.println("Service3 is started");
   cyclicBarrier.await();
  } catch (InterruptedException | BrokenBarrierException e) {
   e.printStackTrace();
  }   
 }
 
}
In the above code we initialize the CyclicBarrier with 4 parties (the count): the 3 service threads plus the main thread. Once all 4 parties reach the common barrier by calling await(), all 4 threads resume their execution.
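The example above passes through the barrier only once. To illustrate the "cyclic" part, here is a small sketch in which the same barrier is used for several rounds, with an optional barrier action (the second constructor argument) that runs each time all parties have arrived. The class name BarrierReuseSketch and the method runRounds are assumptions made for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseSketch {

    // Hypothetical helper: runs `parties` worker threads through `rounds` phases
    // on one barrier and returns how many times the barrier was tripped.
    static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have called await().
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> trips.incrementAndGet());

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        // ... this round's share of the work would go here ...
                        barrier.await(); // wait for the other parties, then start the next round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join(); // we own these threads, so join() is available
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("barrier tripped " + runRounds(3, 2) + " times");
    }
}
```

With 3 parties and 2 rounds this prints "barrier tripped 2 times": the barrier goes back to its initial state after each trip without any explicit reset() call.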

3. What is difference between CyclicBarrier and CountDownLatch?

Answer: The main difference between the two is that a CyclicBarrier can be reset and re-used, whereas the count of a CountDownLatch cannot be reset; it is a one-time activity. In addition, with a CountDownLatch the waiting threads wait for other threads to call countDown(), whereas with a CyclicBarrier the threads wait for each other at await().
CountDownLatch is good for one-time activities such as application startup, whereas CyclicBarrier is useful for recurring activities such as a complex computation that is divided into subtasks whose results are combined at the end to produce the final result.
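The one-shot versus reusable behaviour can be observed in a single thread. The sketch below is only an illustration under simplifying assumptions: it uses a latch with count 1 and a barrier with a single party so that no extra threads are needed, and the class and method names are invented for this example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

class LatchVersusBarrierSketch {

    // A latch never goes back up: once the count is zero it stays zero.
    static long latchCountAfterExtraCountDowns() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown(); // no effect, the count is already zero
        latch.await();     // returns immediately because the count is zero
        return latch.getCount();
    }

    // A single-party barrier trips on every await(), so the same object is reused.
    static int barrierTrips(int times) throws InterruptedException, BrokenBarrierException {
        int[] trips = {0}; // safe here because everything runs in one thread
        CyclicBarrier barrier = new CyclicBarrier(1, () -> trips[0]++);
        for (int i = 0; i < times; i++) {
            barrier.await(); // trips immediately and is ready for the next use
        }
        return trips[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println("latch count: " + latchCountAfterExtraCountDowns());
        System.out.println("barrier trips: " + barrierTrips(3));
    }
}
```

This prints "latch count: 0" and "barrier trips: 3". To repeat a latch-based wait you would have to create a new CountDownLatch, while the barrier object is simply used again.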

4. What is difference between join() and CountDownLatch/CyclicBarrier?

Answer: join() and CountDownLatch/CyclicBarrier provide similar functionality. If you look closely at the code, in both cases thread(s) need to wait for other threads to finish.
join() works fine if you are handling the threads yourself, but it is not available when you use the ExecutorService of the concurrent API.
Since ExecutorService does not expose the threads it manages, you cannot call join() on them directly, so you would use a synchronizer such as CountDownLatch/CyclicBarrier instead.
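As a rough sketch of that last point, the code below submits tasks to a thread pool and waits for them with a CountDownLatch, since there are no Thread references on which to call join(). The class name ExecutorLatchSketch, the method runTasks, and the pool size of 2 are arbitrary choices made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorLatchSketch {

    // Hypothetical helper: submits `tasks` jobs to a pool, waits for all of them
    // through a latch, and returns how many jobs ran.
    static int runTasks(int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2); // pool size is arbitrary
        CountDownLatch done = new CountDownLatch(tasks);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        completed.incrementAndGet(); // stand-in for real work
                    } finally {
                        done.countDown(); // signal completion even if the work throws
                    }
                });
            }
            done.await(); // the pool hides its threads, so we wait on the latch instead of join()
            return completed.get();
        } finally {
            pool.shutdown(); // let the pool threads exit once the queue is empty
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed " + runTasks(5) + " of 5 tasks");
    }
}
```

This prints "completed 5 of 5 tasks". The latch is sized to the number of tasks, not the number of pool threads, because each task counts down exactly once.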
