StreamBase and Scala

First Published Friday, 28th August 2009 11:16 am from StreamBase Systems : Matthew Fowles

The opinions expressed by this blogger and those providing comments are theirs alone, this does not reflect the opinion of Automated Trader or any employee thereof. Automated Trader is not responsible for the accuracy of any of the information supplied by this article.


(Matt Fowles is a Senior Software Engineer for StreamBase, whose interests include programming language, compiler, and virtual machine design.)

As the ponytail (and the tiny bio above) imply, my posts will focus on technical details such as language semantics, general programming issues, compiler design, or anything else I feel is nifty.

Anyone who follows programming languages will probably have noticed Scala (http://www.scala-lang.org/) cropping up a lot lately:

  • http://java.dzone.com/articles/scala-long-term-replacement
  • http://uncountablymany.blogspot.com/2009/07/pimp-my-lock-case-study-in-scala-api.html
  • http://blog.xebia.com/2009/07/04/real-world-functional-programming-in-scala-comparing-java-clojure-and-scala/
  • http://www.eishay.com/2009/07/microbenchmarking-scala-vs-java.html
  • http://yousry.de/blog/?p=166

Scala is an interesting language on several fronts, but the one that sparked my interest was its strong interoperability with Java. Academics often forget that, to be useful, a language needs to integrate with existing code and libraries (http://streambase.typepad.com/streambase_stream_process/2009/03/language-design-lessons-2-lists-and-custom-functions.html). But Scala has clearly been designed with a heavy focus on industrial usability as well as academic power. When I read about Scala on Android (http://www.ibm.com/developerworks/opensource/library/os-eclipse-scala/index.html?ca=dgr-twtrAndroid-Scaladth-o&S_TACT=105AGY46&S_CMP=TWDW), I guessed it wouldn't be hard to integrate with StreamBase. It turns out I was mostly right (and we will definitely be fixing the small pain points I hit). What follows is a guide to embedding Scala into a StreamBase application.

First we have to set up StreamBase Studio to support Scala development.

  1. Install StreamBase 6.4 (http://streambase.com/developers-downloads.htm)
  2. Add the Scala 2.7.5 Eclipse Plugin (http://www.scala-lang.org/scala-eclipse-plugin) to StreamBase Studio (Help -> Software Updates -> "Add Site...")
  3. Manually download the AJDT Weaving Plugin 1.6.4 (http://download.eclipse.org/tools/ajdt/34/update/ajdt_1.6.4_for_eclipse_3.4.zip)
  4. Unzip the weaving plugin into your Studio install
    • make sure the contents of the zip's "plugins" directory go into Studio's "plugins" directory (I accidentally got that wrong the first time)
  5. Restart Studio
  6. Enable "JDT Weaving" (Preferences -> JDT Weaving -> "Click to ENABLE")
  7. Restart Studio
  8. Verify that "JDT Weaving" is enabled (Preferences -> JDT Weaving)

Right there you see 95% of the trouble I had. Definitely easy to do, once you know what to do, but figuring out that JDT Weaving thing (especially the specific version needed) took some searching.

Now we can get started using Scala in StreamBase. The impatient can:

  1. Download the sample (http://streambase.typepad.com/files/streambase-scala-sample-pass-all.zip)
  2. Import it (File -> Import ... -> Existing Projects into Workspace -> "Select archive file").

or the dedicated can create a new project from scratch:

  1. Create a new StreamBase project (Package Explorer -> Right Click -> New -> "StreamBase Project")
  2. Add the Scala Nature to it (Right Click on the Project -> Scala -> "Add Scala Nature")

Yeah, that is really it. It turns out that Eclipse does some clever and powerful things by having projects include natures like this. We now have a project that includes the Java, Scala, and StreamBase natures, and they all play nicely together.

Let's create a basic Java Operator so we have something with which to compare (File -> New -> "StreamBase Java Operator"). This auto-generated operator is good enough for our purposes. I ended up deleting all the comments, variables, and whatnot so I could strip it down as small as possible for the web.

    package com.streambase.sample;

    import com.streambase.sb.StreamBaseException;
    import com.streambase.sb.Tuple;
    import com.streambase.sb.operator.Operator;
    import com.streambase.sb.operator.Parameterizable;
    import com.streambase.sb.operator.TypecheckException;

    public class SampleJavaOperator extends Operator implements Parameterizable {

        public SampleJavaOperator() {
            setPortHints(1, 1);
            setDisplayName(this.getClass().getSimpleName());
            setShortDisplayName(this.getClass().getSimpleName());
        }

        public void typecheck() throws TypecheckException {
            requireInputPortCount(1);
            setOutputSchema(0, getInputSchema(0));
        }

        public void processTuple(int inputPort, Tuple tuple) throws StreamBaseException {
            sendOutput(0, tuple);
        }
    }

Let's try a naive translation into Scala.

    package com.streambase.sample;

    import com.streambase.sb.StreamBaseException;
    import com.streambase.sb.Tuple;
    import com.streambase.sb.operator.Operator;
    import com.streambase.sb.operator.Parameterizable;
    import com.streambase.sb.operator.TypecheckException;

    class SampleScalaOperator extends Operator with Parameterizable {

      // Scala has no Java-style named constructors: the class body is the
      // primary constructor, so these calls run when the operator is created.
      // (A "def SampleScalaOperator" would just be a method nobody calls.)
      setPortHints(1, 1);
      setDisplayName(this.getClass().getSimpleName());
      setShortDisplayName(this.getClass().getSimpleName());

      override def typecheck = {
        requireInputPortCount(1);
        setOutputSchema(0, getInputSchema(0));
      }

      override def processTuple(port:Int, tuple:Tuple) = {
        sendOutput(0, tuple);
      }
    }

Wow, that wasn't too hard. Of course, it didn't save us any code either. But it does give us enough to test and figure out if it works at all. I guess we need to make a simple sbapp to exercise these Operators we have just written.

[Image: SimpleScalaApp]

Yeah, that was easy. Alright, you caught me, I cheated. Studio does not (yet) automatically find operators written in Scala, so I had to hand edit the sbapp file. But everything works after that: the app can be run and behaves as you would expect.

Now for a slightly less trivial example. I hear that Scala has a really nice concurrency primitive called an actor (http://java.dzone.com/articles/scala-threadless-concurrent). Well, StreamBase has a multi-threaded architecture, so let's see if the two can be made to play nicely together. Once again here is a full project (http://streambase.typepad.com/files/streambase-scala-sample-actors.zip) that you can import as above.

    // Package and StreamBase imports as in the previous example, plus the actors library:
    import scala.actors.Actor
    import scala.actors.Actor._

    class SampleScalaOperator extends Operator with Parameterizable {

      // A single actor that owns all output; worker actors send tuples to it.
      private object coordinator extends Actor {
        def act = {
          loop {
            receive {
              case t: Tuple => sendOutput(0, t)
              case None => exit()
            }
          }
        }
      }

      // The class body is the primary constructor, so this runs on creation.
      setPortHints(1, 1)
      setDisplayName(getClass.getSimpleName)
      setShortDisplayName(getClass.getSimpleName)

      override def typecheck = {
        requireInputPortCount(1)
        setOutputSchema(0, getInputSchema(0))
      }

      // Spawn a throwaway actor per tuple that re-sends it "times" times,
      // sleeping "delays" milliseconds before each send.
      override def processTuple(port:Int, t:Tuple) = {
        actor {
          val times = t.getInt("times")
          val delays = t.getLong("delays")
          for (i <- 0 until times) {
            Thread.sleep(delays)
            coordinator ! t
          }
        }
      }

      override def init = { coordinator.start }

      // None is the stop signal the coordinator matches on.
      override def shutdown = { coordinator ! None }
    }

Don't forget to edit the input schema for your app to include an int field named "times" and a long field named "delays". Now we can run the app, enqueue tuples, and watch the delayed tuples arriving out of order. We could have simply called "sendOutput" from the actor in "processTuple", but I wanted to include actors sending messages to other actors instead of just being glorified threads.
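
For anyone without a StreamBase install handy, the same shape can be tried in plain Java. This is only a sketch under some loud assumptions: it swaps Scala actors for a java.util.concurrent.BlockingQueue mailbox and raw threads, and the names CoordinatorSketch, Msg, process, and STOP are made up for illustration (none of them come from StreamBase or the sample project). Each message is re-sent "times" times after sleeping "delays" milliseconds, and a single coordinator thread drains the mailbox, standing in for the actor that calls "sendOutput".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the operator; not part of any StreamBase API.
class CoordinatorSketch {

    // Hypothetical stand-in for a tuple with "times" and "delays" fields.
    static final class Msg {
        final String id;
        final int times;
        final long delays;

        Msg(String id, int times, long delays) {
            this.id = id;
            this.times = times;
            this.delays = delays;
        }
    }

    // Sentinel that plays the role of the None message in the Scala version.
    private static final Msg STOP = new Msg("stop", 0, 0L);

    private final BlockingQueue<Msg> mailbox = new LinkedBlockingQueue<>();
    private final List<String> output = new ArrayList<>();
    private final Thread coordinator = new Thread(this::drain);

    // Coordinator loop: take messages until STOP, recording each arrival.
    private void drain() {
        try {
            while (true) {
                Msg m = mailbox.take();
                if (m == STOP) {
                    return;
                }
                output.add(m.id); // stands in for sendOutput(0, t)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void init() {
        coordinator.start();
    }

    // Mirrors processTuple: one worker thread per message, returned so
    // callers can wait for it to finish.
    Thread process(final Msg m) {
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < m.times; i++) {
                    Thread.sleep(m.delays);
                    mailbox.put(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }

    // Stops the coordinator and returns the ids in arrival order.
    List<String> shutdown() throws InterruptedException {
        mailbox.put(STOP);
        coordinator.join();
        return output;
    }

    public static void main(String[] args) throws InterruptedException {
        CoordinatorSketch sketch = new CoordinatorSketch();
        sketch.init();
        Thread slow = sketch.process(new Msg("slow", 1, 300L));
        Thread fast = sketch.process(new Msg("fast", 2, 10L));
        slow.join();
        fast.join();
        System.out.println(sketch.shutdown());
    }
}
```

The slow message is enqueued first, yet the fast one should normally reach the coordinator first, which is the same out-of-order arrival described above. A side benefit of funnelling every send through one coordinator thread is that the output list only ever has a single writer.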

All in all, really solid work on the part of the Scala developers (http://www.scala-lang.org/node/292). I am definitely looking forward to trying Scala on some larger projects. Hopefully, I will be able to share more as I learn it.

  • Copyright © Automated Trader Ltd 2013 - The Gateway to Algorithmic and Automated Trading
