StreamBase and Scala
First Published Friday, 28th August 2009 11:16 am from StreamBase Systems : Matthew Fowles
The opinions expressed by this blogger and those providing comments are theirs alone; they do not reflect the opinion of Automated Trader or any employee thereof. Automated Trader is not responsible for the accuracy of any of the information supplied by this article.
[Photo: Matt Fowles]
(Matt Fowles is a Senior Software Engineer for StreamBase, whose interests include programming language, compiler, and virtual machine design.)
As
the ponytail (and the tiny bio above) imply, my posts will focus
on technical details such as language semantics, general
programming issues, compiler design, or anything else I feel is
nifty.
Anyone who follows programming languages will probably have noticed Scala (http://www.scala-lang.org/) cropping up a lot lately:
- http://java.dzone.com/articles/scala-long-term-replacement
- http://uncountablymany.blogspot.com/2009/07/pimp-my-lock-case-study-in-scala-api.html
- http://blog.xebia.com/2009/07/04/real-world-functional-programming-in-scala-comparing-java-clojure-and-scala/
- http://www.eishay.com/2009/07/microbenchmarking-scala-vs-java.html
- http://yousry.de/blog/?p=166
Scala is an interesting language on several fronts, but the one
that sparked my interest was its strong interoperability with
Java. Academics often forget that, to be useful, a
language needs to integrate with existing code and libraries
(http://streambase.typepad.com/streambase_stream_process/2009/03/language-design-lessons-2-lists-and-custom-functions.html).
But Scala has clearly been designed with a heavy focus on industrial
usability as well as academic power. When I read about Scala on Android
(http://www.ibm.com/developerworks/opensource/library/os-eclipse-scala/index.html?ca=dgr-twtrAndroid-Scaladth-o&S_TACT=105AGY46&S_CMP=TWDW),
I guessed it wouldn't be hard to
integrate with StreamBase. It turns out I was mostly
right (and we will definitely be fixing the small pain points I
hit). What follows is a guide to embedding Scala into
a StreamBase application.
First we have to set up StreamBase Studio to support Scala development.
- Install StreamBase 6.4 (http://streambase.com/developers-downloads.htm)
- Add the Scala 2.7.5 Eclipse Plugin (http://www.scala-lang.org/scala-eclipse-plugin) to StreamBase Studio (Help -> Software Updates -> "Add Site...")
- Manually download the AJDT Weaving Plugin 1.6.4 (http://download.eclipse.org/tools/ajdt/34/update/ajdt_1.6.4_for_eclipse_3.4.zip)
- Unzip the weaving plugin into your Studio install
  - make sure the contents of the zip's "plugins" directory go into Studio's "plugins" directory (I accidentally got that wrong the first time)
- Restart Studio
- Enable "JDT Weaving" (Preferences -> JDT Weaving -> "Click to ENABLE")
- Restart Studio
- Verify that "JDT Weaving" is enabled (Preferences -> JDT Weaving)
Right there you see
95% of the trouble I had. Definitely easy to do, once
you know what to do, but figuring out that JDT Weaving thing
(especially the specific version needed) took some
searching.
Now we can get started using Scala
in StreamBase. The impatient can
- Download the sample (http://streambase.typepad.com/files/streambase-scala-sample-pass-all.zip)
- Import it (File
-> Import ... -> Existing Projects into Workspace
-> "Select archive file").
or the dedicated can create a new project
from scratch
- Create a new
StreamBase project (Package Explorer -> Right Click
-> New -> "StreamBase
Project")
- Add the Scala Nature to
it (Right Click on the Project -> Scala ->
"Add Scala Nature")
Yeah, that is really it. Turns out that
Eclipse does some clever and powerful things by having projects
include natures like this. We now have a project that
includes the Java, Scala, and StreamBase natures and they all
play nicely together.
Let's create a
basic Java Operator so we have something with which to compare
(File -> New -> "StreamBase Java
Operator"). This auto-generated operator is
good enough for our purposes. I ended up deleting all
the comments, variables, and whatnot so I could strip it down
as small as possible for the web.
    package com.streambase.sample;

    import com.streambase.sb.StreamBaseException;
    import com.streambase.sb.Tuple;
    import com.streambase.sb.operator.Operator;
    import com.streambase.sb.operator.Parameterizable;
    import com.streambase.sb.operator.TypecheckException;

    public class SampleJavaOperator extends Operator implements Parameterizable {
        public SampleJavaOperator() {
            setPortHints(1, 1);
            setDisplayName(this.getClass().getSimpleName());
            setShortDisplayName(this.getClass().getSimpleName());
        }

        public void typecheck() throws TypecheckException {
            requireInputPortCount(1);
            setOutputSchema(0, getInputSchema(0));
        }

        public void processTuple(int inputPort, Tuple tuple) throws StreamBaseException {
            sendOutput(0, tuple);
        }
    }
Let's try a naive translation
into Scala.
    package com.streambase.sample;

    import com.streambase.sb.StreamBaseException;
    import com.streambase.sb.Tuple;
    import com.streambase.sb.operator.Operator;
    import com.streambase.sb.operator.Parameterizable;
    import com.streambase.sb.operator.TypecheckException;

    class SampleScalaOperator extends Operator with Parameterizable {
        // Scala has no constructor method named after the class: the class
        // body itself is the primary constructor, so the setup calls go here.
        // (A "def SampleScalaOperator" would be an ordinary, never-called method.)
        setPortHints(1, 1);
        setDisplayName(this.getClass().getSimpleName());
        setShortDisplayName(this.getClass().getSimpleName());

        // Explicit Unit result types keep the overrides compatible with the
        // void Java methods regardless of what the last expression returns.
        override def typecheck(): Unit = {
            requireInputPortCount(1);
            setOutputSchema(0, getInputSchema(0));
        }

        override def processTuple(port: Int, tuple: Tuple): Unit = {
            sendOutput(0, tuple);
        }
    }
Wow, that wasn't too hard. Of course, it didn't save us any
code either, but it does give us enough to test whether it works at
all. I guess we need to make a simple sbapp to
exercise the operators we have just
written.
[Image: SimpleScalaApp (http://streambase.typepad.com/.a/6a00d8341c63ff53ef011570fa352e970c-pi)]
Yeah, that was easy. Alright, you caught me, I cheated.
Studio does not (yet) automatically find operators written in
Scala, so I had to hand-edit the sbapp file. But
after that the app runs and everything
works as you would expect.
Now for a slightly
less trivial example. I hear that Scala has a really
nice concurrency primitive called an actor (http://java.dzone.com/articles/scala-threadless-concurrent).
Well, StreamBase has a multi-threaded architecture, so
let's see if the two can be made to play nicely
together. Once again, here is a full project
(http://streambase.typepad.com/files/streambase-scala-sample-actors.zip)
that you can import as above.
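    import scala.actors.Actor
    import scala.actors.Actor._

    import com.streambase.sb.Tuple
    import com.streambase.sb.operator.Operator
    import com.streambase.sb.operator.Parameterizable

    class SampleScalaOperator extends Operator with Parameterizable {
        // Single actor that forwards every tuple it receives to output port 0.
        private object coordinator extends Actor {
            def act = {
                loop {
                    receive {
                        case t: Tuple => sendOutput(0, t)
                        case None => exit() // shutdown signal sent from shutdown()
                    }
                }
            }
        }

        // The class body is the primary constructor in Scala, so the setup
        // calls live here rather than in a method named after the class.
        setPortHints(1, 1)
        setDisplayName(getClass.getSimpleName)
        setShortDisplayName(getClass.getSimpleName)

        override def typecheck(): Unit = {
            requireInputPortCount(1)
            setOutputSchema(0, getInputSchema(0))
        }

        // Explicit Unit result: actor { ... } returns an Actor, which would
        // otherwise be inferred as the result type of this void override.
        override def processTuple(port: Int, t: Tuple): Unit = {
            // One short-lived actor per input tuple; it sleeps, then hands
            // the tuple to the coordinator, "times" times in total.
            actor {
                val times = t.getInt("times")
                val delays = t.getLong("delays")
                for (i <- 0 until times) {
                    Thread.sleep(delays)
                    coordinator ! t
                }
            }
        }

        override def init(): Unit = { coordinator.start }

        override def shutdown(): Unit = { coordinator ! None }
    }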
Don't forget to edit the input
schema for your app to include an int field named
"times" and a long field named
"delays". Now we can run the app,
enqueue tuples, and watch the delayed tuples arriving out of
order. We could have simply called
"sendOutput" from the actor in
"processTuple", but I wanted to include actors
sending messages to other actors instead of just being glorified
threads.
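For readers who want to see the message flow without a StreamBase install, here is a rough plain-Java analogue. It is only a sketch under stated assumptions: it uses neither the StreamBase API nor scala.actors, and the names CoordinatorSketch, Message, and STOP are invented for this illustration. A BlockingQueue stands in for the coordinator's mailbox, one thread per input stands in for the per-tuple actor, and a sentinel object plays the role of the None shutdown message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; not StreamBase or scala.actors code.
final class CoordinatorSketch {
    // Stand-in for a tuple carrying the "times" and "delays" fields.
    static final class Message {
        final String label;
        final int times;
        final long delayMillis;

        Message(String label, int times, long delayMillis) {
            this.label = label;
            this.times = times;
            this.delayMillis = delayMillis;
        }
    }

    // Sentinel compared by identity, like sending None to the coordinator.
    private static final Message STOP = new Message("stop", 0, 0L);

    // Runs one worker thread per input and returns labels in arrival order.
    static List<String> run(List<Message> inputs) {
        final BlockingQueue<Message> mailbox = new LinkedBlockingQueue<Message>();
        final List<String> output = new ArrayList<String>();

        // Coordinator: the only thread that writes to "output".
        Thread coordinator = new Thread(() -> {
            try {
                while (true) {
                    Message m = mailbox.take();
                    if (m == STOP) {
                        return;
                    }
                    output.add(m.label);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        coordinator.start();

        // Workers: sleep, then hand the message to the coordinator, "times" times.
        List<Thread> workers = new ArrayList<Thread>();
        for (final Message in : inputs) {
            Thread worker = new Thread(() -> {
                try {
                    for (int i = 0; i < in.times; i++) {
                        Thread.sleep(in.delayMillis);
                        mailbox.put(in);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(worker);
            worker.start();
        }

        try {
            for (Thread worker : workers) {
                worker.join();
            }
            mailbox.put(STOP);   // shut the coordinator down once all work is queued
            coordinator.join();  // join makes the coordinator's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return output;
    }

    public static void main(String[] args) {
        // "slow" is submitted first but waits longer than both "fast" sends.
        System.out.println(run(Arrays.asList(
                new Message("slow", 1, 300L),
                new Message("fast", 2, 50L))));
    }
}
```

With delays this far apart, the "slow" label should normally arrive after both "fast" ones even though it was submitted first, which mirrors the out-of-order output described above. The exact interleaving depends on thread scheduling, so treat the ordering as typical rather than guaranteed.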
All in all, really solid work on the
part of the Scala developers (http://www.scala-lang.org/node/292).
I am definitely looking forward
to trying Scala on some larger projects. Hopefully, I
will be able to share more as I learn it.




