Thursday, March 27, 2008

Callable Future - Some code! It's what I do for a living.

package mytest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/**
 * Small demonstration of {@link Callable} / {@link Future}.
 *
 * <p>Each task prints a counter 100 times, sleeping a random interval
 * (0-1000 ms) between prints so that concurrently running tasks drift out of
 * sequence, and finally returns its own name. {@link #main(String[])} submits
 * five such tasks to a thread pool and reports each one as soon as it
 * finishes, in completion order rather than submission order.
 */
public class CallableTest implements Callable<String> {
    /** Name used when the caller supplies {@code null}. */
    private static final String DEFAULT_NAME = "Unknown";
    /** Number of lines each task prints before it completes. */
    private static final int ITERATIONS = 100;
    /** Pause between two polling passes over the pending futures. */
    private static final long POLL_INTERVAL_MILLIS = 100;

    private final String myName;
    private final Random rand = new Random();

    /**
     * Creates a task with the given name.
     *
     * @param name name reported in the output and returned by {@link #call()};
     *             {@code null} falls back to {@code "Unknown"}
     */
    public CallableTest(String name) {
        myName = (name != null) ? name : DEFAULT_NAME;
    }

    /**
     * Prints the counter lines, sleeping a random amount after each one.
     *
     * @return the name of this task
     * @throws InterruptedException if the executing thread is interrupted
     *         (for example by {@code Future.cancel(true)}) while sleeping
     * @throws Exception declared so the optional failure demo below can be
     *         re-enabled without changing the signature
     */
    @Override
    public String call() throws Exception {
        for (int i = 0; i < ITERATIONS; i++) {
            System.out.println("CallableTest - Name [" + myName + "] Count [" + i + "]");
            // Let's snooze the thread for a bit so the tasks get out of sequence.
            int value = rand.nextInt(101);
            /* Throw the "Britney Spears" exception when the value is 42
               (the answer to all questions).
            if (value == 42) {
                String oops = "Ooops I did it again!";
                System.out.println(oops);
                throw new Exception(oops);
            }
            */
            Thread.sleep(value * 10);
        }
        return myName;
    }

    /**
     * Returns the name this task was created with.
     *
     * @return the task name, never {@code null}
     */
    public String getName() {
        return myName;
    }

    /**
     * Submits five tasks and reports each one as it completes.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        int total = 5;
        ExecutorService executor = new ScheduledThreadPoolExecutor(total);
        try {
            List<Future<String>> futures = new ArrayList<Future<String>>(total);
            for (int i = 0; i < total; i++) {
                futures.add(executor.submit(new CallableTest("Test [" + i + "]")));
            }
            /*
            // Bah, we really didn't want to execute these tasks. So let's see
            // what happens below.
            try {
                for (int i = 0; i < total; i++) {
                    boolean flag = futures.get(i).cancel(true);
                    if (flag) {
                        System.out.println("Cancelled");
                    }
                }
            } catch (Exception e) {
                System.out.println("Exception caught [" + e + "]");
            }
            */
            /*
            // Even though we have multiple threads, this logic lines the
            // results back up in the order of submission.
            try {
                for (int i = 0; i < total; i++) {
                    String name = futures.get(i).get();
                    System.out.println("Thread Complete [" + name + "]");
                }
            } catch (ExecutionException e) {
                System.out.println("ExecutionException caught [" + e + "]");
            } catch (CancellationException e) {
                System.out.println("CancellationException caught [" + e + "]");
            } catch (Exception e) {
                System.out.println("Exception caught [" + e + "]");
            }
            */
            // Let's try to find the first completed task instead.
            // The bucket holds every future that has not been reported yet.
            // As a task completes we take it out of the bucket; when the
            // bucket is empty everything is done.
            List<Future<String>> bucketOhThreads = new ArrayList<Future<String>>(futures);
            while (!bucketOhThreads.isEmpty()) {
                reportFinished(bucketOhThreads);
                // This is where other work should happen. We really shouldn't
                // have this thread waiting around for the other threads, but
                // we don't want to spin in a tight loop either, so sleep a bit.
                if (!bucketOhThreads.isEmpty()) {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the
            // interruption, and stop the workers instead of letting them run on.
            Thread.currentThread().interrupt();
            System.out.println("Exception caught [" + e + "]");
            executor.shutdownNow();
        } finally {
            // The pool's worker threads are not daemon threads; without a
            // shutdown the JVM would never exit after main returns.
            executor.shutdown();
        }
    }

    /**
     * Makes one pass over the pending futures, printing and removing every
     * one that is finished (completed normally, failed, or cancelled).
     *
     * Removal goes through the iterator so that no element is skipped when
     * its predecessor is removed.
     */
    private static void reportFinished(List<Future<String>> pending) throws InterruptedException {
        for (Iterator<Future<String>> it = pending.iterator(); it.hasNext();) {
            Future<String> future = it.next();
            // isDone() is also true for cancelled futures, so this single
            // check filters out everything that is still running.
            if (!future.isDone()) {
                continue;
            }
            it.remove();
            int remaining = pending.size();
            if (future.isCancelled()) {
                System.out.println("Thread Cancelled - ThreadCount[" + remaining + "]");
                continue;
            }
            try {
                String name = future.get();
                System.out.println("Thread Complete - Name[" + name + "] ThreadCount[" + remaining + "]");
            } catch (ExecutionException e) {
                System.out.println("ExecutionException caught - [" + e + "] ThreadCount[" + remaining + "]");
            } catch (CancellationException e) {
                System.out.println("CancellationException caught - [" + e + "]. We are checking the isCancelled property. We should NEVER receive this exception here.");
            }
        }
    }
}

/*
* ITEMS Not really messed with:
* - ScheduledThreadPoolExecutor
* - Future.get(timeout)
* - I want to know how to get the name (or any other data, for that matter)
* from a task while it is still running. Right now we have to wait for
* the task to complete or to throw an exception. (lol, I guess that's
* enough info.)
*/