Tuesday, February 17, 2015

Difference between using mutex (synchronized keyword) or semaphore (especially binary semaphore) - Java

Mutex
A mutex object is a synchronization object whose state is set to signaled when it is not owned by any thread, and non signaled when it is owned. Only one thread at a time can own a mutex object, whose name comes from the fact that it is useful in coordinating mutually exclusive access to a shared resource.
Semaphore -
A semaphore maintains a set of permits and is often used to guard a pool of items. Before obtaining an item each thread must acquire a permit from the semaphore, guaranteeing that an item is available for use. When the thread has finished with the item it is returned to the pool and a permit is returned to the semaphore, allowing another thread to acquire that item. Note that no synchronization lock is held when acquire() is called, as that would prevent an item from being returned to the pool. The semaphore encapsulates the synchronization needed to restrict access to the pool, separately from any synchronization needed to maintain the consistency of the pool itself.
A semaphore initialized to one, and which is used such that it only has at most one permit available, can serve as a mutual exclusion lock. This is more commonly known as a binary semaphore, because it only has two states: one permit available, or zero permits available. When used in this way, the binary semaphore has the property (unlike many Lock implementations) that the lock can be released by a thread other than the owner (as semaphores have no notion of ownership). This can be useful in some specialized contexts, such as deadlock recovery.
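To make the ownership point concrete, here is a minimal sketch (class and method names are made up for illustration) that contrasts a binary semaphore with a ReentrantLock: a thread that never acquired the permit can still release it, whereas unlocking a ReentrantLock from a non-owner thread is rejected with IllegalMonitorStateException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

class OwnershipSketch {

    // Runs the task on a separate thread and waits for it to finish.
    static void runOnOtherThread(Runnable task) {
        Thread t = new Thread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // The calling thread takes the only permit; a different thread returns it.
    static int permitsAfterForeignRelease() {
        final Semaphore semaphore = new Semaphore(1);
        semaphore.acquireUninterruptibly();
        runOnOtherThread(new Runnable() {
            public void run() {
                semaphore.release(); // allowed: semaphores have no owner
            }
        });
        return semaphore.availablePermits();
    }

    // The calling thread owns the lock; a different thread tries to unlock it.
    static boolean foreignUnlockRejected() {
        final ReentrantLock lock = new ReentrantLock();
        final boolean[] rejected = {false};
        lock.lock();
        try {
            runOnOtherThread(new Runnable() {
                public void run() {
                    try {
                        lock.unlock();
                    } catch (IllegalMonitorStateException e) {
                        rejected[0] = true; // only the owner may unlock
                    }
                }
            });
        } finally {
            lock.unlock();
        }
        return rejected[0];
    }

    public static void main(String[] args) {
        System.out.println("permits after foreign release: " + permitsAfterForeignRelease()); // 1
        System.out.println("foreign unlock rejected: " + foreignUnlockRejected());           // true
    }
}
```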
Semaphore (What you can/can't do) -
  1. Can be used anywhere in a program, but should not be used inside a monitor.
  2. acquire() does not always block the caller; it returns immediately when the semaphore counter is greater than zero.
  3. release() (signal) either wakes a blocked thread, if there is one, or increases the semaphore counter.
  4. If release() wakes a blocked thread, the caller and the released thread both continue.
  5. As a programmer you must call release() explicitly to increment the semaphore count.
  6. Most importantly (Problem) - any thread, not just the one that acquired the permit, can call release() and increment the semaphore count.
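Points 3 and 6 can be seen in a few lines. This is a small sketch with hypothetical names: a release() that happens before any acquire is remembered as a permit, and a stray release() silently turns a "binary" semaphore into one with two permits.

```java
import java.util.concurrent.Semaphore;

class PermitCountSketch {

    // A release() that arrives before any acquire is remembered as a permit.
    static boolean earlyReleaseIsRemembered() {
        Semaphore semaphore = new Semaphore(0);
        semaphore.release();
        return semaphore.tryAcquire(); // succeeds without blocking
    }

    // A stray release() pushes a "binary" semaphore past one permit.
    static int permitsAfterStrayRelease() {
        Semaphore binary = new Semaphore(1);
        binary.release(); // nobody acquired first
        return binary.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(earlyReleaseIsRemembered()); // true
        System.out.println(permitsAfterStrayRelease()); // 2
    }
}
```

With two permits available, two threads can be inside the "critical section" at once, which is the kind of interleaving Output3 further down shows.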
Monitor (What you can/can't do) -
  1. wait() always blocks the caller.
  2. Signal (notify() in Java) either releases a blocked thread, if there is one, or the signal is lost as if it never happened.
  3. If the signal releases a blocked thread, only one of the caller or the released thread can run inside the monitor at a time, never both. In Java the notifying thread keeps the monitor until it leaves the synchronized block, and only then can the released thread continue.
  4. No need to release the lock explicitly.
  5. Only the acquiring thread can release the lock.
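Point 4 is easy to check with Thread.holdsLock. The sketch below (names are illustrative) leaves a synchronized block by throwing an exception and then asks whether the current thread still holds the monitor; with a semaphore the equivalent cleanup would need an explicit release() in a finally block.

```java
class MonitorReleaseSketch {

    private static final Object MONITOR = new Object();

    // True while the current thread is inside the synchronized block.
    static boolean heldInsideBlock() {
        synchronized (MONITOR) {
            return Thread.holdsLock(MONITOR);
        }
    }

    // Leaves the synchronized block via an exception and reports whether
    // the current thread still holds the monitor afterwards.
    static boolean stillHeldAfterException() {
        try {
            synchronized (MONITOR) {
                throw new IllegalStateException("simulated failure");
            }
        } catch (IllegalStateException e) {
            // the monitor was released on the way out of the block
        }
        return Thread.holdsLock(MONITOR);
    }

    public static void main(String[] args) {
        System.out.println(heldInsideBlock());         // true
        System.out.println(stillHeldAfterException()); // false
    }
}
```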

Reference -
http://blog.metawrap.com/2004/11/09/semaphore-vs-monitor/
http://stackoverflow.com/questions/5083818/is-it-better-to-synchronize-with-semaphores-or-with-monitors
http://msdn.microsoft.com/en-us/library/windows/desktop/ms684266(v=vs.85).aspx
Code -
The example below shows three scenarios. Output1 and Output2 show the same behavior, since a mutex and a correctly used binary semaphore both give mutual exclusion.
Output3 shows how using a binary semaphore to guard a critical resource can create problems if it is not implemented carefully.
public class Main {
      public static void main(String[] args) {
            //mainSemaphore(args);
            mainMonitor(args);
      }     
      public static void mainSemaphore(String[] args) {
            PrintQueue printQueue = new PrintQueueSemaphore();
            Thread[] thread = new Thread[5]; // only five threads are created below
            for (int i = 0; i < 5; i++) {
                  thread[i] = new Thread(new Job(printQueue), "ThreadSemaphore" + i);
            }
            for (int i = 0; i < 5; i++) {
                  //((PrintQueueSemaphore)printQueue).semaphore.release();
                  thread[i].start();
            }
      }
     
      public static void mainMonitor(String[] args) {
            PrintQueue printQueue = new PrintQueueMonitor();
            Thread[] thread = new Thread[5]; // only five threads are created below
            for (int i = 0; i < 5; i++) {
                  thread[i] = new Thread(new Job(printQueue), "ThreadMonitor" + i);
            }
            for (int i = 0; i < 5; i++) {
                  thread[i].start();
            }
      }
}
 
public class Job implements Runnable {
      private PrintQueue printQueue;
      public Job(PrintQueue printQueue) {
            this.printQueue = printQueue;
      }
 
      @Override
      public void run() {
            System.out.printf("%s: Going to print a job\n", Thread.currentThread()
                        .getName());
            printQueue.printJob(new Object());
      }
}
 
public interface PrintQueue {
     
      public void printJob(Object document);
}
public class PrintQueueMonitor implements PrintQueue {
 
      public synchronized void printJob(Object document) {
            System.out.println("Monitor acquired >>>>");
            try {
                  long duration = (long) (Math.random() * 10);
                  System.out.printf(
                              "%s: PrintQueue: Printing a Job during %d seconds\n",
                              Thread.currentThread().getName(), duration);
                  // Thread.sleep() takes milliseconds, so "seconds" in the message is nominal
                  Thread.sleep(duration);
            } catch (InterruptedException e) {
                  e.printStackTrace();
            }
            System.out.println("Monitor released <<<");
      }
}
import java.util.concurrent.Semaphore;
public class PrintQueueSemaphore implements PrintQueue {
      public final Semaphore semaphore;
 
      public PrintQueueSemaphore() {
            semaphore = new Semaphore(1);
      }
 
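      public void printJob(Object document) {
            try {
                  semaphore.acquire();
            } catch (InterruptedException e) {
                  // No permit was obtained, so there is nothing to release
                  Thread.currentThread().interrupt();
                  return;
            }
            try {
                  System.out.println("Semaphore acquired >>>>");
                  long duration = (long) (Math.random() * 10);
                  System.out.printf(
                              "%s: PrintQueue: Printing a Job during %d seconds\n",
                              Thread.currentThread().getName(), duration);
                  // Thread.sleep() takes milliseconds, so "seconds" in the message is nominal
                  Thread.sleep(duration);
            } catch (InterruptedException e) {
                  e.printStackTrace();
            } finally {
                  // Print before releasing so the message cannot interleave with the next holder
                  System.out.println("Semaphore released <<<");
                  semaphore.release();
            }
      }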
}
Output1: In Main.main, comment out the mainMonitor(args) call and run the code with mainSemaphore(args) only (with the correct semaphore implementation).
ThreadSemaphore0: Going to print a job
ThreadSemaphore4: Going to print a job
ThreadSemaphore3: Going to print a job
ThreadSemaphore2: Going to print a job
ThreadSemaphore1: Going to print a job
Semaphore acquired >>>>
ThreadSemaphore0: PrintQueue: Printing a Job during 2 seconds
Semaphore released <<<
Semaphore acquired >>>>
ThreadSemaphore4: PrintQueue: Printing a Job during 0 seconds
Semaphore released <<<
Semaphore acquired >>>>
ThreadSemaphore3: PrintQueue: Printing a Job during 8 seconds
Semaphore released <<<
Semaphore acquired >>>>
ThreadSemaphore2: PrintQueue: Printing a Job during 4 seconds
Semaphore released <<<
Semaphore acquired >>>>
ThreadSemaphore1: PrintQueue: Printing a Job during 6 seconds
Semaphore released <<<

Output2: In Main.main, comment out the mainSemaphore(args) call and run the code with mainMonitor(args) only (with the monitor).

ThreadMonitor2: Going to print a job
ThreadMonitor4: Going to print a job
ThreadMonitor3: Going to print a job
ThreadMonitor0: Going to print a job
ThreadMonitor1: Going to print a job
Monitor acquired >>>>
ThreadMonitor2: PrintQueue: Printing a Job during 9 seconds
Monitor released <<<
Monitor acquired >>>>
ThreadMonitor1: PrintQueue: Printing a Job during 7 seconds
Monitor released <<<
Monitor acquired >>>>
ThreadMonitor0: PrintQueue: Printing a Job during 0 seconds
Monitor released <<<
Monitor acquired >>>>
ThreadMonitor3: PrintQueue: Printing a Job during 1 seconds
Monitor released <<<
Monitor acquired >>>>
ThreadMonitor4: PrintQueue: Printing a Job during 9 seconds
Monitor released <<<

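Output1 and Output2 show the same pattern (thread order aside): each acquire is followed by its release before the next thread gets in.
Output3: The semaphore is also released by some other code. Uncomment the semaphore.release() call inside the start loop of mainSemaphore in Main.java (results vary from run to run).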

ThreadSemaphore0: Going to print a job
ThreadSemaphore4: Going to print a job
Semaphore acquired >>>>
ThreadSemaphore3: Going to print a job
ThreadSemaphore2: Going to print a job
ThreadSemaphore1: Going to print a job
ThreadSemaphore4: PrintQueue: Printing a Job during 8 seconds
Semaphore acquired >>>>
Semaphore acquired >>>>
Semaphore acquired >>>>
ThreadSemaphore3: PrintQueue: Printing a Job during 9 seconds
ThreadSemaphore2: PrintQueue: Printing a Job during 6 seconds
Semaphore acquired >>>>
ThreadSemaphore0: PrintQueue: Printing a Job during 5 seconds
ThreadSemaphore1: PrintQueue: Printing a Job during 0 seconds
Semaphore released <<<
Semaphore released <<<
Semaphore released <<<
Semaphore released <<<
Semaphore released <<<
----------------------
Conclusion -
Semaphores and monitors are meant to solve two different problems. Semaphores should be used when a limited number of threads (possibly more than one) may access a critical resource simultaneously.
  1. e.g. if you want to implement a connection pool and only 100 connection objects are available, you can use a semaphore with a count of 100; if there are more requests, they wait until one of the existing connection objects is released.
Whereas monitors are used when only one thread may have access to a critical resource at a time.
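The connection pool idea can be sketched as follows. ConnectionPermits is a hypothetical class that only tracks permits (no real connections), and it uses the non-blocking tryAcquire() so the demo never hangs; a real pool would more likely call acquire() and let extra requests wait.

```java
import java.util.concurrent.Semaphore;

class ConnectionPermits {

    private final Semaphore permits;

    ConnectionPermits(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    // Non-blocking: returns false instead of waiting when no permit is free.
    boolean tryBorrow() {
        return permits.tryAcquire();
    }

    // Hands a permit back so another caller can borrow.
    void giveBack() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        ConnectionPermits pool = new ConnectionPermits(2);
        System.out.println(pool.tryBorrow()); // true
        System.out.println(pool.tryBorrow()); // true
        System.out.println(pool.tryBorrow()); // false, both permits are out
        pool.giveBack();
        System.out.println(pool.tryBorrow()); // true again
    }
}
```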

Why should wait be used inside a loop?

Sometimes people are not sure why wait should be called inside a loop instead of after an if condition. In short, notify/notifyAll can be called by some other part of the code (and a waiting thread can also wake up spuriously), so your code can fail if it does not re-check the condition in a while loop.

Easy to understand with an example -

public class SynchronizationMain {
  public static void main(String[] args) {
      final EventStorage storage = new EventStorage();
      startEvilThread(storage);
      final Producer producer = new Producer(storage);
      final Thread producerThread = new Thread(producer);
      final Consumer consumer = new Consumer(storage);
      final Thread consumerThread = new Thread(consumer);
      producerThread.start();
      consumerThread.start();
  }

  // Simulates unrelated code that keeps calling notifyAll() on the shared monitor
  public static void startEvilThread(final EventStorage storage) {
      Runnable evilThread = new Runnable() {
          public void run() {
              while (true) {
                  synchronized (storage) {
                      storage.notifyAll();
                  }
              }
          }
      };
      Thread wrapper = new Thread(evilThread);
      wrapper.setDaemon(true); // lets the JVM exit once producer and consumer finish
      wrapper.start();
  }
}

import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;

public class EventStorage {
  private final int maxSize;
  private final Queue<Date> storage;

  public EventStorage() {
      maxSize = 10;
      storage = new LinkedList<Date>();
  }

  public synchronized void set() {
      // Re-check after every wake-up: a notifyAll() from elsewhere
      // does not mean there is room in the queue
      while (storage.size() == maxSize) {
          try {
              wait();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
      storage.offer(new Date());
      System.out.printf("Set: %d%n", storage.size());
      notifyAll();
  }

  public synchronized void get() {
      // Same reasoning: being woken up does not mean the queue is non-empty
      while (storage.size() == 0) {
          try {
              wait();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
      System.out.printf("Get: %d: %s%n", storage.size(), storage.poll());
      notifyAll();
  }
}

public class Producer implements Runnable {
  private EventStorage storage;

  public Producer(EventStorage storage) {
      this.storage = storage;
  }

  @Override
  public void run() {
      for (int i = 0; i < 100; i++) {
          storage.set();
      }
  }
}

public class Consumer implements Runnable {
  private EventStorage storage;

  public Consumer(EventStorage storage) {
      this.storage = storage;
  }

  @Override
  public void run() {
      for (int i = 0; i < 100; i++) {
          storage.get();
      }
  }
}

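Run the same program before and after changing the while loop to an if in the set and get methods of the EventStorage class and you should notice the difference: with if, a thread woken by the evil thread's notifyAll() carries on without re-checking the condition, so the consumer can poll an empty queue (printing null) and the producer can grow the queue past maxSize.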

Wednesday, January 1, 2014

Hadoop questions and answers

Q. What is the default block size in HDFS?
A. 64 MB (in Hadoop 1.x; Hadoop 2.x and later default to 128 MB).

Q. What is the difference between a block and a split?
A. A block is the physical unit of contiguous storage in HDFS, whereas a split is the logical chunk of input processed by one map task. Block and split sizes are generally the same; they differ when a record straddles a block boundary, in which case the whole record is treated as part of the first split.

Q. How can you make sure that a file is not split?
A. Write your own InputFormat class (for example, by extending FileInputFormat) and override isSplitable to always return false.

Q. When does the reduce method of a reducer get called?
A. Not until all mappers have finished processing their input.

Q. What if the output types of your mapper do not match the input types of the reducer?
A. The job will fail at run time, typically with a type mismatch error or a ClassCastException.

Q. Are keys and values in sorted order when passed to reducer?
A. Keys are sorted but values are not.

Q. Where does the intermediate data emitted by a mapper get written?
A. To the local file system of the node where the mapper is running.

Q. What's the default replication factor?
A. 3

Q. How does Hadoop decide where/how to store replicated data?
A. Data block 'd' is stored on 3 different nodes n1, n2, n3 (assuming replication factor 3), spread across two different racks r1, r2.

Q. Can you configure the number of mappers for your input file?
A. You can configure how many mappers run in parallel on a node, but you can't directly configure the total number of mappers, since it is decided by the number of splits (and ultimately by the block size). So by changing the block size you can control the number of mappers.
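As a rough worked example of the last answer, the number of map tasks can be estimated by dividing the file size by the split size and rounding up. The helper below is only a back-of-the-envelope sketch with a made-up name; it assumes the split size equals the block size and ignores the extra knobs the real FileInputFormat logic has (minimum/maximum split size and a small tolerance on the last split).

```java
class SplitEstimate {

    // Rough number of map tasks: one per split, rounding up for the partial last split.
    static long estimateSplits(long fileSizeBytes, long splitSizeBytes) {
        if (fileSizeBytes <= 0 || splitSizeBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        return (fileSizeBytes + splitSizeBytes - 1) / splitSizeBytes; // ceiling division
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(estimateSplits(1024 * mb, 64 * mb));  // 16
        System.out.println(estimateSplits(1024 * mb, 128 * mb)); // 8
        System.out.println(estimateSplits(100 * mb, 64 * mb));   // 2
    }
}
```

Doubling the block size from 64 MB to 128 MB halves the estimated number of mappers for the same 1 GB file.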


Sunday, December 29, 2013

Hadoop simple program to count occurrence of a character in a file

Prerequisite -
  a) Java 1.6
  b) Hadoop (1.2.1) is installed in pseudo-distributed mode.


CountDriver (Driver class) -

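import java.io.IOException;
import java.util.Calendar;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = new Job(new Configuration(), "Count Driver");
        job.setJarByClass(CountDriver.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:8020//user/chatar/input/practice/count_max.txt"));
        // FileNameUtil is a helper class from this project (not shown here);
        // the timestamp gives every run a fresh output directory
        FileOutputFormat.setOutputPath(job, new Path(FileNameUtil.HDFS_OUT_DIR + "/" + Calendar.getInstance().getTimeInMillis()));
        job.setMapperClass(CountMapper.class);
        job.setReducerClass(CountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}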

CountMapper

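import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line holds space-separated characters; emit (character, 1) for each
        final String[] values = value.toString().split(" ");
        for (String val : values) {
            if (val != null && !val.isEmpty()) {
                context.write(new Text(val), new IntWritable(1));
            }
        }
    }
}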

CountReducer

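import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the 1s emitted by the mappers for this character
        int total = 0;
        for (IntWritable value : values) {
            total = total + value.get();
        }
        context.write(key, new IntWritable(total));
    }
}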

Input file (count_max.txt) -

g g g h j k l 
b n g f h j j
a a w e r g h
t y u i o p p
d f g h j k l
l m n k k k k 

Output file -

a 2
b 1
d 1
e 1
f 2
g 6
h 4
i 1
j 4
k 6
l 3
m 1
n 2
o 1
p 2
r 1
t 1
u 1
w 1
y 1
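If no cluster is at hand, the same counts can be reproduced with a plain Java sketch that imitates the map step (split each line and emit one per token) and the reduce step (sum per key). LocalCountSketch is a hypothetical stand-in and not part of the Hadoop job above; the TreeMap only mimics the sorted key order that reducers see.

```java
import java.util.Map;
import java.util.TreeMap;

class LocalCountSketch {

    // Same six lines as count_max.txt
    static final String[] SAMPLE = {
        "g g g h j k l ",
        "b n g f h j j",
        "a a w e r g h",
        "t y u i o p p",
        "d f g h j k l",
        "l m n k k k k "
    };

    static Map<String, Integer> count(String[] lines) {
        // TreeMap keeps keys sorted, like the keys handed to a reducer
        Map<String, Integer> totals = new TreeMap<String, Integer>();
        for (String line : lines) {
            for (String token : line.split(" ")) {      // "map": one (token, 1) per token
                if (!token.isEmpty()) {
                    Integer current = totals.get(token);
                    totals.put(token, current == null ? 1 : current + 1); // "reduce": sum
                }
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> entry : count(SAMPLE).entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
```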


Monday, December 24, 2012

Java Interview Questions

Collections -

Explain how HashMap works


HashMap - uses an array as its internal data structure, where each array position is treated as a bucket and the elements in a bucket are stored in a linked list.

Adding into HashMap
1. Call the hashCode method on the key, then apply HashMap's own hashing function to the result (this spreads the hash codes fairly evenly across the buckets).
2. Use a modulo-style operation on the hash with the size of the array to find the bucket location.
3. Traverse all the elements in the bucket and check whether the key matches, using the equals method on keys.
4. If the key matches, update the value; otherwise add a new entry.

Similar steps apply for delete and retrieve.
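Steps 1 and 2 can be sketched as below. The spreading step is modelled on the one used in OpenJDK 8's HashMap (older versions mix the bits differently), and the table length is assumed to be a power of two so that a bit mask can stand in for the modulo. The class and method names are illustrative only.

```java
class BucketIndexSketch {

    // Step 1: fold the high bits of hashCode() into the low bits.
    static int spread(int h) {
        return h ^ (h >>> 16);
    }

    // Step 2: map the spread hash onto a bucket; tableLength must be a power of two.
    static int indexFor(Object key, int tableLength) {
        return spread(key.hashCode()) & (tableLength - 1);
    }

    public static void main(String[] args) {
        System.out.println(indexFor(5, 16));  // 5
        System.out.println(indexFor(21, 16)); // 5, collides with key 5 (same bucket)
        System.out.println(indexFor(-7, 16)); // 6, the mask never yields a negative index
        System.out.println(-7 % 16);          // -7, a plain % would
    }
}
```

Keys 5 and 21 share a bucket here, which is exactly the case where steps 3 and 4 walk the list and rely on equals.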

Perl: how to find an exact word in all files under the current directory (one-level search only)

# Find every line containing the exact word "hello" in the files of the current directory
use warnings;
use strict;

my @files = <*.*>;

foreach my $file (@files) {
    if (-f $file) {
        open my $file_handle, '<', $file or die "Cannot open $file: $!";
        while (<$file_handle>) {
            # \b matches a word boundary, so "hello" matches but "helloworld" does not
            if (/\b(hello)\b/) {
                print "$file: $1\n";
            }
        }
        close $file_handle;
    }
}

Saturday, November 3, 2012

How to implement your own Map

This post shows how you can implement a simple Map with only put, get and size operations. The implementation does not handle thread safety; it is only here to illustrate what may be needed to implement a map-like data structure.

What you need -
  1. Eclipse Juno
  2. JDK 1.7
  3. JUnit 4.5
  4. hamcrest-all-1.1.jar

Test First -


package com.chatar.practice;

import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.*;

import com.chatar.pratice.MyMap;

public class MyMapTest {
   
    @Test(expected=NullPointerException.class)
    public void shouldThrowExceptionIfInsertingNullKey() {
        MyMap<String, String> myMap = new MyMap<String, String>();
        myMap.put(null, "some_value");
    }
   
    @Test(expected=NullPointerException.class)
    public void shouldThrowExceptionIfGetingValueForNullKey() {
        MyMap<String, String> myMap = new MyMap<String, String>();
        myMap.get(null);
    }
   
    @Test
    public void shouldAbleToPutValues() {
        MyMap<String, String> myMap = new MyMap<String, String>();
        myMap.put("key1", "value1");
        myMap.put("key2", "value2");       
        Assert.assertThat(myMap.size(), is(2));
    }
   
    @Test
    public void shouldReturnNullIfKeyNotFound() {
        MyMap<String, String> myMap = new MyMap<String, String>();
        Assert.assertThat(myMap.get("key1"), nullValue());
    }
   
    @Test
    public void shouldOverrideValueIfKeyIsUnique() {
        MyMap<String, String> myMap = new MyMap<String, String>();
        myMap.put("key1", "value1");
        Assert.assertThat(myMap.size(), is(1));
        Assert.assertThat(myMap.get("key1"), is("value1"));
        myMap.put("key1", "value2");       
        Assert.assertThat(myMap.size(), is(1));
        Assert.assertThat(myMap.get("key1"), is("value2"));
       
    }
   
    @Test
    public void shouldGetValueByPassingKey() {
        MyMap<String, String> myMap = new MyMap<String, String>();
        myMap.put("key1", "value1");
        myMap.put("key2", "value2");       
        Assert.assertThat(myMap.size(), is(2));
        Assert.assertThat(myMap.get("key1"), is("value1"));
        Assert.assertThat(myMap.get("key2"), is("value2"));
    }
}


And Implementation -



package com.chatar.pratice;

public class MyMap<K, V> {

    private final Entry<K, V>[] buckets;
    private int size = 0;

    @SuppressWarnings("unchecked")
    public MyMap() {
        // Generic arrays cannot be created directly, hence the unchecked cast
        buckets = (Entry<K, V>[]) new Entry[128];
    }

    public void put(K key, V value) {
        validate(key);
        int index = bucket(key);
        // Walk the chain; overwrite the value if the key is already present
        for (Entry<K, V> entry = buckets[index]; entry != null; entry = entry.next) {
            if (entry.key.equals(key)) {
                entry.value = value;
                return;
            }
        }
        // Key not found: link a new entry at the head of the chain
        Entry<K, V> added = new Entry<K, V>(key, value);
        added.next = buckets[index];
        buckets[index] = added;
        size++;
    }

    public V get(K key) {
        validate(key);
        Entry<K, V> entry = buckets[bucket(key)];
        while (entry != null && !key.equals(entry.key)) {
            entry = entry.next;
        }
        return entry != null ? entry.value : null;
    }

    public int size() {
        return size;
    }

    private void validate(K key) {
        if (key == null) {
            throw new NullPointerException("Key can't be null");
        }
    }

    private int bucket(K key) {
        // Mask the sign bit so a negative hashCode cannot give a negative index
        return (key.hashCode() & 0x7fffffff) % buckets.length;
    }

    static class Entry<K, V> {
        K key;
        V value;
        Entry<K, V> next;

        Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }
}