Tuesday, February 17, 2015

Difference between using a mutex (synchronized keyword) and a semaphore (especially a binary semaphore) - Java

Mutex -
A mutex object is a synchronization object whose state is set to signaled when it is not owned by any thread, and nonsignaled when it is owned. Only one thread at a time can own a mutex object; the name comes from its use in coordinating mutually exclusive access to a shared resource.
Semaphore -
A counting semaphore maintains a set of permits. The java.util.concurrent.Semaphore Javadoc illustrates this with a class that controls access to a pool of items: before obtaining an item, each thread must acquire a permit from the semaphore, guaranteeing that an item is available for use. When the thread has finished with the item, it is returned to the pool and a permit is returned to the semaphore, allowing another thread to acquire that item. Note that no synchronization lock is held when acquire() is called, as that would prevent an item from being returned to the pool. The semaphore encapsulates the synchronization needed to restrict access to the pool, separately from any synchronization needed to maintain the consistency of the pool itself.
A semaphore initialized to one, and used such that it has at most one permit available, can serve as a mutual exclusion lock. This is more commonly known as a binary semaphore, because it has only two states: one permit available, or zero permits available. When used in this way, the binary semaphore has the property (unlike many Lock implementations) that the lock can be released by a thread other than the owner, as semaphores have no notion of ownership. This can be useful in some specialized contexts, such as deadlock recovery.
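The ownership difference described above can be checked with a small experiment. The following is a minimal sketch, not part of the original example: the class name NonOwnerReleaseDemo and its helper methods are made up for illustration. It compares a binary Semaphore with a ReentrantLock when a second thread tries to release what the first thread holds.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative only.
class NonOwnerReleaseDemo {

    // Runs the task on a helper thread and waits for it to finish.
    static void runOnOtherThread(Runnable task) {
        Thread helper = new Thread(task, "helper");
        helper.start();
        try {
            helper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // The calling thread takes the only permit; a helper thread gives it back.
    static int permitsAfterForeignRelease() {
        Semaphore binary = new Semaphore(1);
        binary.acquireUninterruptibly();      // permits: 1 -> 0
        runOnOtherThread(binary::release);    // permits: 0 -> 1, no error
        return binary.availablePermits();
    }

    // The calling thread holds a ReentrantLock; a helper thread tries to unlock it.
    static boolean foreignUnlockRejected() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] rejected = {false};
        lock.lock();
        try {
            runOnOtherThread(() -> {
                try {
                    lock.unlock();
                } catch (IllegalMonitorStateException e) {
                    rejected[0] = true;       // only the owner may unlock
                }
            });
        } finally {
            lock.unlock();                    // the owner releases normally
        }
        return rejected[0];
    }

    public static void main(String[] args) {
        System.out.println("Permits after foreign release: " + permitsAfterForeignRelease());
        System.out.println("Foreign unlock rejected: " + foreignUnlockRejected());
    }
}
```

The semaphore accepts the release from the helper thread without complaint, while the lock rejects the unlock with IllegalMonitorStateException. That is the property that makes a binary semaphore both more flexible and easier to misuse than a lock.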
Semaphore (What you can/can't do) -
  1. Can be used anywhere in a program, but should not be used inside a monitor.
  2. acquire() does not always block the caller: it returns immediately when the semaphore counter is greater than zero.
  3. Signal() (release() in Java) either releases a blocked thread, if there is one, or increases the semaphore counter.
  4. If release() unblocks a thread, the caller and the released thread both continue.
  5. The programmer must call release() explicitly to increment the semaphore count.
  6. Most importantly (the problem): any thread, not only the one that acquired the permit, can call release() and increment the semaphore count.
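The counter behaviour in the list above can be observed through availablePermits(). This is a small sketch with a made-up class name (SemaphoreCounterDemo); it uses acquireUninterruptibly() and tryAcquire() only to keep the example free of checked exceptions.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class; the names are illustrative only.
class SemaphoreCounterDemo {

    // Returns the permit count observed after each step.
    static int[] permitTrace() {
        Semaphore semaphore = new Semaphore(2);
        int[] trace = new int[4];

        semaphore.acquireUninterruptibly();       // counter > 0: returns at once
        trace[0] = semaphore.availablePermits();  // 1
        semaphore.acquireUninterruptibly();
        trace[1] = semaphore.availablePermits();  // 0

        semaphore.release();                      // no waiter: the counter goes up
        trace[2] = semaphore.availablePermits();  // 1
        semaphore.release();
        semaphore.release();                      // one release more than was acquired
        trace[3] = semaphore.availablePermits();  // 3, above the initial 2
        return trace;
    }

    // With zero permits acquire() would block; tryAcquire() reports that instead.
    static boolean wouldBlockAtZero() {
        Semaphore semaphore = new Semaphore(0);
        return !semaphore.tryAcquire();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(permitTrace()));  // [1, 0, 1, 3]
        System.out.println(wouldBlockAtZero());              // true
    }
}
```

The last step is the problem named in point 6: nothing stops an extra release() from pushing the count above its initial value, after which more threads than intended can pass.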
Monitor (What you can/can't do) -
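  1. wait() always blocks the caller.
  2. Signal() (notify() in Java) either releases a blocked thread, if there is one, or the signal is lost as if it never happened.
  3. If notify() releases a blocked thread, only one of the caller and the released thread can run inside the monitor at a time, never both. In Java the caller keeps the monitor until it leaves the synchronized block, and the released thread continues only after re-acquiring it.
  4. There is no need to release the lock explicitly; it is released when the synchronized block or method exits.
  5. Only the thread that acquired the lock can release it.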
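Points 4 and 5 can be verified directly with Thread.holdsLock(). The sketch below uses a made-up class name (MonitorRulesDemo) and is only meant to illustrate the two rules, not to model a real workload.

```java
// Hypothetical demo class; the names are illustrative only.
class MonitorRulesDemo {

    // Point 4: the monitor is released automatically when the synchronized
    // block exits, even when it exits because of an exception.
    static boolean releasedAfterException() {
        Object monitor = new Object();
        try {
            synchronized (monitor) {
                throw new IllegalStateException("simulated failure");
            }
        } catch (IllegalStateException e) {
            // Nothing to clean up: leaving the block released the monitor.
        }
        return !Thread.holdsLock(monitor);
    }

    // Point 5: a thread that does not own the monitor cannot signal on it.
    static boolean notifyWithoutOwnershipRejected() {
        Object monitor = new Object();
        try {
            monitor.notify();                 // called outside synchronized
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Released after exception: " + releasedAfterException());
        System.out.println("Foreign notify rejected: " + notifyWithoutOwnershipRejected());
    }
}
```

Both methods return true. Compare this with a semaphore, where a forgotten release() in an error path leaves the permit taken forever.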

Reference -
http://blog.metawrap.com/2004/11/09/semaphore-vs-monitor/
http://stackoverflow.com/questions/5083818/is-it-better-to-synchronize-with-semaphores-or-with-monitors
http://msdn.microsoft.com/en-us/library/windows/desktop/ms684266(v=vs.85).aspx
Code -
The example below covers three scenarios. Output1 and Output2 show the same behaviour, because a mutex (monitor) and a correctly used binary semaphore both admit only one thread at a time.
Output3 shows how using a binary semaphore to guard a critical resource can cause problems if it is not implemented carefully.
public class Main {
      public static void main(String[] args) {
            //mainSemaphore(args);     // enable exactly one of these two calls
            mainMonitor(args);
      }
      public static void mainSemaphore(String[] args) {
            PrintQueue printQueue = new PrintQueueSemaphore();
            Thread[] threads = new Thread[5];
            for (int i = 0; i < threads.length; i++) {
                  threads[i] = new Thread(new Job(printQueue), "ThreadSemaphore" + i);
            }
            for (int i = 0; i < threads.length; i++) {
                  //((PrintQueueSemaphore) printQueue).semaphore.release(); // Output3: extra permit nobody acquired
                  threads[i].start();
            }
      }

      public static void mainMonitor(String[] args) {
            PrintQueue printQueue = new PrintQueueMonitor();
            Thread[] threads = new Thread[5];
            for (int i = 0; i < threads.length; i++) {
                  threads[i] = new Thread(new Job(printQueue), "ThreadMonitor" + i);
            }
            for (int i = 0; i < threads.length; i++) {
                  threads[i].start();
            }
      }
}
 
public class Job implements Runnable {
      private PrintQueue printQueue;
      public Job(PrintQueue printQueue) {
            this.printQueue = printQueue;
      }
 
      @Override
      public void run() {
            System.out.printf("%s: Going to print a job\n", Thread.currentThread()
                        .getName());
            printQueue.printJob(new Object());
      }
}
 
public interface PrintQueue {
     
      public void printJob(Object document);
}
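public class PrintQueueMonitor implements PrintQueue {

      // synchronized: the monitor of this object is acquired on entry and
      // released automatically on exit, even if an exception is thrown.
      @Override
      public synchronized void printJob(Object document) {
            System.out.println("Monitor acquired >>>>");
            try {
                  long duration = (long) (Math.random() * 10);
                  System.out.printf(
                              "%s: PrintQueue: Printing a Job during %d seconds\n",
                              Thread.currentThread().getName(), duration);
                  // Thread.sleep takes milliseconds, so convert from seconds.
                  Thread.sleep(duration * 1000);
            } catch (InterruptedException e) {
                  // Restore the interrupt flag so callers can see the interruption.
                  Thread.currentThread().interrupt();
            }
            System.out.println("Monitor released <<<");
      }
}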
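import java.util.concurrent.Semaphore;
public class PrintQueueSemaphore implements PrintQueue {
      // Public so that Main can demonstrate a release() from a non-owning thread.
      public final Semaphore semaphore;

      public PrintQueueSemaphore() {
            // One permit: a binary semaphore.
            semaphore = new Semaphore(1);
      }

      @Override
      public void printJob(Object document) {
            try {
                  // Acquire before the guarded block, so that the finally clause
                  // below never releases a permit this thread did not obtain.
                  semaphore.acquire();
            } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  return;
            }
            try {
                  System.out.println("Semaphore acquired >>>>");
                  long duration = (long) (Math.random() * 10);
                  System.out.printf(
                              "%s: PrintQueue: Printing a Job during %d seconds\n",
                              Thread.currentThread().getName(), duration);
                  // Thread.sleep takes milliseconds, so convert from seconds.
                  Thread.sleep(duration * 1000);
            } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
            } finally {
                  // Unlike a monitor, the permit must be returned explicitly.
                  semaphore.release();
                  System.out.println("Semaphore released <<<");
            }
      }
}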
Output1: In the main method of Main.java, comment out the mainMonitor(args) call and uncomment the mainSemaphore(args) call, so that only the semaphore version runs (correct semaphore usage).
ThreadSemaphore0: Going to print a job
ThreadSemaphore4: Going to print a job
ThreadSemaphore3: Going to print a job
ThreadSemaphore2: Going to print a job
ThreadSemaphore1: Going to print a job
Semaphore acquired >>>>
ThreadSemaphore0: PrintQueue: Printing a Job during 2 seconds
Semaphore released <<<
Semaphore acquired >>>>
ThreadSemaphore4: PrintQueue: Printing a Job during 0 seconds
Semaphore released <<<
Semaphore acquired >>>>
ThreadSemaphore3: PrintQueue: Printing a Job during 8 seconds
Semaphore released <<<
Semaphore acquired >>>>
ThreadSemaphore2: PrintQueue: Printing a Job during 4 seconds
Semaphore released <<<
Semaphore acquired >>>>
ThreadSemaphore1: PrintQueue: Printing a Job during 6 seconds
Semaphore released <<<

Output2: In the main method of Main.java, keep the mainSemaphore(args) call commented out and run the code with mainMonitor(args) only (monitor version).

ThreadMonitor2: Going to print a job
ThreadMonitor4: Going to print a job
ThreadMonitor3: Going to print a job
ThreadMonitor0: Going to print a job
ThreadMonitor1: Going to print a job
Monitor acquired >>>>
ThreadMonitor2: PrintQueue: Printing a Job during 9 seconds
Monitor released <<<
Monitor acquired >>>>
ThreadMonitor1: PrintQueue: Printing a Job during 7 seconds
Monitor released <<<
Monitor acquired >>>>
ThreadMonitor0: PrintQueue: Printing a Job during 0 seconds
Monitor released <<<
Monitor acquired >>>>
ThreadMonitor3: PrintQueue: Printing a Job during 1 seconds
Monitor released <<<
Monitor acquired >>>>
ThreadMonitor4: PrintQueue: Printing a Job during 9 seconds
Monitor released <<<

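Output1 and Output2 follow the same pattern: every "acquired" line is followed by its "released" line before the next job starts, so only one thread prints at a time. The order of the threads and the durations vary from run to run.
Output3: The semaphore is released by other code. Uncomment the semaphore.release() line in the mainSemaphore method of Main.java and run the semaphore version (results may vary).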

ThreadSemaphore0: Going to print a job
ThreadSemaphore4: Going to print a job
Semaphore acquired >>>>
ThreadSemaphore3: Going to print a job
ThreadSemaphore2: Going to print a job
ThreadSemaphore1: Going to print a job
ThreadSemaphore4: PrintQueue: Printing a Job during 8 seconds
Semaphore acquired >>>>
Semaphore acquired >>>>
Semaphore acquired >>>>
ThreadSemaphore3: PrintQueue: Printing a Job during 9 seconds
ThreadSemaphore2: PrintQueue: Printing a Job during 6 seconds
Semaphore acquired >>>>
ThreadSemaphore0: PrintQueue: Printing a Job during 5 seconds
ThreadSemaphore1: PrintQueue: Printing a Job during 0 seconds
Semaphore released <<<
Semaphore released <<<
Semaphore released <<<
Semaphore released <<<
Semaphore released <<<
----------------------
Conclusion -
A semaphore and a monitor are meant to solve two different problems. A semaphore should be used when several threads may access a limited resource at the same time, up to a fixed number.
  1. For example, to implement a connection pool with only 100 connection objects, use a semaphore with a count of 100; further requests wait until one of the existing connection objects is released.
A monitor, in contrast, is used when only one thread at a time may access the critical resource.
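The connection pool idea can be sketched as follows. This is a simplified illustration under stated assumptions, not a production pool: the class BoundedPool and its methods borrow, tryBorrow, giveBack and available are hypothetical names, and plain strings stand in for connection objects. The semaphore limits how many items are out at once, while a separate synchronized block keeps the internal list consistent.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Hypothetical pool class; the names are illustrative only.
class BoundedPool<T> {
    private final Semaphore permits;              // one permit per pooled item
    private final Deque<T> idle = new ArrayDeque<>();

    BoundedPool(Collection<T> items) {
        idle.addAll(items);
        permits = new Semaphore(items.size(), true);
    }

    // Blocks until an item is free. No lock is held while waiting for a permit.
    T borrow() throws InterruptedException {
        permits.acquire();
        synchronized (idle) {
            return idle.pop();
        }
    }

    // Non-blocking variant: returns null when every item is in use.
    T tryBorrow() {
        if (!permits.tryAcquire()) {
            return null;
        }
        synchronized (idle) {
            return idle.pop();
        }
    }

    // Returns the item first, then the permit, so a permit always implies an item.
    void giveBack(T item) {
        synchronized (idle) {
            idle.push(item);
        }
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        BoundedPool<String> pool = new BoundedPool<>(Arrays.asList("conn-1", "conn-2"));
        String first = pool.tryBorrow();
        String second = pool.tryBorrow();
        System.out.println(pool.tryBorrow());     // null: both items are out
        pool.giveBack(first);
        System.out.println(pool.available());     // 1
    }
}
```

With 100 connection objects the same class would be built from a collection of 100 items. Note that giveBack does not check that the item was actually borrowed, which is the same weakness as the stray release() shown in Output3.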

Why should wait be used inside a loop?

People are sometimes unsure why wait should be called inside a loop rather than under an if condition. In short, notify/notifyAll can be called by other parts of the code, and a waiting thread can also wake up spuriously, so your code can fail if it does not re-check the condition in a while loop after waking.

This is easy to understand with an example -

public class SynchronizationMain {
    public static void main(String[] args) {
        final EventStorage storage = new EventStorage();
        startEvilThread(storage);
        final Producer producer = new Producer(storage);
        final Thread producerThread = new Thread(producer);
        final Consumer consumer = new Consumer(storage);
        final Thread consumerThread = new Thread(consumer);
        producerThread.start();
        consumerThread.start();
    }

    // Simulates unrelated code that keeps calling notifyAll on the same monitor.
    public static void startEvilThread(final EventStorage storage) {
        Runnable evilThread = new Runnable() {
            public void run() {
                while (true) {
                    synchronized (storage) {
                        storage.notifyAll();
                    }
                }
            }
        };
        Thread wrapper = new Thread(evilThread);
        // Daemon thread: its endless loop does not keep the JVM alive
        // once the producer and the consumer have finished.
        wrapper.setDaemon(true);
        wrapper.start();
    }
}

import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;

public class EventStorage {
    private final int maxSize;
    private final Queue<Date> storage;

    public EventStorage() {
        maxSize = 10;
        storage = new LinkedList<>();
    }

    public synchronized void set() {
        // Re-check the condition after every wake-up: the queue may still be full.
        while (storage.size() == maxSize) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        storage.offer(new Date());
        System.out.printf("Set: %d%n", storage.size());
        notifyAll();
    }

    public synchronized void get() {
        // Re-check the condition after every wake-up: the queue may still be empty.
        while (storage.size() == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("Get: %d: %s%n", storage.size(), storage.poll());
        notifyAll();
    }
}

public class Producer implements Runnable {
    private EventStorage storage;

    public Producer(EventStorage storage) {
        this.storage = storage;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            storage.set();
        }
    }
}

public class Consumer implements Runnable {
    private EventStorage storage;

    public Consumer(EventStorage storage) {
        this.storage = storage;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            storage.get();
        }
    }
}

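Run the program as written, then change the while loops in the set and get methods of EventStorage to if statements and run it again. With if, a thread woken by the evil thread's notifyAll carries on without re-checking the condition, so you should see get print null for an empty queue or set push the size past maxSize.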
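Stripped of the producer and consumer, the idiom reduces to "loop on the condition, wait inside the loop". The following is a minimal sketch with made-up names (GuardedWaitDemo, Gate, strayNotify); it assumes a single boolean condition and shows that stray notifyAll calls do not let the waiting thread through before the condition is true.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; the names are illustrative only.
class GuardedWaitDemo {

    static final class Gate {
        private boolean open;

        synchronized void open() {
            open = true;
            notifyAll();
        }

        // A notification that does not change the condition.
        synchronized void strayNotify() {
            notifyAll();
        }

        synchronized void awaitOpen() throws InterruptedException {
            while (!open) {       // re-checked after every wake-up
                wait();
            }
        }
    }

    // Returns {passed before open(), passed after open()}.
    static boolean[] run() {
        Gate gate = new Gate();
        AtomicBoolean passed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitOpen();
                passed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            for (int i = 0; i < 1000; i++) {
                gate.strayNotify();           // wakes the waiter, the loop sends it back
            }
            boolean before = passed.get();
            gate.open();
            waiter.join();
            return new boolean[] {before, passed.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("Passed before open: " + result[0]);  // false
        System.out.println("Passed after open: " + result[1]);   // true
    }
}
```

Replacing the while in awaitOpen with an if would allow the first result to become true on some runs, which is the failure the EventStorage experiment is meant to show.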