The last time I had occasion to work my way through SICP I found myself reading the first edition… because that’s all there was. So when some of my fellow developers at my day job started up a technical book club and led off with the second edition of SICP I was quite intrigued. This post reflects the first of (hopefully) several observations on re-reading a classic.
At a recent meetup for this book club one of the exercises in the book gathered a fair share of attention. That exercise, 1.11, can be found in the HTML version of the second edition and reads as follows:
Exercise 1.11. A function f is defined by the rule that f(n) = n if n < 3 and
f(n) = f(n - 1) + 2f(n - 2) + 3f(n - 3) if n ≥ 3. Write a procedure that
computes f by means of a recursive process. Write a procedure that computes
f by means of an iterative process.
We consider this problem from the perspective of Haskell, Scheme and Clojure. The sample code below was known to run correctly on GHC 6.12.1, Chicken Scheme 4.5.0 and Clojure 1.5.1 via Leiningen [1].
A Haskell version of the recursive process is almost a literal transcription of the problem statement:
The problem is that we've also got a straightforward instance of tree recursion here. To highlight this fact, uncomment the printf statement in the above code and compute the value of (recur-fn 4).
A moment of reflection should make it clear that this is exactly the behaviour you’d expect to see from tree recursion.
So what might an iterative solution look like? The base case of our recursive definition covers f(0), f(1) and f(2), and larger values of f(x) build on smaller ones, so it seems fairly obvious that we want an ascending iteration starting from the values that make up our base case. At each iteration we might add the newly-computed value to a list of observed values, returning the last value from this list once we've iterated up to our input value [2]. Another moment of reflection allows us to see that what we're describing is a fold which produces a list as its value. Fold (sometimes called reduce or, in really odd languages, inject… for no obvious reason [3]) is a higher-order function, a topic which will be covered in the next chapter.
One might object that a list of values, one for each index in our iteration, sounds a lot more like a map operation than a fold/reduce. And you’d be correct… so long as you don’t consider the inputs. A map operation is allowed a single input, in this case the index of an item in the list, and that input isn’t sufficient to compute values of f(x) for x >= 3; the history we need to consider is simply lost. But a fold/reduce operation introduces the notion of an accumulator (of some arbitrary type) which is also made available to our computation as a parameter. So long as that accumulator has access to the history of our computation our function has all the inputs it needs. And if our accumulator is nothing more than a repository of previously-computed values (in the form of, say, a list) we have exactly that history.
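To make the idea concrete, here's a rough sketch of that accumulator-based fold, written in Python purely for illustration (the Scheme version we actually build follows below; the function and variable names here are mine, not part of the exercise):

from functools import reduce

def f_iter(n):
    if n < 3:
        return n
    def step(prevs, _idx):
        # prevs is the accumulator: the list of every f value computed so far
        a, b, c = prevs[-1], prevs[-2], prevs[-3]
        return prevs + [a + 2 * b + 3 * c]
    # Fold over the indices 3..n, growing the list of observed values,
    # then return the last (largest-index) value.
    return reduce(step, range(3, n + 1), [0, 1, 2])[-1]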
The key here is the realization that fold/reduce is isomorphic to a recursive procedure that generates an iterative process:
So we crank out an implementation based on fold, using a few of the methods from SRFI-1 to simplify things just a little bit:
(use srfi-1) ; Use some of the list manipulation methods from SRFI-1 to make our lives easier

(define iter-fn
  (lambda (x)
    (define compute-value
      (lambda (prevs)
        (letrec ((prevs-last (take-right prevs 3))
                 (c (car prevs-last))
                 (b (cadr prevs-last))
                 (a (caddr prevs-last)))
          (+ a (* 2 b) (* 3 c)))))
    (if (< x 3)
        x
        (letrec ((range (iota (- x 2) 3))
                 (fold-fn (lambda (x prevs) (append prevs (list (compute-value prevs)))))
                 (vals (fold fold-fn '(0 1 2) range)))
          (last vals)))))
For kicks we do a quick translation of the iterative process into Clojure, and don’t tell the Scheme people but there’s something to this “syntactic sugar” stuff. The use of destructuring here makes the function much more concise:
Unfortunately the same can’t be said of the recursive process. Clojure can’t do much to save us here. Be forewarned; the code that follows is intended for mature audiences only. The multiple instances of recursion outside of the loop/recur forms is very likely to induce eye bleeding in any seasoned Clojure programmer:
[1] I’m finally starting to come around when it comes to Leiningen. Language-specific builders still irritate me, but at least this one seems sensible.
[2] We really only need to preserve three values in our list at any given time in order to maintain enough state to define the computation at every point. We could move to something like this in the future if necessary, but doing it now doesn’t do anything to enhance the logic of our solution.
[3] Ruby gets a pass here (but just barely) since it offers both inject and reduce. That said, I expect better from you, Ruby!
In an attempt to make my Ruby code a bit more idiomatic I've been spending a bit of time recently with Russ Olsen's excellent Eloquent Ruby. There are many reasons to love writing Ruby code, not least of which is that Ruby offers the same terse but expressive power as Perl while employing better overall principles of programming. The effect isn't universal; on occasion my problems with Ruby look quite a bit like my problems with Perl. In those cases it seems likely that, given the overall elegance of the language, there's a "better" (or at least more idiomatic) way to accomplish my goal. And so I turn to Eloquent Ruby.
As an example of this tension, consider the following.
Perl has a well-deserved reputation for efficiently processing text files with regular expressions. We’ll consider an example from another text I’ve been spending a bit of time with: Hofstadter’s seminal Godel, Escher, Bach. A simple implementation of the productions of the MIU system in Perl [1] might look like the following:
# Simple implementation of the productions for the MIU system
open INFILE, "<", $ARGV[0];
while (<INFILE>) {
  chomp;
  next if $_ =~ /#.*/;
  print "$1IU\n" if $_ =~ /^(.+?)I$/;
  print "M$1$1\n" if $_ =~ /^M(.+)$/;
  print "$1U$2\n" if $_ =~ /^(.*)III(.*)$/;
  print "$1$2\n" if $_ =~ /^(.*)UU(.*)$/;
}
Reasonable enough, but there's a lot of magic going on here. We're relying on the "magic" variable $_ to access the current line, and to make things worse we have to obtain those lines using the INFILE identifier, which only has meaning due to a side effect of the open() call [2]. There are also the "magic" $1 and $2 variables for accessing capture groups in a regex.
The Ruby version is both slightly shorter and a bit cleaner:
# Simple implementation of the productions for the MIU system
File.new(ARGV[0]).readlines.each do |line|
  next if line =~ /#.*/
  puts "#{$1}IU\n" if line =~ /^(.+)I$/
  puts "M#{$1}#{$1}\n" if line =~ /^M(.+)$/
  puts "#{$1}U#{$2}\n" if line =~ /^(.*)III(.*)$/
  puts "#{$1}#{$2}\n" if line =~ /^(.*)UU(.*)$/
end
We've made some nice strides here. The use of File.new() allows us to avoid open() and its side effects. The use of a code block allows us to remove the global $_ in favor of a scoped variable line.
But we’re still stuck with $1 and $2 for those capture groups.
One can imagine an elegant object-oriented solution based on match objects. Any such implementation would have to accomplish three things:
The match object will be used as the condition of an if/unless expression so nil should be returned if there’s no match
The match object should be bound to a variable name in scope
References to capture groups in the if-clause should use the scoped variable rather than the $1,$2, etc.
But remember, this exercise is only useful if we don’t have to compromise on elegance. If all we’re after is an explicit object-oriented solution we could go with the Python version:
# Simple implementation of the productions for the MIU system
from sys import argv
import re

re_c = re.compile("#.*")
re1 = re.compile("^(.+?)I$")
re2 = re.compile("^M(.+)$")
re3 = re.compile("^(.*)III(.*)$")
re4 = re.compile("^(.*)UU(.*)$")

for line in [line.strip() for line in open(argv[1])]:
    if re_c.match(line):
        continue
    m1 = re1.match(line)
    if m1:
        print m1.group(1) + "IU"
    m2 = re2.match(line)
    if m2:
        print "M" + m2.group(1) + m2.group(1)
    m3 = re3.match(line)
    if m3:
        print m3.group(1) + "U" + m3.group(2)
    m4 = re4.match(line)
    if m4:
        print m4.group(1) + m4.group(2)
That’s probably not what we want. [3]
After pondering this question for a bit we realize we may not be in such a bad spot. /regex/.match(str) already returns nil if there is no match so our first requirement is satisfied. Assignment is just another expression, so our match object (or nil) will be returned to the if-expression test, helping us with our second goal. And match objects provide access to capture groups using []. So long as the assigned variable is in scope we should have everything we need. A bit of scuffling [4] brings us to the following:
# Implementation of the productions for the MIU system with match objects
File.new(ARGV[0]).readlines.each do |line|
  next if line =~ /#.*/
  foo = nil
  puts "#{foo[1]}IU\n" if foo = /^(.+)I$/.match(line)
  puts "M#{foo[1]}#{foo[1]}\n" if foo = /^M(.+)$/.match(line)
  puts "#{foo[1]}U#{foo[2]}\n" if foo = /^(.*)III(.*)$/.match(line)
  puts "#{foo[1]}#{foo[2]}\n" if foo = /^(.*)UU(.*)$/.match(line)
end
This example is free of any “magic” variables, although we have sacrificed a bit on the clarity front. It’s also worth noting that we could have accomplished something very similar in Perl:
# Implementation of the productions for the MIU system with match objects
open INFILE, "<", $ARGV[0];
while (<INFILE>) {
  chomp;
  next if $_ =~ /#.*/;
  print "$foo[0]IU\n" if (@foo = ($_ =~ /^(.+?)I$/));
  print "M$foo[0]$foo[0]\n" if (@foo = ($_ =~ /^M(.+)$/));
  print "$foo[0]U$foo[1]\n" if (@foo = ($_ =~ /^(.*)III(.*)$/));
  print "$foo[0]$foo[1]\n" if (@foo = ($_ =~ /^(.*)UU(.*)$/));
}
This implementation is hardly idiomatic. It’s also quite a bit less clear than our earlier efforts in the language.
Where does this leave us? Do we keep in touch with our Perl roots and live with $1 in order to keep things terse and expressive? Do we sacrifice a bit of clarity and go with an object-oriented approach? Or do we do something else entirely?
Answers to this question (and others like it) are what I’m hoping to get out of Eloquent Ruby.
[1] We’re ignoring nasty things like error handling and complex edge cases in order to keep the conversation focused.
[2] We could use lexical file handles here but that doesn’t really change the underlying point. Even in that case we still have to call open() in order for $fh to be useful.
[3] Python does a lot of things very, very well, but this solution to this problem seems unnecessarily verbose.
[4] The requirement to declare foo in advance with this conditional modifier form was a bit surprising. Shifting to a more conventional if-test-end form removed this requirement, although this isn’t too surprising since you have an explicit assignment in the if test. The Perl version also didn’t require this advance declaration when using the conditional modifier form. An MRI quirk, perhaps?
In a semi-recent post we looked at how we might define actors in JRuby using a combination of Akka and code blocks. That post was very focused on the process of creating actors; we didn’t necessarily consider how to build systems with actors and/or whether Ruby might be helpful in doing so. There are plenty of issues to ponder here, but before we dig in let’s take a step back and talk briefly about some of the characteristics of actors.
Actors implement a shared-nothing model for concurrency. Mutable system state should be contained by one or more actors in the system. This state is not exposed to external entities directly; it can only be accessed or modified via messages sent to the actor. Since the actor is the only player in this drama that can access the state it contains there is no need to lock or synchronize state access. It should be fairly clear that an actor consists of some amount of mutable system state and some logic for handling incoming messages… and not much more.
Okay, sounds great… but how does it work in practice? We'll consider this question by way of an algorithm that by now should be pretty familiar: prime number generation using the Sieve of Eratosthenes. We considered this chestnut (or perhaps you prefer "war horse" at this point) in a previous post discussing an implementation of this algorithm in the Go language. Let's review that implementation and see what else we can do with it.
The support of lightweight goroutines in Go encouraged a pipeline implementation with one goroutine per discovered prime. We can think of the "state" of this system as the total set of discovered primes. Each goroutine knows only about the prime it contains and the channel where candidates should be sent if they "pass" (i.e. do not divide evenly into its prime number). Once the goroutine is created its state doesn't change. New state is added by creating a new goroutine for a newly-discovered prime. State is never deleted; once a prime is discovered, removing it from consideration is nonsensical. As a consequence state is completely distributed; no entity in the system knows about all discovered primes.
We also note that the algorithm described here isn’t very friendly to parallel decomposition [1]. Candidate values are compared to previously discovered primes one at a time: if the candidate passes the test it’s allowed to move on, otherwise no further evaluation occurs. This technique is referred to as “fail-fast” behaviour; if a value fails a test it doesn’t lead to any wasteful extra work. The concurrent approach is quite different: compare the candidate to all discovered primes at once and return success only if all tests pass. Comparisons are done independently, so even if the “first” [2] comparison fails all other comparisons still execute. We lose our fail-fast behaviour but gain the ability to decompose the entire process into smaller jobs that can execute in parallel. A trade-off in engineering… surprising, I know.
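The difference is easy to see in miniature (plain Python used only for illustration; the names and the hard-coded primes list are mine):

primes = [2, 3, 5, 7]

def is_prime_fail_fast(candidate):
    # The generator expression short-circuits: the first failed test stops
    # all further comparisons, just like the pipeline of goroutines.
    return all(candidate % p != 0 for p in primes)

def is_prime_evaluate_all(candidate):
    # Every comparison is materialized before being combined, so each test
    # could run independently (and in parallel) even if an early one fails.
    results = [candidate % p != 0 for p in primes]
    return all(results)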
We’ll set out to implement the Sieve of Eratosthenes in Ruby using JRuby and Akka. Our new implementation will have more of a bias towards parallelism; this time we’re okay with throwing away some work. Clearly we’ll need an actor to compare a candidate to one or more discovered primes; that is, after all, why we’re here. We can think of these actors as maintaining the state of our system (just like the goroutines in the Go implementation) so we’ll borrow a term from MVC and call these “model” actors. Concurrently evaluating candidates against these models implies an organizing actor that is aware of all models; it seems natural to call this the “controller” actor. To keep things simple we don’t want to support an unlimited number of models so we create a fixed-size “pool” and distribute system state (i.e. the set of discovered primes) between these models. When a new prime is discovered the controller will be responsible for adding that prime to an existing model.
require 'java'

require 'lib/akka-actor-1.2.jar'
require 'lib/scala-library.jar'

java_import 'akka.actor.Actors'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'

module Sieve

  # Basic Enumerable wrapper for a Controller actor... just a convenience thing really
  class Primes
    include Enumerable

    def initialize(controller)
      @controller = controller
    end

    def each
      loop do
        yield @controller.sendRequestReply(:next)
      end
    end
  end

  # Enumerable implementation representing the set of prime number candidates > 10. Use of an
  # enumerable here allows us to isolate the state associated with candidate selection to this
  # class, freeing up the model and controller actors to focus on other parts of the computation
  class Candidates
    include Enumerable

    def initialize
      # Note that this initial value is never actually returned; we're only setting the stage for the
      # first increment to generate the first candidate
      @next = 9
    end

    # Primes must be a number ending in 1, 3, 7 or 9... a bit of reflection will make it clear why
    def each
      loop do
        @next += (@next % 10 == 3) ? 4 : 2
        yield @next
      end
    end
  end

  class Controller < UntypedActor

    def initialize
      @models = 0.upto(3).map do |idx|
        model = Actors.actorOf { Sieve::Model.new }
        model.start
        model
      end

      # Seed models with a few initial values... just to get things going
      @seeds = [2, 3, 5, 7]
      0.upto(3).each { |idx| @models[idx].tell [:add, @seeds[idx]] }

      @candidates = Candidates.new
    end

    # Part of the lifecycle for an Akka actor. When this actor is shut down
    # we'll want to shut down all the models we're aware of as well
    def postStop
      @models.each { |m| m.stop }
    end

    def onReceive(msg)
      case msg
      when :next
        # If we still have seeds to return do so up front
        seed = @seeds.shift
        if seed
          self.getContext.replySafe(seed)
          return
        end

        # If we're still here then we need to evaluate candidates against our models. Each
        # candidate value is fed into the models in parallel. The first value that all models
        # agree is prime is returned as the value
        val = @candidates.find do |candidate|
          @models.map { |m| m.sendRequestReplyFuture [:isprime, candidate] }.all? do |f|
            f.await
            return false if not f.result.isDefined
            f.result.get
          end
        end

        # Now that we have a prime value we need to update the state of one of our models to
        # include this new value. For now we just choose a model at random
        @models[(rand @models.size)].tell [:add, val]

        # Finally, send a response back to the caller
        self.getContext.replySafe val
      end
    end
  end

  # A model represents a fragment of the state of our sieve, specifically some subset
  # of the primes discovered so far.
  class Model < UntypedActor

    def initialize
      @primes = []
    end

    def onReceive(msg)
      # It's times like this that one really does miss Scala's pattern matching
      # but case fills in nicely enough
      (type, data) = msg
      case type
      when :add
        @primes << data
      when :isprime
        # If we haven't been fed any primes yet we can't say much...
        if @primes.empty?
          self.getContext.replySafe nil
          return
        end

        # This model only considers a value prime if it doesn't divide evenly into any
        # prime it already knows about. Of course we have to make an exception if we're
        # testing one of the primes we already know about
        resp = @primes.none? do |prime|
          data != prime and data % prime == 0
        end
        self.getContext.replySafe resp
      else
        puts "Unknown type #{type}"
      end
    end
  end
end
Ruby offers a number of features which help us out here. As shown in this implementation messages can be as simple as a list with a leading symbol (used to indicate the message “type”) and a payload contained in the remainder of the list. This follows a similar convention found in Erlang, although that language uses tuples rather than lists. Support for destructuring/multiple assignment makes working with these messages quite simple.
Our previous work building actors from code blocks doesn’t apply here due to an implementation detail so we define both the controller and model actors as distinct classes. As it turns out this change isn’t much of a problem; we’re able to implement both classes, a helper Enumerable and a second Enumerable wrapping the controller in less than 140 lines with comments. Full code (including tests) can be found on github.
Spend a little time with JRuby and Akka and it becomes clear that they work well together and form a good basis for building concurrent applications in Ruby.
[1] This is a bit of a loaded statement; you will get some parallelism as multiple candidates move through the “stages” (i.e. goroutines) of the pipeline. That said, this notion of parallelism applies to the system as a whole. The process of determining whether a single candidate is or is not a prime number is still very sequential.
[2] “First” here means nothing more than “whichever test happens to complete and return a value first”
For simplicity and elegance in your programming constructs the list comprehension is hard to beat. [1] A list comprehension can filter an input list, transform it or do both, all in a single expression and with very readable syntax. At its best a list comprehension is "beautiful code" distilled: very clear and expressive with no unnecessary noise. You can, of course, make list comprehensions "ugly" but at least you have to try a bit to do so.
List comprehensions have their roots in functional programming and several modern functional languages include support for them. We’ll see comprehensions in Haskell, Clojure and Scala. [2] Python also includes support for comprehensions; in fact the basis for Guido’s argument to remove the map and filter functions from py3k was that expressions using these functions could be easily re-written as comprehensions. We also consider comprehensions in Python, including differences between the Python implementation and those in other languages and what effect those differences may have.
Let's consider a fairly straightforward problem: given a list of (x,y) coordinates in some two-dimensional space provide a list of the coordinates that are within some fixed distance of the origin of (0,0). Our application also requires that results be displayed in polar coordinates, so for extra bonus points we should return our results in that notation. We can solve the problem quite easily in Haskell and Clojure using list comprehensions:
Note that in each of these solutions we’re doing a bit more work than we need to. Our implementation computes the distance from the origin twice, once when filtering values and again when generating the final output of the transformation process. This seems unnecessary; a better option would be to somehow define this value as intermediate state. This state could then be available to both filter and transform expressions. Haskell and Clojure support the introduction of intermediate bindings in their comprehension syntax:
What about Python? It turns out we cannot solve this problem in Python using a single comprehension; the syntax doesn’t allow for the introduction of intermediate state which can be used in either the predicate or transform expression. On the face of it this seems a bit odd; the language encourages the use of comprehensions for filtering and/or transformation while providing a less robust version of that very construct. To some degree this discrepancy reflects differing language goals. Guido’s post on the history of list comprehensions seems to indicate that the motivation for adding these features was pragmatic; the syntax is an elegant way to express most filter and transform operations. Functional languages use list comprehensions as “syntactic sugar” for monadic effects [3] that don’t really have an equivalent in standard Python usage. The syntax may look the same, but if you’re coming from a functional perspective they can feel just a bit off. The same is true for a few other common functional idioms:
Lazy evaluation - List comprehensions in Python are not lazily evaluated. Generator expressions, which look very similar to list comprehensions, are lazily evaluated; see the short example after this list.
Higher-order functions - Anonymous functions are supported in Python but these functions are famously limited to a single expression. Functions can return functions but for non-trivial functions a named function must be declared and returned.
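As a quick illustration of the first point (the values here are made up for the example):

coords = [(1, 1), (3, 4), (10, 10)]

# The list comprehension computes every element immediately...
eager = [x * x + y * y for (x, y) in coords]

# ...while the otherwise identical generator expression computes nothing
# until a value is actually requested.
lazy = (x * x + y * y for (x, y) in coords)
print next(lazy)   # only now is the first value computed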
A couple things should be noted here. First, let us clearly state that Python is not and does not claim to be a functional programming language. While absolutely true, this fact doesn’t change the underlying point. Moving from functional concepts back into Python can be a bit jarring; some things look similar but don’t behave quite like you’d expect.
It's also worth noting that the inability to solve this problem with a single list comprehension in Python doesn't mean that this problem cannot be solved in idiomatic Python. We wish to return our intermediate state as well as filter results based on its value; this dual use allows us to solve the problem with nested comprehensions. The inner comprehension will generate the final representation (including the intermediate state) and the outer comprehension will filter results based on that representation. In Python this looks something like:
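A sketch of that approach, assuming a list of (x, y) tuples named points and a cutoff named max_dist (both names are placeholders of mine):

from math import atan2, sqrt

points = [(1, 1), (3, 4), (10, 10)]
max_dist = 5.0

# The inner comprehension builds the polar form (which carries the distance r);
# the outer comprehension filters on that already-computed value.
near_origin = [(r, theta)
               for (r, theta) in [(sqrt(x * x + y * y), atan2(y, x)) for (x, y) in points]
               if r <= max_dist]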
This works only because the intermediate state is also returned in the final result. If that state were not explicitly returned (i.e. if its value were used as input to a conditional expression which returned, say, a string value describing the distance) this solution would not apply.
We can also solve this problem using generators. Using the state maintained by the generator we can iterate through the list, compute the intermediate state and yield a value only when we’ve satisfied our predicate. A generator-based solution would look something like:
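A sketch of the generator approach under the same assumptions:

from math import atan2, sqrt

def near_origin(points, max_dist):
    for (x, y) in points:
        r = sqrt(x * x + y * y)     # intermediate state lives in the generator's frame
        if r <= max_dist:           # ...and is available to the predicate...
            yield (r, atan2(y, x))  # ...as well as to the final representation

print list(near_origin([(1, 1), (3, 4), (10, 10)], 5.0))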
Finally, none of these comments should be construed as a criticism of Python, the design choices that went into the language or the inclusion of list comprehensions generally. The pragmatic case for inclusion of this feature seems very strong. This post is interested only in the interplay between these features and similar features in other languages.
[1] Some languages (perhaps most notably Scala) use the term “for comprehension” or “for expression”, and in some of these languages (Scala again) these constructs are more flexible than a list comprehension. That said, it’s fairly straightforward to make Scala’s for expressions behave like conventional list comprehensions.
[2] A purist might object that Scala is designed to mix features of object-oriented and functional languages, but the bias in favor of functional constructs justifies Scala’s inclusion here.
[3] As an example, note that in Haskell list comprehensions can be replaced with do-notation. See the Haskell wiki for details.
I’m interested in concurrency. I’m also interested in Ruby. There doesn’t seem to be much reason to keep these two guys apart anymore.
This post marks the beginning of an occasional series on the topic of using Ruby to write concurrent code. Ruby doesn’t yet have a big reputation in the world of concurrent and/or parallel applications, but there is some interesting work being done in this space. And since problems in concurrency are notoriously difficult to reason about we could probably do a lot worse than attempt to address those problems in a language designed to make life easier for the developer.
We begin with Akka, an excellent library designed to bring actors, actor coordination and STM to Scala and, to a slightly lesser degree, the JVM generally. Our initial task is simple: we wish to be able to define an Akka actor by providing a message handler as a code block. We’ll start by attempting to implement the actor as a standalone class and then abstract that solution into code which requires the input block and nothing else.
A simple initial implementation looks something like this:
require 'java'

require 'akka-actor-1.2-RC6.jar'
require 'scala-library.jar'

java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.Actors'

# Start with something simple. Implement the actor as a distinct
# class and start it up within the Akka runtime.

# Look, I've implemented the actor API!
class SomeActor < UntypedActor
  def onReceive msg
    puts "Received message: #{msg.to_s}"
  end
end

# So we should be fine then, right?
ref = Actors.actorOf SomeActor.new
ref.start
ref.tell "foo"
[@varese ruby]$ ruby --version
jruby 1.6.2 (ruby-1.8.7-p330) (2011-05-23 e2ea975) (OpenJDK Client VM 1.6.0_22) [linux-i386-java]
[@varese ruby]$ ruby akka_sample1.rb
ArgumentError: Constructor invocation failed: ActorRef for instance of actor [org.jruby.proxy.akka.actor.UntypedActor$Proxy0] is not in scope.
You can not create an instance of an actor explicitly using 'new MyActor'.
You have to use one of the factory methods in the 'Actor' object to create a new actor.
Either use:
'val actor = Actor.actorOf[MyActor]', or
'val actor = Actor.actorOf(new MyActor(..))'
(root) at akka_sample1.rb:20
Well, that didn’t work very well. Apparently the class we defined in JRuby is being exposed to the Java lib as a proxy object and that proxy’s class is unknown to the Akka runtime. No problem; Akka supports a factory model for actor creation, and by using that approach the underlying class of our actor should become a non-issue. With a few simple changes we’re ready to try again:
require 'java'

require 'akka-actor-1.2-RC6.jar'
require 'scala-library.jar'

java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.Actors'

# A second attempt. Define our actor in a standalone class again
# but this time use an ActorFactory (via closure coercion) to
# interact with Akka.

# Define our actor in a class again...
class SomeActor < UntypedActor
  def onReceive msg
    puts "Received message: #{msg.to_s}"
  end
end

# ... and then provide an UntypedActorFactory to instantiate
# the actor. The factory interface qualifies as a SAM so
# we only need to provide a closure to generate the actor and
# let JRuby's closure coercion handle the rest.
ref = Actors.actorOf do
  SomeActor.new
end
ref.start
ref.tell "foo"
[@varese ruby]$ ruby akka_sample2.rb
Received message: foo
We now have a working actor, but we still have some work to do; remember, we want to be able to define arbitrary actors by supplying just a code block. We need a few additional pieces to make this work:
A generic actor implementation whose state includes a block or Proc instance. The onReceive method of this actor could then simply call the contained block/Proc, passing the input message as an arg.
An ActorFactory implementation which takes a code block as an arg, stores it in internal state and then uses that block to build an instance of the generic actor described above on demand.
A first cut at this concept might look something like this:
require 'java'

require 'akka-actor-1.2-RC6.jar'
require 'scala-library.jar'

java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.actor.Actors'

# Shift to working with code blocks via a generic actor and a simple
# factory for creating them.
class SomeActor < UntypedActor
  # Look, I've got a constructor... but it'll get ignored!
  def initialize(proc)
    @proc = proc
  end

  def onReceive(msg)
    @proc.call msg
  end
end

class SomeActorFactory
  include UntypedActorFactory

  def initialize(&b)
    @proc = b
  end

  def create
    SomeActor.new @proc
  end
end

# Create a factory for an actor that uses the input block to pass incoming messages.
factory = SomeActorFactory.new do |msg|
  puts "Message recieved: #{msg.to_s}"
end

ref = Actors.actorOf factory
ref.start
ref.tell "foo"
[@varese ruby]$ ruby akka_sample3.rb
ArgumentError: wrong number of arguments for constructor
create at akka_sample3.rb:29
(root) at akka_sample3.rb:37
What went wrong here? UntypedActor is a concrete class with a defined no-arg constructor. That constructor is being called instead of the one we've provided, and as a consequence our block never gets into the mix. There's almost certainly a cleaner way to solve this using JRuby, but for the moment we can get around the problem (in an admittedly ugly way) by providing a setter on our generic actor class:
require 'java'

require 'akka-actor-1.2-RC6.jar'
require 'scala-library.jar'

java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.actor.Actors'

# Continue with the generic actor implementation, but shift to using a setter
# for passing in the Proc to which we'll delegate message handling.
class SomeActor < UntypedActor
  def proc=(b)
    @proc = b
  end

  def onReceive(msg)
    @proc.call msg
  end
end

class SomeActorFactory
  include UntypedActorFactory

  def initialize(&b)
    @proc = b
  end

  def create
    rv = SomeActor.new
    rv.proc = @proc
    rv
  end
end

# Create a factory for an actor that uses the input block to pass incoming messages.
factory = SomeActorFactory.new do |msg|
  puts "Message recieved: #{msg.to_s}"
end

ref = Actors.actorOf factory
ref.start
ref.tell "foo"
If you’re interested in this topic note that Nick Sieger has covered similar ground (including the interaction between JRuby and Akka) here. Nick’s article draws on some very good work done by Daniel Ribeiro late last year. The code referenced in Daniel’s article is available on Github. I didn’t come across Daniel’s post until my own was nearly done but there is quite a bit of overlap between his code and mine. That said, I recommend taking a look at both articles, if for no other reason than the fact that both authors are much better at writing Ruby code than I am.
In a previous post we briefly discussed the idea of using idiomatic Clojure to access data contained in a Cassandra instance, including transparent conversion to and from Clojure data. We’ll explore an implementation of this idea, although we won’t address the question of laziness, in part because there are sizable trade-offs to consider. For example, any solution that provides lazy evaluation must do so while also attempting to minimize the number of trips to the underlying data store. This question may be picked up in yet more future work, but for now we’ll continue on. We’ve upgraded to Cassandra 0.8.2 and Clojure 1.2, and we’re using a new data model (see below), but for the most part we’ll try to pick up where we left off.
At its core the Cassandra data model is centered on columns. Each of these columns contains both a name and a value, both of which are represented as binary data. This model is very simple, and while it may appear limiting it is in reality quite flexible. The lack of any pre-defined data types avoids the "impedance mismatch" that results from structured data being shoehorned into data types that don't really fit. We're free to represent column values in any way we see fit; if we can convert it into bytes it's fair game. Our problem thus devolves into a question of serialization, and suddenly there are many suitors vying for our attention. Among the set of well-known serialization packages we find Google's Protocol Buffers, Thrift and Avro. And since we're working in a language with good Java interop we always have Java's built-in serialization (or something similar like Kryo) available. Finally we're always free to roll our own.
Let’s begin by ruling out that last idea. There are already well-tested third-party serialization libraries so unless we believe that all of them suffer from some fatal error it’s difficult to justify the risk and expense of creating something new. We’d also like our approach to have some level of cross-platform support so native Java serialization is excluded (along with Kryo). We also need to be able to encode and decode arbitrary data without defining message types or their payload(s) in advance, a limitation that rules out Protocol Buffers and Thrift. The last man standing is Avro, and fortunately for us he’s a good candidate. Avro is schema-based but the library includes facilities for generating schemas on the fly by inspecting objects via Java reflection. Also included is a notion of storing schemas with data, allowing the original object to be easily reconstructed as needed. The Avro data model includes a rich set of basic types as well as support for “complex” types such as arrays and maps.
We’ll need to implement a Clojure interface into the Avro functionality; this can be as simple as methods to encode and decode data and schemas. At least some implementations of Avro data files use a pre-defined “meta-schema” (consisting of the schema for the embedded data and that data itself) for storing both items. Consumers of these files first decode the meta-schema then use the discovered schema to decode the underlying data. We’ll follow a similar path for our library. We’ll also extend our Cassandra support a bit in order to support the insertion of new columns for a given key and column family.
We wind up with the following for our Avro library:
(ns fencepost.avro)

; Dependencies
; avro 1.5.2
; paranamer 2.0 (required by Avro)
; jackson 1.8.5 (required by Avro)
(import '(org.apache.avro Schema)
        '(org.apache.avro.generic GenericDatumReader)
        '(org.apache.avro.io BufferedBinaryEncoder EncoderFactory DecoderFactory)
        '(org.apache.avro.reflect ReflectData ReflectDatumWriter)
        '(java.io ByteArrayOutputStream ByteArrayInputStream))

(defn get_meta_schema []
  ; Ideally we'd use a record for this sort of thing but doing so would require setX()
  ; accessors for all fields. Use of a map here is less precise but quite a bit easier
  ; to code.
  "{\"type\": \"map\", \"values\": \"bytes\"}")

(defn encode_with_schema [target]
  "Use Java reflection to generate a schema for the input object and return that schema along with encoded data"
  (let [targetschema (.getSchema (ReflectData/get) (.getClass target))
        targetwriter (ReflectDatumWriter. targetschema)
        buffer (ByteArrayOutputStream.)
        encoder (.binaryEncoder (EncoderFactory.) buffer nil)]
    ; Populate the buffer with Avro data for the target. Note that we can't bundle the
    ; whole thing up into a single let expression because of our reliance on side effects.
    (.write targetwriter target encoder)
    (.flush encoder)
    (let [targetdata (.toByteArray buffer)
          metawriter (ReflectDatumWriter. (Schema/parse (get_meta_schema)))]
      (.reset buffer)
      (.write metawriter {"schema" (.getBytes (.toString targetschema)) "data" targetdata} encoder)
      (.flush encoder)
      (.toByteArray buffer))))

(defn decode_from_schema [indata]
  "Use the meta_schema to extract data and schema and then extract raw data"
  (let [metareader (GenericDatumReader. (Schema/parse (get_meta_schema)))
        buffer (ByteArrayInputStream. indata)
        decoder (.binaryDecoder (DecoderFactory.) buffer nil)
        middata (.read metareader nil decoder)
        ; The data type returned from the underlying Avro Java code needs a bit
        ; of massaging before we can move forward. Avro decoders return strings
        ; as instances of Utf8 objects so we have to apply "str" to them directly
        ; in order to get back something we can work with.
        {schema "schema" data "data"} (zipmap (map str (keys middata)) (vals middata))
        schemabytes (byte-array (.capacity schema))
        databytes (byte-array (.capacity data))]
    ; Ah, more side effects. The Java Avro implementation makes heavy use of
    ; NIO ByteBuffers, so we're forced to convert them into byte arrays before
    ; continuing.
    (.get schema schemabytes)
    (.get data databytes)
    (let [targetreader (GenericDatumReader. (Schema/parse (String. schemabytes)))
          targetdecoder (.binaryDecoder (DecoderFactory.) databytes nil)]
      (.read targetreader nil targetdecoder))))
After the improvements described above our Cassandra library now looks like:
(ns fencepost.cassandra)

; Dependencies
; apache-cassandra-thrift-0.8.2.jar
; libthrift-0.6.jar
; slf4j-api-1.6.1.jar
; slf4j-log4j12-1.6.1.jar
; log4j-1.2.16.jar
(import '(org.apache.cassandra.thrift Cassandra$Client SliceRange SlicePredicate ColumnParent KeyRange ConsistencyLevel ColumnPath Column)
        '(org.apache.thrift.transport TFramedTransport TSocket)
        '(org.apache.thrift.protocol TBinaryProtocol)
        '(java.nio ByteBuffer))

; A quick note on usage. The Cassandra Java API returns "this" for many of the setX() methods on the setters of core objects
; in the Thrift API. Our usage here consistently employs the doto syntax rather than relying on these return values. This is
; largely a matter of convention, but in this case it appears to make the code a bit clearer to read. Your mileage may vary.

(defn connect [host port keyspace]
  "Connect to a Cassandra instance on the specified host and port. Set things up to use the specified key space."
  (let [transport (TFramedTransport. (TSocket. host port))
        protocol (TBinaryProtocol. transport)
        client (Cassandra$Client. protocol)]
    (.open transport)
    (.set_keyspace client keyspace)
    client))

(defn get_range_slices [client cf start end]
  "Get a list of KeySlices for every key in the range beginning with start and ending with end"
  (let [slice_range (doto (SliceRange.)
                      (.setStart (byte-array 0))
                      (.setFinish (byte-array 0))
                      (.setReversed false)
                      (.setCount 100))
        slice_predicate (doto (SlicePredicate.)
                          (.setSlice_range slice_range))
        column_parent (ColumnParent. cf)
        key_range (doto (KeyRange.)
                    (.setStart_key (.getBytes start))
                    (.setEnd_key (.getBytes end)))]
    (.get_range_slices client column_parent slice_predicate key_range ConsistencyLevel/ONE)))

(defn range_slices_keys [slices]
  "Retrieve the set of keys in a list of KeySlices"
  (map #(String. (.getKey %)) slices))

(defn range_slices_columns [slices key]
  "Given a list of KeySlices retrieve a map of the columns associated with the specified key"
  (let [match (first (filter #(= key (String. (.getKey %))) slices))]
    (cond (nil? match) nil
          (true? true) (let [urcols (.getColumns match)
                             cols (map #(.getColumn %) urcols)]
                         (zipmap (map #(keyword (String. (.getName %))) cols)
                                 (map #(.getValue %) cols))))))

(defn insert [client key cf colname colval_bytes]
  "Insert the specified column into the specified column family. At present we don't support super columns"
  (let [key_bytes (.getBytes key)
        key_buffer (ByteBuffer/allocate (alength key_bytes))
        column_parent (doto (ColumnParent.) (.setColumn_family cf))
        colname_bytes (.getBytes colname)
        column (doto (Column.)
                 (.setName colname_bytes)
                 (.setValue colval_bytes)
                 (.setTimestamp (System/currentTimeMillis)))]
    ; Populate the built ByteBuffer with the contents of the input key
    (.put key_buffer key_bytes)
    (.flip key_buffer)
    (.insert client key_buffer column_parent column ConsistencyLevel/ONE)))
Our core example will be built around a collection of employee records. We want to create a report on these employees using attributes defined in various well-known columns. We’d like to access the values in these columns as simple data types (booleans, integers, perhaps even an array or a map) but we’d like to do so through a uniform interface. We don’t want to access certain columns in certain ways, as if we “knew” that a specific column contained data of a specific type. Any such approach is by definition brittle if the underlying data model should shift in flight (as data models are known to do).
After finishing up with our Clojure libraries we implement a simple app for populating our Cassandra instance with randomly-generated employee information:
; Populate data for a set of random users to a Cassandra instance.
;
; Users consist of the following set of data:
; - a username [String]
; - a user ID [integer]
; - a flag indicating whether the user is "active" [boolean]
; - a list of location IDs for each user [list of integer]
;
; User records are keyed by username rather than user IDs, mainly because at the moment
; we only support strings for key values. The Cassandra API exposes keys as byte arrays
; so we could extend our Cassandra support to include other datatypes.
(use '[fencepost.avro])
(use '[fencepost.cassandra])

(import '(org.apache.commons.lang3 RandomStringUtils)
        '(java.util Random))

; Utility function to combine our Avro lib with our Cassandra lib
(defn add_user [client username userid active locationids]
  (let [userid_data (encode_with_schema userid)
        active_data (encode_with_schema active)
        locationids_data (encode_with_schema locationids)]
    (insert client username "employee" "userid" userid_data)
    (insert client username "employee" "active" active_data)
    (insert client username "employee" "locationids" locationids_data)))

; Generate a list of random usernames
(let [client (connect "localhost" 9160 "employees")]
  (dotimes [n 10]
    (let [username (RandomStringUtils/randomAlphanumeric 16)
          random (Random.)
          userid (.nextInt random 1000)
          active (.nextBoolean random)
          locationids (into [] (repeatedly 10 #(.nextInt random 100)))]
      (add_user client username userid active locationids)
      (println (format "Added user %s: [%s %s %s]" username userid active locationids)))))
; Retrieve information from the Cassandra database about one of our employees
(use '[fencepost.avro])
(use '[fencepost.cassandra])

(defn evaluate_user [slices username]
  "Gather information for the specified user and display a minimal report about them"
  ; Note that the code below says nothing about types. We specify the column names we
  ; wish to access but whatever Cassandra + Avro supplies for the value of that column
  ; is what we get.
  (let [user_data (range_slices_columns slices username)
        userid (decode_from_schema (user_data :userid))
        active (decode_from_schema (user_data :active))
        locationids (decode_from_schema (user_data :locationids))]
    (println (format "Username: %s" username))
    (println (format "Userid: %s" userid))
    (println (if (> userid 0) "Userid is greater than zero" "Userid is not greater than zero"))
    (println (format "Active: %s" active))
    (println (if active "User is active" "User is not active"))
    ; Every user should have at least one location ID.
    ;
    ; Well, they would if we were able to successfully handle an Avro record.
    ;(assert (> (count locationids) 0))
    ))

(let [client (connect "localhost" 9160 "employees")
      key_slices (get_range_slices client "employee" "!" "~")
      keys (range_slices_keys key_slices)]
  (println (format "Found %d users" (count keys)))
  (dotimes [n (count keys)]
    (evaluate_user key_slices (nth keys n))))
[varese clojure]$ ~/local/bin/clojure get_employee_data.clj
Found 10 users
Username: 5gGh9anHwFINpr5t
Userid: 459
Userid is greater than zero
Active: true
User is active
...
And in order to verify our assumptions about simple cross-platform support we create a Ruby version of something very much like our reporting application:
require 'rubygems'
require 'avro'
require 'cassandra'

def evaluate_avro_data bytes
  # Define the meta-schema
  meta_schema = Avro::Schema.parse("{\"type\": \"map\", \"values\": \"bytes\"}")

  # Read the meta source and extract the contained data and schema
  meta_datum_reader = Avro::IO::DatumReader.new(meta_schema)
  meta_val = meta_datum_reader.read(Avro::IO::BinaryDecoder.new(StringIO.new(bytes)))

  # Build a new reader which can handle the indicated schema
  schema = Avro::Schema.parse(meta_val["schema"])
  datum_reader = Avro::IO::DatumReader.new(schema)
  val = datum_reader.read(Avro::IO::BinaryDecoder.new(StringIO.new(meta_val["data"])))
end

client = Cassandra.new('employees', '127.0.0.1:9160')
client.get_range(:employee, {:start_key => "!", :finish_key => "~"}).each do |k, v|
  userid = evaluate_avro_data v["userid"]
  active = evaluate_avro_data v["active"]
  locationids = evaluate_avro_data v["locationids"]

  puts "Username: #{k}, user ID: #{userid}, active: #{active}"
  puts "User ID #{(userid > 0) ? "is" : "is not"} greater than zero"
  puts "User #{active ? "is" : "is not"} active"

  # Ruby's much more flexible notion of truthiness makes the tests above somewhat less
  # compelling. For extra validation we add the following
  "Oops, it's not a number" unless userid.is_a? Fixnum
end
[varese 1.8]$ ruby get_employee_data.rb
Username: 5gGh9anHwFINpr5t, user ID: 459, active: true
User ID is greater than zero
User is active
Username: 76v8iEJcc79Huj9L, user ID: 469, active: false
User ID is greater than zero
User is not active
...
This code meets our basic requirement, but as always there were a few stumbling blocks along the way. Avro includes strings as a primitive type, but the Java API (which we leverage for our Clojure code) returns string instances as a Utf8 type. We can get a java.lang.String from these objects, but doing so requires an extra toString() call that (logically) should be completely unnecessary. We also don't fully support complex types. The Avro code maps the Clojure classes representing arrays and maps onto a "record" type that includes the various fields exposed via getters. Supporting these types requires the ability to reconstruct the underlying object based on these fields, and doing so reliably is beyond the scope of this work. Finally, we were forced to use Ruby 1.8.x for the Ruby examples since the Avro gem apparently doesn't yet support 1.9.
Soon after I began working with Cassandra it became clear to me that if you were in the market for a platform for creating applications that interact with this database you could do a lot worse than Clojure. The lack of a query language [1] suggests that filtering and slicing lists of keys and columns might be a fairly common activity for apps powered by Cassandra. And while many languages support the map/filter/reduce paradigm, Clojure's use of sequences throughout the core suggests a natural means to integrate this data into the rest of your application.
Cassandra itself provides an API that uses the Thrift protocol for manipulating data. We'll use this interface to implement a simple proof-of-concept application that might serve as a testbed for manipulating data managed by Cassandra in idiomatic Clojure. Note that the Clojure ecosystem already includes several open-source projects that connect Clojure to Cassandra: these include clj-cassandra and clj-hector, the latter leveraging the Hector Java client to do its work. In order to keep things simple we choose to avoid any of these third-party libraries; it's not as if the Thrift interface imposes a heavy burden on us. Let's see how far we can get with what's already in the packaging.
That sounds great… so what exactly are we trying to do? Beginning with the database generated during our previous work with Cassandra we should be able to access sets of keys within a keyspace and a set of columns for any specific key. These structures should be available for manipulation in idiomatic Clojure as sequences. Ideally these sequences would be at least somewhat lazy and transparently support multiple datatypes. [2]
Using the Thrift interface requires working with a fair number of Java objects representing return types and/or arguments to the various exposed functions. My Clojure isn’t yet solid enough to hash out Java interop code without flailing a bit so we kick things off with a Scala implementation. This approach allows us to simplify the interop problem without sacrificing the functional approach, all within a language that is by now fairly familiar.
The Scala code includes a fair number of intermediate objects but is otherwise fairly clean:
package org.fencepost.cassandra

import scala.collection.JavaConversions
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport._
import org.apache.cassandra.service._
import org.apache.cassandra.thrift._

object ThriftCassandraClient {

  def connect(host: String, port: Int, keyspace: String): Cassandra.Client = {
    val transport = new TFramedTransport(new TSocket(host, port))
    val protocol = new TBinaryProtocol(transport)
    val client = new Cassandra.Client(protocol)
    transport.open()
    client set_keyspace keyspace
    client
  }

  // Execute a range slice query against the specified Cassandra instance. Method returns
  // an object suitable for later interrogation by range_slices_keys() or range_slices_columns()
  def get_range_slices(client: Cassandra.Client, cf: String, start: String, end: String): Iterable[KeySlice] = {
    val sliceRange = new SliceRange()
    sliceRange setStart new Array[Byte](0)
    sliceRange setFinish new Array[Byte](0)
    sliceRange setReversed false
    sliceRange setCount 100
    val slicePredicate = new SlicePredicate()
    slicePredicate setSlice_range sliceRange
    val columnParent = new ColumnParent(cf)
    val keyRange = new KeyRange()
    keyRange setStart_key (start getBytes)
    keyRange setEnd_key (end getBytes)
    val javakeys = client.get_range_slices(columnParent, slicePredicate, keyRange, ConsistencyLevel.ONE)
    // Return from Thrift Java client is List<KeySlice> so we have to explicitly convert it here
    JavaConversions asScalaIterable javakeys
  }

  // Return an Iterable for all keys in an input query state object
  def range_slices_keys(slices: Iterable[KeySlice]) =
    slices map { c => new String(c.getKey) }

  // Return an Option containing column information for the specified key in the input query
  // state object. If the key isn't found None is returned, otherwise the Option contains a
  // map of column names to column values.
  def range_slices_columns(slices: Iterable[KeySlice], key: String): Option[Map[String, String]] = {
    slices find { c => new String(c.getKey()) == key } match {
      case None => None
      case Some(keyval) =>
        val urcols = JavaConversions asScalaIterable (keyval getColumns)
        val cols: Seq[Column] = (urcols map (_ getColumn)).toSeq
        Some(Map(cols map { c => (new String(c.getName())) -> (new String(c.getValue())) }: _*))
    }
  }

  def main(args: Array[String]): Unit = {
    val client = connect("localhost", 9160, "twitter")
    val slices = get_range_slices(client, "authors", "!", "~")
    val fivekeys = range_slices_keys(slices) take 5
    println("fivekeys: " + fivekeys)
    for (key <- fivekeys) {
      range_slices_columns(slices, key) match {
        case None =>
        case Some(cols) =>
          println("Key " + key + ": name => " + (cols getOrElse ("name", "")) + ", following => " + (cols getOrElse ("following", "")))
      }
    }
  }
}
Translating this code into Clojure is more straightforward than expected:
(import '(org.apache.thrift.transport TFramedTransport TSocket)
        '(org.apache.thrift.protocol TBinaryProtocol)
        '(org.apache.cassandra.thrift Cassandra$Client SliceRange SlicePredicate ColumnParent KeyRange ConsistencyLevel))

(defn connect [host port keyspace]
  "Connect to a Cassandra instance on the specified host and port. Set things up to use the specified key space."
  (let [transport (TFramedTransport. (TSocket. host port))
        protocol (TBinaryProtocol. transport)
        client (Cassandra$Client. protocol)]
    (.open transport)
    (.set_keyspace client keyspace)
    client))

(defn get_range_slices [client cf start end]
  "Simple front end into the get_range_slices function exposed via Thrift"
  (let [slice_range (doto (SliceRange.)
                      (.setStart (byte-array 0))
                      (.setFinish (byte-array 0))
                      (.setReversed false)
                      (.setCount 100))
        slice_predicate (doto (SlicePredicate.)
                          (.setSlice_range slice_range))
        column_parent (ColumnParent. cf)
        key_range (doto (KeyRange.)
                    (.setStart_key (.getBytes start))
                    (.setEnd_key (.getBytes end)))]
    (.get_range_slices client column_parent slice_predicate key_range ConsistencyLevel/ONE)))

(defn range_slices_keys [slices]
  "Retrieve the set of keys in a get_range_slices result"
  (map #(String. (.getKey %)) slices))

(defn range_slices_columns [slices key]
  "Retrieve a map of the columns associated with the specified key in a get_range_slices result"
  (let [match (first (filter #(= key (String. (.getKey %))) slices))]
    (cond (nil? match) nil
          (true? true) (let [urcols (.getColumns match)
                             cols (map #(.getColumn %) urcols)]
                         (zipmap (map #(keyword (String. (.getName %))) cols)
                                 (map #(String. (.getValue %)) cols))))))

(let [client (connect "localhost" 9160 "twitter")
      key_slices (get_range_slices client "authors" "!" "~")
      five_keys (take 5 (range_slices_keys key_slices))]
  (print five_keys)
  (let [formatfn (fn [key]
                   (let [cols (range_slices_columns key_slices key)]
                     (format "Key %s: name => %s, following => %s\n" key (cols :name) (cols :following))))]
    (print (reduce str (map formatfn five_keys)))))
These results seem fairly promising, although we’re nowhere near done. This code assumes that all column names and values are strings, a perfectly ridiculous assumption. We also don’t offer any support for nested data types, although in fairness this was a failing of our earlier work as well. Finally we haven’t built in much support for lazy evaluation; we lazily convert column names to Clojure keywords but that’s about it. But fear not, gentle reader; we’ll revisit some or all of these points in future work.
Cassandra implements a data model built on ColumnFamilies which differs from both SQL and document-oriented databases such as CouchDB. We want to spend a little time with this data model, get to know it a bit, so we setup a simple but quasi-realistic problem space: given a database of tweets, tweet authors and tweet followers attempt to answer a sequence of questions about that data. The questions should increase in complexity and each new question should explore a different facet of the data model.
The examples below are implemented in Python with a little help from a few excellent libraries:
Twitter tools when we have to interact with Twitter via their REST API
Pycassa when we need to talk to Cassandra from Python
We'll begin by looking at how we might query a Cassandra instance populated with data in order to find the answers to the questions in our problem space. We'll close by briefly discussing how to get the data into Cassandra.
We should be all set; bring on the questions!
FOR EACH AUTHOR: WHAT LANGUAGE DO THEY WRITE THEIR TWEETS IN AND HOW MANY FOLLOWERS DO THEY HAVE?
The organizing structure of the Cassandra data model is the column family defined within a keyspace. The keyspace is exactly what it sounds like: a collection of keys, each identifying a distinct entity in your data model. Each column family represents a logical grouping of data about these keys. This data is represented by one or more columns, which are really not much more than a tuple containing a name, value and timestamp. A column family can contain one or more columns for a key in the keyspace, and as it turns out you have a great deal of flexibility here; columns in a column family don’t have to be defined in advance and do not have to be the same for all keys.
We begin by imagining a column family named “authors” in a keyspace defined by the user’s Twitter handle or screen name. Assume that for each of these keys the “authors” column family contains a set of columns, one for each property returned by the “user/show” resource found in the Twitter REST API. Let’s further assume that included in this data are fields named “lang” and “followers_count” and that these fields correspond to exactly the data we’re looking for. We can satisfy our requirement by using a range query to identify all keys that fall within a specified range. In our case we want to include all alphanumeric screen names so we query across the range of printable ASCII characters. The Pycassa API makes this very easy [1]:
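Something along these lines should do it (a sketch mirroring the tweets query shown below; the lang and followers_count column names come straight from the user/show payload):

import pycassa

cassandra = pycassa.connect("twitter")
authors_cf = pycassa.ColumnFamily(cassandra, "authors")

# Range query across the printable-ASCII key range; each result is a
# (key, columns) pair where columns maps column names to values.
for (author, columns) in authors_cf.get_range('!', '~'):
    print "Author: %s, language: %s, followers: %s" % \
        (author, columns.get("lang"), columns.get("followers_count"))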
Okay, so we've got the idea of a column family; we can use them to define a set of information for keys in our keyspace. Clearly this is a useful organizing principle, but in some cases we need a hierarchy that goes a bit deeper. The set of tweets written by an author illustrates just such a case: tweets are written by a specific author, but each tweet has data of its own (the actual content of the tweet, a timestamp indicating when it was written, perhaps some geo-location info, etc.). How can we represent this additional level of hierarchy?
We could define a new keyspace consisting of some combination of the author’s screen name and the tweet ID but this doesn’t seem terribly efficient; identifying all tweets written by an author is now unnecessarily complicated. Fortunately Cassandra provides a super column family which meets our needs exactly. The value of each column in a super column family is itself a collection of regular columns.
Let’s apply this structure to the problem at hand. Assume that we also have a super column family named “tweets” within our keyspace. For each key we define one or more super columns, one for each tweet written by the author identified by the key. The value of any given super column is a collection of columns, one for each field contained in the results we get when we search for tweets using Twitter’s search API. Once again we utilize a range query to access the relevant keys:
import pycassa

cassandra = pycassa.connect("twitter")
tweets_cf = pycassa.ColumnFamily(cassandra, "tweets")

# Key in return value from "tweets" super column family is the
# tweet ID, value is a map of per-tweet data. We're only interested
# in the number of tweets so we only need to compute the size
# of the returned hash.
for (k, values) in tweets_cf.get_range('!', '~'):
    print "Authors: %s, tweets written: %d" % (k, len(values))
HOW MANY TWEET AUTHORS ARE ALSO FOLLOWERS OF @SPYCED?
We’re now attempting to replicate functionality that looks very much like a join in a relational database. Somehow we have to relate one type of resource (the set of followers for a Twitter user) to the set of authors defined in our “authors” column family. The Twitter REST API exposes the set of user IDs that follow a given user, so one approach to this problem might be to obtain these IDs and, for each of them, query the “authors” table to see if we have an author with a matching ID. As for the user to search for… well, Jonathan Ellis (@spyced) seemed like the obvious choice.
Newer versions of Cassandra provide support for indexed slices on a column family. The database maintains an index for a named column within a column family, enabling very efficient lookups for rows in which the column matches a simple query. Query terms can test for equality or whether a column value is greater than or less than an input value [2]. Multiple search clauses can be combined within a single query, but in our case we’re interested in strict equality only. Our solution to this problem looks something like the following:
from twitter.api import Twitter
import pycassa
from pycassa.index import *

cassandra = pycassa.connect("twitter")
authors_cf = pycassa.ColumnFamily(cassandra, "authors")
tweets_cf = pycassa.ColumnFamily(cassandra, "tweets")
twitter = Twitter()

# Iterate through the set of IDs returned by the Twitter API and execute
# an index search against each ID. The Pycassa API will return a generator
# for each query so we make use of the for expression to determine when
# we should increment the total count.
count = 0
for authorid in twitter.followers.ids(id="spyced"):
    author_expr = create_index_expression('id_str', str(authorid))
    author_clause = create_index_clause([author_expr])
    for (authorkey, authorprops) in authors_cf.get_indexed_slices(author_clause):
        print authorkey
        count += 1
print count
Some spot checking against the Twitter Web interface seems to confirm these results.
POPULATING THE DATA
So far we’ve talked about accessing data from a Cassandra instance that has already been populated with the data we need. But how do we get it in there in the first place? The answer to this question is a two-step process; first we create the relevant structures within Cassandra and then we use our Python tools to gather and add the actual data.
My preferred tool for managing my Cassandra instance is the pycassaShell that comes with Pycassa. This tool makes it easy to create the column families and indexes we’ve been working with:
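A sketch of those commands, assuming a Pycassa version whose pycassaShell exposes a SYSTEM_MANAGER handle (exact signatures vary a bit between releases):

# Run from within pycassaShell. The "twitter" keyspace holds a standard
# "authors" column family, a super column family for "tweets" and a
# secondary index on the "id_str" column used by get_indexed_slices() above.
SYSTEM_MANAGER.create_column_family('twitter', 'authors')
SYSTEM_MANAGER.create_column_family('twitter', 'tweets', super=True)
SYSTEM_MANAGER.create_index('twitter', 'authors', 'id_str', UTF8_TYPE)

With those structures in place, the script below uses the Twitter search and REST APIs to gather the actual data and insert it: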
from twitter.api import Twitter
import pycassa
from itertools import ifilterfalse

# Query to use when finding tweets.
searchquery = "#cassandra"

# Borrowed from the itertools docs
def unique_everseen(iterable, key=None):
    "List unique elements, preserving order. Remember all elements ever seen."
    seen = set()
    seen_add = seen.add
    if key is None:
        for element in ifilterfalse(seen.__contains__, iterable):
            seen_add(element)
            yield element
    else:
        for element in iterable:
            k = key(element)
            if k not in seen:
                seen_add(k)
                yield element

def populate():
    cassandra = pycassa.connect("twitter")
    authors_cf = pycassa.ColumnFamily(cassandra, "authors")
    tweets_cf = pycassa.ColumnFamily(cassandra, "tweets")

    # The Twitter API returns Unicode vals for all string results. In addition
    # Pycassa appears to complain when we give it a string encoded in something
    # other than UTF-8 or a non-string value. To get around this we perform
    # an intelligent string conversion; if we get a Unicode type return the
    # UTF-8 encoding of that string, otherwise return the standard string
    # representation.
    def smart_str(val):
        if isinstance(val, unicode):
            return val.encode('utf-8')
        else:
            return str(val)

    search = Twitter(domain="search.twitter.com")
    twitter = Twitter()

    search_results = search.search(q=searchquery, rpp=100)
    tweets = search_results["results"]
    print tweets[0]
    for tweet in tweets:
        tweet_str = dict([(k, smart_str(v)) for (k, v) in tweet.iteritems()])
        tweets_cf.insert(tweet["from_user"], {tweet["id_str"]: tweet_str})
    print "Found %d tweets" % len(tweets)

    author_names = list(unique_everseen([t["from_user"] for t in tweets]))
    print "Found %d distinct authors" % len(author_names)

    # Convert everything into strings; in Cassandra name and values of a column
    # are apparently normally converted into strings
    for author_info in (twitter.users.show(id=name) for name in author_names):
        author_name = author_info['screen_name']
        author_info_str = dict([(k, smart_str(v)) for (k, v) in author_info.iteritems()])
        authors_cf.insert(author_name, author_info_str)
        print "Added data for author %s" % author_name

if __name__ == "__main__":
    populate()
[1] For sake of brevity this discussion omits a few details, including the configuration of a partitioner and tokens in order to use range queries effectively and how keys are ordered. Consult the project documentation and wiki for additional details.
[2] Here “greater than” and “less than” are defined in terms of the type provided at index creation time.
A short while ago Alex Miller (aka puredanger) wrote up a blog entry detailing how to use Clojure to interact with the new fork/join concurrency framework to be included with Java 7 (and already available from Doug Lea’s site). The meat of Alex’s solution is a Java wrapper for Clojure’s IFn type which integrates nicely with the fork/join framework. At the time there was some discussion about whether a Scala implementation would require an equivalent amount of scaffolding. It turns out that supporting this functionality in Scala requires a bit more effort than one might expect, although the problems one faces are quite different from those Alex worked through in Clojure. We review the steps to a working Scala implementation and see what we can learn from them.
Before we move on, a few notes about fork/join itself. A full overview is outside the scope of this post, however details can be found in the jsr166y section of Doug Lea’s site. A solid (although slightly out-of-date) review can be found in a developerWorks article by Brian Goetz.
We begin as simply as possible; a straight port of Alex’s code (in this case an implementation of Fibonacci using fork/join) to Scala. We create a named function fib that will be recursively called by our RecursiveTasks. We also use an implicit conversion to map no-arg anonymous functions onto RecursiveTask instances using the technique for supporting SAM interfaces discussed in an earlier post. The resulting code looks pretty good:
Unfortunately an attempt to build with sbt gives an unexpected error:
[info] == test-compile ==
[info] Source analysis: 2 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling test sources...
[error] .../src/test/scala/org/fencepost/forkjoin/ForkJoinTestFailOne.scala:23: method compute cannot be accessed in jsr166y.RecursiveTask[Int]
[error] (f2 compute) + (f1 join)
[error] ^
[error] one error found
A bit of investigation reveals the problem; RecursiveTask.compute() is a protected abstract method in the fork/join framework. The compute method in the class returned by our implicit conversion consists of nothing more than a call to fib.apply() but there’s no way to know this when fib is defined. It follows that an attempt to access RecursiveTask.compute() from within the body of fib is (correctly) understood as an attempt to access a protected method.
How does Alex get around this problem in Clojure? He doesn’t; the problem is actually resolved via the Java wrapper. Note that in his Java code compute() has been “elevated” to be a public method. He’s thus able to call this method without issue from his “fib-task” function. Without this modification his code fails with similar errors [1].
Strangely, there’s no obvious way to resolve this issue within Scala itself. The language provides protected and private keywords to restrict access to certain methods but there is no public keyword; methods are assumed to be public if not annotated otherwise. As a consequence there’s no obvious way to “raise” the access level of a method in a subclass. We work around this constraint by way of an egregious hack; we define a new subclass of RecursiveTask containing a surrogate method that provides public access to compute. We then return this subclass from our implicit conversion.
package org.fencepost.forkjoin

import jsr166y._
import org.scalatest.Suite

class ForkJoinTestFailTwo extends Suite {

  class Wrapper[T](somefunc: () => T) extends RecursiveTask[T] {
    override def compute = somefunc.apply

    // RecursiveTask.compute() is protected and there's no obvious way to
    // elevate it's access level within a Scala class definition. The type
    // system (very correctly) doesn't allow a random function access to
    // protected methods of a class even when that method is invoked by
    // a method within the class itself. Defining a new method with standard
    // access resolves the issue.
    def surrogate = compute
  }

  implicit def fn2wrapper(fn: () => Int): Wrapper[Int] = {
    return new Wrapper[Int](fn)
  }

  def testFibonacci() = {

    def fib(n: Int): Int = {
      if (n <= 1)
        return n
      val f1 = { () => fib(n - 1) }
      f1 fork
      val f2 = { () => fib(n - 2) }
      (f2 compute) + (f1 join)
    }

    val pool = new ForkJoinPool()
    assert((pool invoke { () => fib(1) }) == 1)
    assert((pool invoke { () => fib(2) }) == 1)
    assert((pool invoke { () => fib(3) }) == 2)
    assert((pool invoke { () => fib(4) }) == 3)
    assert((pool invoke { () => fib(5) }) == 5)
    assert((pool invoke { () => fib(6) }) == 8)
    assert((pool invoke { () => fib(7) }) == 13)
  }
}
This code now compiles, but when we try to actually execute the test with sbt we see a curious error; the test starts up and then simply hangs:
[info] == test-compile ==
[info]
[info] == copy-test-resources ==
[info] == copy-test-resources ==
[info]
[info] == test-start ==
[info] == test-start ==
[info]
[info] == org.fencepost.forkjoin.ForkJoinTestFailTwo ==
[info] Test Starting: testFibonacci
*wait for a long, long time*
Well, this isn’t good. What’s going on here?
The key point to understand is that the implicit conversion is called every time an existing value needs to be converted to a different type. As written the calls to f1.fork() and f1.join() both require conversion, and the implicit conversion function in play here will return a new instance on each invocation. This means that even though the input to the conversion function is identical in both cases (f1) the object that invokes fork() will be a different object than that which invokes join(). For fork/join tasks this matters; join() winds up being called on an object that was never forked. Voila, an unterminated wait.
We can resolve this issue one of two ways:
Caching previous values returned by the implicit conversion function and re-using them when appropriate
Explicitly specifying the type of f1 and f2 as our wrapper subclass. This has the effect of forcing implicit conversion at the time of assignment rather than when we call fork() and join()
I chose the first approach, mainly because it seemed a bit cooler. Oh, and it’s a bit cleaner logically as well. The final result runs without issue:
package org.fencepost.forkjoin

import scala.collection.mutable.LinkedList
import jsr166y._
import org.scalatest.Suite

class ForkJoinTest extends Suite {

  val cache = new LinkedList[(() => Int, Wrapper[Int])]()

  class Wrapper[T](somefunc: () => T) extends RecursiveTask[T] {
    override def compute = somefunc.apply

    // RecursiveTask.compute() is protected and there's no obvious way to
    // elevate it's access level within a Scala class definition. The type
    // system (very correctly) doesn't allow a random function access to
    // protected methods of a class even when that method is invoked by
    // a method within the class itself. Defining a new method with standard
    // access resolves the issue.
    def surrogate = compute
  }

  // Rationale for a caching implicit conversion can be found below. We use
  // a list of tuples rather than a map since hashing buys us nothing here;
  // there is always one (and exactly one) object with a given object identity.
  implicit def fn2wrapper(fn: () => Int): Wrapper[Int] = {
    val cacheval = cache find (t => (t != null) && (t._1 eq fn))
    cacheval match {
      case Some((_, cval)) => cval
      case None => {
        val newcval = new Wrapper[Int](fn)
        cache.next = cache.next :+ (fn, newcval)
        newcval
      }
    }
  }

  def testFibonacci() = {

    def fib(n: Int): Int = {
      if (n <= 1)
        return n

      // The approach used here only works if our implicit conversion always
      // returns the same Wrapper instance for a given function. For a given
      // RecursiveTask instance join() must be called on the same instance that
      // originally called fork(). It follows that an implicit conversion such
      // as:
      //
      //   implicit def fn2wrapper(fn:()=>Int):Wrapper[Int] = {
      //     return new Wrapper(fn)
      //   }
      //
      // won't be adequate; fork() and join() will be called on two separate
      // objects (since implicit conversion is performed on each method call)
      // and join() will never return.
      //
      // Alternately we can force conversion when f1 and f2 are assigned values
      // using something like the following;
      //
      //   val f1:Wrapper[Int] = { ()=> fib(n - 1) }
      //   val f2:Wrapper[Int] = { ()=> fib(n - 2) }
      //
      // Clearly in this case f1.fork() and f1.join() will be called on the
      // same instance of a RecursiveTask subclass.
      val f1 = { () => fib(n - 1) }
      f1 fork
      val f2 = { () => fib(n - 2) }
      (f2 surrogate) + (f1 join)
    }

    val pool = new ForkJoinPool()
    assert((pool invoke { () => fib(1) }) == 1)
    assert((pool invoke { () => fib(2) }) == 1)
    assert((pool invoke { () => fib(3) }) == 2)
    assert((pool invoke { () => fib(4) }) == 3)
    assert((pool invoke { () => fib(5) }) == 5)
    assert((pool invoke { () => fib(6) }) == 8)
    assert((pool invoke { () => fib(7) }) == 13)
  }
}
[1] After changing IFnTask.compute() to protected (with no other changes):
bartok ~/git/puredanger_forkjoin $ clojure process.clj
(Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at jline.ConsoleRunner.main(Unknown Source)
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: No matching field found: compute for class revelytix.federator.process.IFnTask (process.clj:0)
at clojure.lang.Compiler.eval(Compiler.java:5440)
at clojure.lang.Compiler.load(Compiler.java:5857)
at clojure.lang.Compiler.loadFile(Compiler.java:5820)
at clojure.main$load_script.invoke(main.clj:221)
at clojure.main$script_opt.invoke(main.clj:273)
Okay, it’s not actually alchemy. But it is pretty cool nonetheless.
A brief refresher on terminology: a single abstract method (or SAM) interface or abstract class contains exactly one method that concrete implementations must define. Java includes a number of interfaces that satisfy this property, including Runnable, Callable and Comparable.
The basic idea at play here is that a closure or anonymous function (either will do here) can be used in place of a SAM implementation if the parameters and return type of the closure match up to those of the method. That assumes your language cares about such things, of course; duck typing makes this less of an issue (as we’ll see).
JRuby automatically performs this operation via closure conversion (scroll down to the bottom of the page). Stuart Sierra has recently published a macro for doing something similar in Clojure. Even Java is considering including this feature in an eventual closure implementation (see Brian Goetz’s writeup for details). Why should Scala miss out on all the fun?
Let’s take a look at some code to make this discussion more tangible. A simple example of closure conversion in JRuby looks something like the following:
require 'java'

queue = java.util.concurrent.LinkedBlockingQueue.new()
exec = java.util.concurrent.ThreadPoolExecutor.new(2, 2, 1, java.util.concurrent.TimeUnit::SECONDS, queue)
exec.prestartAllCoreThreads

# Use a closure that does some kind of computation rather than just returning a value.
# As always last expression in the closure is the return value.
future = exec.submit do
  arr = []
  1.upto(5) { |val| arr.push(2 * val) }
  arr
end
exec.shutdown

rv = future.get
if rv.length == 5 && (rv.include? 2) && (rv.include? 6)
  print "Good\n"
else
  print "Not good\n"
end
This is where duck typing helps us out; the only real requirement on our closure is that we actually return something (in order to clearly distinguish between Runnable and Callable). Our objective is to implement a Scala unit test that does something similar. Any such approach will be built around Scala’s support for implicit conversion of types, but in this case a bit of care and feeding is required to line up the parameters and return type of the closure with those of the lone method in the SAM interface. The basic approach works as follows:
The implicit conversion accepts a function with the same parameter list and return value as the lone method in the SAM interface
The conversion then returns a new concrete instance of the SAM interface. The implementation of the method doesn’t need to be anything other than invoking apply() on the input function
The resulting Scala implementation (implemented as a ScalaTest class) is:
package org.fencepost

import scala.collection.mutable._
import org.scalatest.Suite
import java.util.concurrent._

class SAMTest extends Suite {

  implicit def fn2runnable(fn: () => Unit): Runnable = {
    new Runnable {
      def run = fn.apply
    }
  }

  implicit def fn2callable[A](fn: () => A): Callable[A] = {
    new Callable[A] {
      def call = fn.apply
    }
  }

  // We can now use a closure as a replacement for a Runnable instance
  def testClosureAsRunnable() = {
    var result = ListBuffer(1, 2, 3)
    val t = new Thread({ () =>
      result += 4
      // Addition of new item to the list buffer returns the item added so
      // leaving it as the last expression would violate our ()=>Unit type.
      // A simple println solves this.
      println("Done")
    })
    t.start
    t.join
    assert(result.size == 4)
    assert(result.contains(4))
  }

  // Verify that parameterized types are supported as well while demonstrating
  // integration with java.util.concurrent. We deliberately avoid references
  // to scala.concurrent in order to avoid confusing the issue.
  def testClosureAsParameterizedSAM() = {
    val exec = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS, new LinkedBlockingQueue())
    exec.prestartAllCoreThreads
    val future = exec.submit({ () => List(1, 2, 3).map(_ * 2) })
    val result = future.get
    exec.shutdown
    assert(result.size == 3)
    assert(result.contains(2))
    assert(result.contains(4))
    assert(result.contains(6))
  }
}