Google

Aug 29, 2012

Article: Scenarios and solutions for better concurrency and thread safety part-3 Semaphores and mutexes

Scenario 1: A web service receives http requests for data, places the request into an internal queue, and a separate worker thread pulls the work item from the queue and performs the work. For example, a trading system creates a buy order and places it in a queue, and a separate consumer thread picks up the order and sends it to the stock exchange. A typical producer and consumer scenario.

Scenario 2: Semaphore can be used to create worker threads even when the number of worker threads to be created is not known upfront. This is because a semaphore can be created with 0 permits, and wait until a number of releases have been made. For example, if you want to recursively traverse through nested folders and spawn a number of worker threads to move the html files found to a destination folder and increment the semaphore permits with a release() method call. A separate thread will wait with the acquire(numberofpermits) to acquire all the released permits before start archiving (i.e. zipping up) those files.

Scenario 3: Reference counting where a shared resource is incremented or decremented. The increment/decrement/test operations must be thread safe. For example, a counter that keeps track of the nummber of active logged in users by incrementing the count when users log in and decrementing the count when the users log out.



Solution: In general, a sem-a-phore means sending messages by holding the arms or two flags or poles in certain positions according to an alphabetic code. In programming, especially in Unix systems, semaphores are a technique for coordinating or synchronizing activities in which multiple processes compete for the same operating system resources. Semaphores are commonly use for two purposes: to share a common memory space and to share access to files. In Java, a semaphore is used for coordinating multiple threads.

Q. What is the difference between a mutex and a semaphore?
A. 

Mutex: is a single key to a toilet. One person can have the key and occupy the toilet  at the time. When finished, the person gives (or releases) the key to the next person in the queue. In Java, every object has a mutex and only a single thread can get hold of a mutex.

Semaphore: Is a number of free identical toilet keys. For example, having 3 toilets with identical locks and keys. The semaphore count is set to 3 at beginning and then the count is decremented as people are acquiring the key to the toilets. If all toilets are full, ie. there are no free keys left, the semaphore count is 0. Now, when one person leaves the toilet, semaphore is increased to 1 (one free key), and given to the next person in the queue.

Here is an example of producer and consumer using a mutex.

public class ProducerConsumer {

 private int count;  //shared resource, i.e. accessed by multiple threads

 public synchronized void consume() {
  while (count == 0) {
   try {
    wait();    //wait till notified
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }

  //gets here if count is > 0
  count--;     //consume the count by decrementing it by 1
  System.out.println("Consumed:" + count);
  notify();  //notify waiting threads to resume

 }

 public synchronized void produce() {
  while(count > 0) {
   try {
    wait();    //wait till notified
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
  
  //gets here if count == 0;
  count++;
  System.out.println("Produced: " + count);
  notify();   //notify waiting threads to resume
 }

 public static void main(String[] args)  {
        //inner classes can only access final variables
  final ProducerConsumer pc = new ProducerConsumer();

  //anonymous inner class for a new producer worker thread 
  Runnable producer = new Runnable() {
   @Override
   public void run() {
    for (int i = 0; i < 3; i++) {
     pc.produce(); 
    }
   }
  };

  
  //anonymous inner class for a new consumer worker thread
  Runnable consumer = new Runnable() {
   @Override
   public void run() {
    for (int i = 0; i < 3; i++) {
      pc.consume();
    }
   }
  };

  Thread producerThread = new Thread(producer); //creates a new worker thread from the main thread
  Thread consumerThread = new Thread(consumer); //creates a new worker thread from the main thread

  producerThread.start();  // executes the run() method in producer
  consumerThread.start();  // executes the run() method in consumer

 }
}


Now, let us look at the similar example with a Semaphore in Java 5. You can create a Semaphore with relevant number of permits, and the permits can be acquired or released.

import java.util.concurrent.Semaphore;

public class ProducerConsumer2 {

 private int count;
 
 /**
  * call to release will increment the permit(s) and 
  * call to acquire will decrement the permit(s)
  */
 static Semaphore semCon = new Semaphore(0);
 static Semaphore semProd = new Semaphore(1); //start with 1 permit

 public  void consume() {
  try {
   //acquire a permit from this semaphore before continuing
   semCon.acquire();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }

  count--;
  System.out.println("Consumed:" + count);
  //releases a permit, returning it to the semaphore.
  semProd.release(); 

 }

 public void produce() {
  try {
   //acquire a permit from this semaphore before continuing
   semProd.acquire(); 
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  count++;
  System.out.println("Produced: " + count);
  //releases a permit, returning it to the semaphore.
  semCon.release();   
 }

 public static void main(String[] args)  {

  final ProducerConsumer2 pc = new ProducerConsumer2();

  //anonymous inner class for a new producer worker thread 
  Runnable producer = new Runnable() {
   @Override
   public void run() {
    for (int i = 0; i < 3; i++) {
     pc.produce(); 
    }
   }
  };

  //anonymous inner class for a new consumer worker thread 
  Runnable consumer = new Runnable() {
   @Override
   public void run() {
    for (int i = 0; i < 3; i++) {
      pc.consume();
    }
   }
  };

  Thread producerThread = new Thread(producer); //creates a new worker thread from the main thread
  Thread consumerThread = new Thread(consumer); //creates a new worker thread from the main thread

  producerThread.start(); // executes the run() method in producer
  consumerThread.start(); // executes the run() method in consumer

 }
}


The output for both the above code:

Produced: 1
Consumed:0
Produced: 1
Consumed:0
Produced: 1
Consumed:0

Labels:

0 Comments:

Post a Comment

Subscribe to Post Comments [Atom]

<< Home