Soluzione con i semafori

Siccome monitor e semafori sono equivalenti, il problema del produttore-consumatore può essere risolto anche con i semafori. In particolare, ne servono tre:

I tre semafori vengono creati all’interno della classe ProdCons (quella contenente il main), che, per semplicità, li memorizza in degli attributi statici pubblici (cioè, di fatto, in delle variabili globali):

import java.util.concurrent.Semaphore;

public class ProdCons {
	public static final Semaphore mutex = new Semaphore(1);
	public static final Semaphore full = new Semaphore(0);
	public static final Semaphore empty = new Semaphore(1);

	public static void main(String[] args) {
		CellaCondivisa cella = new CellaCondivisa();
		new Produttore(cella).start();
		new Consumatore(cella).start();
	}
}

I valori di inizializzazione dei semafori sono importanti:

In questa versione della soluzione, la cella condivisa si occupa solo della mutua esclusione, che realizza usando il semaforo mutex:

public class CellaCondivisa {
	private static final int BUFFER_SIZE = 1;
	private int numItems = 0;
	private int value;

	public int getItem() throws InterruptedException {
		ProdCons.mutex.acquire();
		int tmp = value; //
		numItems--; // Sezione critica
		System.out.println(tmp + " read"); //
		ProdCons.mutex.release();
		return tmp;
	}

	public void setItem(int v) throws InterruptedException {
		ProdCons.mutex.acquire();
		value = v; //
		System.out.println(v + " written"); // Sezione critica
		numItems++; //
		ProdCons.mutex.release();
	}

	public int getCurrentSize() {
		return numItems;
	}
}

Invece, il blocco in caso di buffer pieno/vuoto è gestito direttamente nel codice dei due tipi di thread:

public class Produttore extends Thread {
	private CellaCondivisa cella;

	public Produttore(CellaCondivisa cella) {
		this.cella = cella;
	}

	public void run() {
		int i = 0;
		while (true) {
			try {
				// Blocca se il buffer è pieno.
				ProdCons.empty.acquire();
				cella.setItem(i);
			} catch (InterruptedException e) { break; }
			// Adesso il buffer contiene almeno un elemento:
			// sblocca un eventuale consumatore in attesa.
			ProdCons.full.release();
			i++;
		}
	}
}

public class Consumatore extends Thread {
	private CellaCondivisa cella;

	public Consumatore(CellaCondivisa cella) {
		this.cella = cella;
	}

	public void run() {
		while (true) {
			int v;
			try {
				// Blocca se il buffer non contiene almeno un elemento.
				ProdCons.full.acquire();
				v = cella.getItem();
			} catch(InterruptedException e) { break; }
			// Adesso il buffer è sicuramente non pieno:
			// sblocca un eventuale produttore in attesa.
			ProdCons.empty.release();
		}
	}
}

Osservazione: mentre la cella condivisa fa sempre acquire e release sullo stesso semaforo, mutex, qui le chiamate acquire e release in uno stesso metodo sono fatte su semafori diversi.

Realizzazione di una coda

Solitamente, il buffer usato dai produttori e consumatori ha dimensioni maggiori di uno, e gli elementi vengono consumati nello stesso ordine in cui sono prodotti. Per memorizzare gli elementi, serve allora una coda (cioè un buffer FIFO, First In First Out).

Il codice seguente usa un array per realizzare una coda “artigianale”, che è thread-safe (cioè non soggetta a race condition), ma non bloccante: se un thread prova fare un inserimento quando la coda è piena, o a estrarre un valore quando la coda è vuota, esso non rimane in attesa di poter effettivamente compiere tale operazione; invece, viene visualizzato un messaggio d’errore, e il programma termina. Spetterà quindi al codice che vi accede avitare inserimenti quando la coda è piena ed estrazioni quando essa è vuota.

public class CellaCondivisaCoda {
	private final int BUFFER_SIZE;
	private int numItems = 0;
	private int[] values;
	private int first = 0; // Indice dell'elemento meno recente
	private int last = 0; // Indice dopo l'elemento più recente
	// first == last corrisponde a una coda vuota

	public CellaCondivisaCoda(int bufferSize) {
		BUFFER_SIZE = bufferSize;
		values = new int[BUFFER_SIZE];
	}
	public int getItem() throws InterruptedException {
		ProdCons.mutex.acquire();
		if (numItems == 0) {
			System.err.println("Lettura di buffer vuoto!");
			System.exit(1);
		}
		numItems--;
		int tmp = values[first];
		first = (first + 1) % BUFFER_SIZE;
		System.out.println(tmp + " read");
		ProdCons.mutex.release();
		return tmp;
	}

	public void setItem(int v) throws InterruptedException {
		ProdCons.mutex.acquire();
		if (numItems == BUFFER_SIZE) {
			System.err.println("Scrittura di buffer pieno!");
			System.exit(1);
		}
		values[last] = v;
		last = (last + 1) % BUFFER_SIZE;
		numItems++;
		System.out.println(v + " written");
		ProdCons.mutex.release();
	}

	public int getCurrentSize() {
		return numItems;
	}
}