Thursday, September 25, 2008

The Y Combinator in Scala

A while ago I ran across an implementation of the Y combinator in Java. My first thought was, "Woah, this is cool!", followed immediately by "Woah, this is hideous!"

Below is my port of that code to Scala. Hopefully it's just as cool, and just a little less hideous.

case class B[F, T](c: B[F, T] => (F => T)) extends (B[F, T] => (F => T)) {
  def apply(b: B[F, T]) = c(b)
}

def Y[F, T] = (f: (F => T) => F => T) =>
  B[F, T](x => f(x(x)(_)))(B(x => f(x(x)(_))))

val factorial = Y[Int, Int](f => i => if (i <= 0) 1 else f(i - 1) * i)
factorial(6) == 720

Note that it's a straight port of the Java code, so it doesn't make any use at all of Scala's more powerful type system. (It does, however, make liberal use of type inference and syntactic sugar, especially for Function types.) A question for the type experts: can Scala do better with more advanced types?
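
To make the mechanics a little more visible, here is a minimal sketch of the same combinator with explicit result types, applied to a second function. It assumes a current Scala (2.13 or 3); the YDemo wrapper and the fib example are names introduced only for this illustration. The detail worth noticing is the trailing (_): it turns x(x) into the function v => x(x)(v), so the self-application is evaluated only when the result is called. Without it, x(x) would be evaluated eagerly and recurse forever.

```scala
object YDemo {
  // Wrapper type that lets a function accept itself as an argument;
  // a bare self-application x(x) would otherwise need an infinite type.
  case class B[F, T](c: B[F, T] => (F => T)) extends (B[F, T] => (F => T)) {
    def apply(b: B[F, T]): F => T = c(b)
  }

  // `x(x)(_)` is shorthand for `v => x(x)(v)`, which delays the
  // self-application until the resulting function is actually called.
  def Y[F, T]: ((F => T) => F => T) => (F => T) = f =>
    B[F, T](x => f(x(x)(_)))(B[F, T](x => f(x(x)(_))))

  // The factorial from the post, unchanged apart from the type annotation.
  val factorial: Int => Int =
    Y[Int, Int](f => i => if (i <= 0) 1 else f(i - 1) * i)

  // A second recursive function built with the same Y (illustrative only).
  val fib: Int => Int =
    Y[Int, Int](f => n => if (n < 2) n else f(n - 1) + f(n - 2))
}
```

Neither factorial nor fib refers to itself by name; the recursion comes entirely from the f that Y passes in.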

Wednesday, September 3, 2008

A Scalable Language, and a Scalable Framework

I'm a pretty big fan of the MapReduce framework. With two fairly simple classes, a Map and a Reduce (influenced by, but not the same as, the functional programming constructs of the same name), you can easily write programs that operate on terabytes of data. Apache's Hadoop is a popular open source implementation of MapReduce, and it's used by Yahoo, Amazon, and Facebook, to name a few.

I'm also a pretty big fan of Scala, and luckily Hadoop is written in Java, so you can use Hadoop from Scala pretty easily. Unfortunately, like many Java libraries, Hadoop requires a lot of boilerplate, and of course it isn't well integrated with some of the higher-order paradigms that Scala supports so well. But maybe we can fix that.

Since my group is just now moving to both Scala (slowly) and Hadoop (somewhat more quickly), I thought it would be good to help combine the two. The result is SMR, which provides a wrapper around Hadoop to make everything much more Scala-like.

WordCount: Java

A pretty basic introduction to MapReduce is the word count example. The task is, given a collection of files, to split them into words and return a list of each word and the number of times it appears. Here's the vanilla Java Hadoop example, taken from Hadoop's tutorial, leaving out imports:

public class WordCount {
  public static class Map extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  public static class Reduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}

The basic idea is pretty simple. For the map: take each line in each file, tokenize it, and for each word emit a pair of that word and 1. The reduce receives a word and an iterator of counts and then sums them together, and emits the word with the sum.
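
Stripped of the framework, the two steps just described can be sketched with ordinary in-memory Scala collections. This is only an illustration of the data flow, not Hadoop or SMR code: LocalWordCount, mapLine, reduceCounts and wordCount are made-up names, and the groupBy call stands in for the grouping that the framework does between the map and the reduce.

```scala
object LocalWordCount {
  // Map step: tokenize one line and emit a (word, 1) pair per token.
  def mapLine(line: String): Seq[(String, Int)] =
    line.split("\\s+").toSeq.filter(_.nonEmpty).map(word => (word, 1))

  // Reduce step: sum all the counts collected for a single word.
  def reduceCounts(word: String, counts: Iterator[Int]): (String, Int) =
    (word, counts.sum)

  // Stand-in for the framework: map every line, group the emitted pairs
  // by word, then reduce each group to a single (word, total) pair.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(mapLine)
      .groupBy(_._1)
      .map { case (word, pairs) => reduceCounts(word, pairs.iterator.map(_._2)) }
}
```

For example, LocalWordCount.wordCount(Seq("this is a boy", "this is a girl")) counts "this", "is" and "a" twice each and "boy" and "girl" once each.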

WordCount: Scala

The problem is that this remarkably simple program gets lost in all that boilerplate. Let's take a look at how SMR tackles the problem:

object WordCount {
  def main(args : Array[String]) {
    val (h,remainingArgs) = Hadoop.fromArgs(args,new Path("output"));
    // split takes a regex, so the delimiters go in a character class
    val words = for( (offset,line) <- h.loadLines(remainingArgs);
                     word <- line.split("[ \n\r\t\f]+"))
                yield(word,1);

    val sums = words.hreduce{ (word,iter) =>
      (word,iter.reduceLeft(_+_));
    }
    sums.elements foreach println
  }
}

Both programs accomplish the same task, but the Scala version takes about a quarter as many lines. There are several reasons for the disparity, but a few key features of Scala account for most of it:
  1. Syntactic Support for Map: By treating for loops as syntactic sugar for map and its cousins, Scala allows you to create new classes that interact with the primitive looping construct in intuitive ways.
  2. Strong Type System: Scala's type system not only takes a lot of the boilerplate away from the code, but it also makes library writers' lives easier by making much richer type information available at compile time. In SMR, the type system is used to automatically select the correct "input format type" and figure out what the types of the Maps and the Reduces are.
  3. Higher-Order and Anonymous Functions: The combination of functions that take other functions and the ability to create new functions on the fly, easily and succinctly, obviates a lot of the boilerplate associated with defining a new "Mapper" and "Reducer" for every task. Instead, the compiler automatically creates the functions that perform the task for you.
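
The first point can be illustrated with a small sketch. Dataset below is a hypothetical stand-in, not SMR's actual class: any type that defines map, flatMap and withFilter can be used in a for comprehension, because the compiler rewrites the comprehension into calls to those methods. The example assumes a current Scala (2.13 or 3).

```scala
object ForSugarDemo {
  // Hypothetical stand-in for a distributed collection. Defining these
  // three methods is all it takes to work inside a for comprehension.
  final class Dataset[A](val items: List[A]) {
    def map[B](f: A => B): Dataset[B] = new Dataset(items.map(f))
    def flatMap[B](f: A => Dataset[B]): Dataset[B] =
      new Dataset(items.flatMap(a => f(a).items))
    def withFilter(p: A => Boolean): Dataset[A] = new Dataset(items.filter(p))
  }

  def tokens(line: String): Dataset[String] =
    new Dataset(line.split(" ").toList)

  val lines = new Dataset(List("a b", "b  c"))

  // The comprehension as you would write it...
  val pairs: Dataset[(String, Int)] =
    for {
      line <- lines
      word <- tokens(line) if word.nonEmpty
    } yield (word, 1)

  // ...and the method calls the compiler turns it into.
  val desugared: Dataset[(String, Int)] =
    lines.flatMap(line =>
      tokens(line).withFilter(word => word.nonEmpty).map(word => (word, 1)))
}
```

Both values hold the same pairs; a library is free to make map and flatMap record a job description instead of running immediately, and the for syntax stays the same.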
SMR is still very much in development, as my group is still transitioning to Hadoop, but I hope that people find the code useful in scaling up their problems to deal with terabytes and terabytes of data.

Update:

I was criticized for unfairly comparing Scala to Java, and told that I should compare Scala/SMR to a less uninspired language like Erlang. So I found this in this blog post:



test_map_reduce() ->
    M_func = fun(Line) ->
        lists:map(
            fun(Word) ->
                {Word, 1}
            end, Line)
    end,

    R_func = fun(V1, Acc) ->
        Acc + V1
    end,

    map_reduce(3, 5, M_func, R_func, 0,
        [[this, is, a, boy],
         [this, is, a, girl],
         [this, is, lovely, boy]]).



I don't know Erlang, but this looks pretty reasonable to me (though it doesn't have to deal with string tokenization), and it is competitive in legibility with SMR (and, in some ways, I'll readily admit, better). But as someone stuck in JVM-land, I'm still pretty satisfied with the Scala wrapper, especially since it buys compatibility with all the goodies available in Hadoop.
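
For a like-for-like comparison, here is a rough sketch of the same shape in plain Scala: a map function, a reduce function that folds into an accumulator, and an initial value. It runs locally on in-memory collections and is not SMR or Hadoop code. The mapReduce helper is hypothetical, and the two leading numeric arguments of the Erlang call are left out because the snippet doesn't say what they mean.

```scala
object ErlangStyleWordCount {
  // Hypothetical local stand-in for the Erlang map_reduce call: map every
  // input, group the emitted pairs by key, then fold each group of values
  // into an accumulator that starts at `init`.
  def mapReduce[A, K, V, R](inputs: Seq[A])(mapFn: A => Seq[(K, V)])
                           (reduceFn: (V, R) => R, init: R): Map[K, R] =
    inputs.flatMap(mapFn)
      .groupBy(_._1)
      .map { case (key, pairs) =>
        key -> pairs.map(_._2).foldLeft(init)((acc, v) => reduceFn(v, acc))
      }

  // Same input as the Erlang test, already split into words.
  val counts: Map[String, Int] =
    mapReduce(Seq(
      Seq("this", "is", "a", "boy"),
      Seq("this", "is", "a", "girl"),
      Seq("this", "is", "lovely", "boy")
    ))(line => line.map(word => (word, 1)))((v1: Int, acc: Int) => acc + v1, 0)
}
```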