Introduction


Streametry lets you store and analyze metrics in real time. Push events with metrics as soon as they occur and then process them using flexible queries. All queries are continuous, so you get new results as soon as something changes.

Data is organized into "applications," which roughly resemble databases within a server. Each application can have one or more "collections" where you can store similar items. Each item in a collection is inserted and retrieved using a unique key.

Every time you push an update, the history of metric changes is saved automatically. You can then analyze this historical data and combine it with real-time updates.

Getting Started


If you are using a self-hosted server, start it with: ./bin/streametry

To learn the basics, try the Tutorial or point your browser to:
http://localhost:8181

and follow the links to the PlayGround to see live examples and practice running queries.

Quick Start

From the command line:

  1. Push some data for a sensor named "sen" to the collection "sensors":

     curl http://localhost:8181/met/sensors/sen -XPOST -d '{temp: 20}'

  2. Get the data back:

     curl http://localhost:8181/met/sensors/sen

     which should return {"temp": 20}

  3. Run a simple query using CoffeeScript syntax:

     curl http://localhost:8181/met -XPOST -d 'stream("sensors")
     .filter( (sensor) -> sensor.temp > 0 )'

To learn more, try the full Tutorial.

Embed

To stream data to a web page, use the JavaScript API. See the embedding examples web/embed.html and web/embed-chart.html on the server.

Configure

To configure the port for the HTTP API, edit the web section of the config file ./conf/conf.json.

Logging

Log files are located in the logs/ subdirectory, and the logging level can be set in the config file.

Tutorial


The following tutorial will guide you through storing and querying data via the HTTP API. To run the examples you will need curl and a running Streametry server. Replace host with the hostname:port of the server.

We will use temperature sensors as an example. Imagine we have two sensors called A and B that publish temperature readings once in a while. The JSON form of their data is:

{temp: 20, city: "NY"}

1. Store initial data

Initially the sensors publish their current temperature reading and their location. Run these commands to store the initial data:

curl http://host/met/sensors/A -X POST -d '{temp: 20, city: "NY"}'
curl http://host/met/sensors/B -X POST -d '{temp: 25, city: "SF"}'

2. Lookup

To see the current state of a sensor, run:

curl http://host/met/sensors/A

which should return:

{"temp": 20, "city": "NY"}

3. Push some updates

As the sensors take new readings, they can push updates. The following commands update the temp field but leave the city field intact:

curl http://host/met/sensors/A -X PUT -d '{temp: 21}'
curl http://host/met/sensors/B -X PUT -d '{temp: 26}'
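To make the difference between POST and PUT concrete, here is a minimal in-memory sketch of the behaviour described above, assuming POST replaces the stored object and PUT merges the sent fields into it. The names store, post and put are hypothetical and are not part of Streametry's API. Whether the server merges nested objects deeply is not stated here, so the sketch only merges top-level fields.

```javascript
// Hypothetical in-memory model of one collection: key -> object.
const store = new Map();

// POST: store the whole object, replacing any previous value (assumption).
function post(key, value) {
  store.set(key, { ...value });
}

// PUT: update only the fields that were sent; other fields stay intact.
// Assumption: the merge is shallow (top-level fields only).
function put(key, fields) {
  const current = store.get(key) ?? {};
  store.set(key, { ...current, ...fields });
}

post("A", { temp: 20, city: "NY" });
put("A", { temp: 21 });
console.log(JSON.stringify(store.get("A"))); // {"temp":21,"city":"NY"}
```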

4. Lookup updated data

curl http://host/met/sensors/A

should return:

{"temp": 21, "city": "NY"}

5. Run a query

Now suppose we want to find all sensors with a temperature above a certain threshold. We can run a simple query using the CoffeeScript syntax:

curl http://host/met -X POST -d 'stream("sensors")
    .filter( (sensor) -> sensor.temp > 25 )'

should return:

{"B": {"temp": 26, "city": "SF"}}

because currently only sensor B has temperature above 25.

Don't close the query yet! While it is still running, open another terminal window and update the temperature of sensor A to 27 (for example with curl http://host/met/sensors/A -X PUT -d '{temp: 27}'). The query returns an additional result as soon as you do.
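The behaviour of a continuous query can be pictured as a filter that is re-evaluated on every update instead of once. The sketch below is a purely local model under that assumption: ContinuousFilter, update and the onResult callback are hypothetical names, and the real server may batch or format results differently.

```javascript
// Hypothetical local model of a continuous filter query over one collection.
class ContinuousFilter {
  constructor(predicate, onResult) {
    this.predicate = predicate; // e.g. (sensor) => sensor.temp > 25
    this.onResult = onResult;   // called with {key: object} for each new match
    this.objects = new Map();   // current state of the collection
  }

  // Apply a PUT-style update and re-evaluate the filter for that key only.
  // Note: this sketch does not report objects that stop matching.
  update(key, fields) {
    const merged = { ...(this.objects.get(key) ?? {}), ...fields };
    this.objects.set(key, merged);
    if (this.predicate(merged)) {
      this.onResult({ [key]: merged });
    }
  }
}

const results = [];
const query = new ContinuousFilter((s) => s.temp > 25, (r) => results.push(r));
query.update("A", { temp: 21, city: "NY" }); // no result: 21 is not above 25
query.update("B", { temp: 26, city: "SF" }); // emits a result for B
query.update("A", { temp: 27 });             // emits a result for A as soon as it changes
console.log(JSON.stringify(results));
```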

6. Run an aggregate query

Now we would like to see the current average, minimum and maximum temperature of the sensors above 25. Run the following query:

curl http://host/met -X POST -d 'stream("sensors")
    .filter( (sensor) -> sensor.temp > 25 )
    .stats("temp")'

which gives something like this:

{"stats": {"count": 2, "min": 26.0, "max": 27.0, "avg": 26.5}}
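As a sanity check on the numbers above, the sketch below computes the same statistics locally for the two matching readings (sensor B at 26 and sensor A at 27). It is a plain JavaScript illustration of what stats reports, not Streametry's implementation, and stats here is only a local helper. The sum field is included because the function reference lists it among the values stats returns.

```javascript
// Local helper mirroring the fields reported by .stats(prop).
function stats(objects, prop) {
  const values = objects.map((o) => o[prop]);
  const sum = values.reduce((a, b) => a + b, 0);
  return {
    count: values.length,
    min: Math.min(...values),
    max: Math.max(...values),
    avg: values.length > 0 ? sum / values.length : NaN,
    sum,
  };
}

// Sensors whose temperature is above 25 after sensor A was updated to 27.
const matching = [
  { temp: 26, city: "SF" },
  { temp: 27, city: "NY" },
];
console.log(stats(matching, "temp"));
```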

7. Run a historical query

While we were pushing all that data, Streametry was automatically saving each change in temperature. Assuming the sensors have been publishing readings for a while, we can look at how the temperature has changed over the past hour:

curl http://host/met -X POST -d 'history("sensors", "temp")
    .filter( Time.past(1, "hour") )'

should return all the readings with their timestamps in the following form:

{"B": {"temp": 26.0, "t": 1403176288126}}

Having all the readings and timestamps is convenient for plotting them on a chart.

8. Run an aggregate query on historical data

Finally, we can ask questions about past temperature readings, such as the average over the past day:

curl http://host/met -X POST -d 'history("sensors", "temp")
    .filter( Time.past(1, "day") )
    .avg("temp")'

Historical queries are live as well, so the average is updated as new data comes in.
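One way to keep an average live without re-reading all of the history is to maintain a running count and sum and update them on every new reading. The sketch below assumes that approach purely for illustration; it does not show how Streametry computes the average internally, and it ignores readings ageing out of the one-day window.

```javascript
// Running average that is updated incrementally as readings arrive.
class RunningAverage {
  constructor() {
    this.count = 0;
    this.sum = 0;
  }

  // Add one reading and return the updated average.
  add(value) {
    this.count += 1;
    this.sum += value;
    return this.sum / this.count;
  }
}

const avg = new RunningAverage();
for (const temp of [20, 21, 25, 26]) {
  console.log("average so far:", avg.add(temp));
}
```

A real sliding window would also have to subtract readings once they fall outside the time range.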

Conclusion

These steps should give you an initial idea of how you can publish and query your own data. Have a look at the documentation for more types of queries you can run and how you can publish query results to a web browser.

The URL in these examples consists of four main parts, http:// host / met / sensors / A:

Position  Name     Description
1         host     The address of the server
2         met      Name of an application chosen by the user (meteo)
3         sensors  Name of a collection
4         A        Key in the collection; the name of the sensor in this example

Querying

Queries consist of multiple steps, and data flows through them in sequence. The general structure of a query is:

stream -> filters -> transformations -> aggregations -> result

Everything after stream is optional.
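Assuming filter and map behave like their JavaScript array counterparts (as described for stream queries), the stream -> filters -> transformations -> aggregations -> result pipeline can be approximated on a plain array. The sketch below runs one snapshot of sensor data through such a pipeline; it illustrates the data flow only and ignores the continuous, incremental nature of real queries.

```javascript
// One snapshot of the "sensors" collection (example data from the tutorial).
const sensors = [
  { temp: 21, city: "NY" },
  { temp: 26, city: "SF" },
];

const result = sensors
  .filter((s) => s.temp > 0)                      // filters
  .map((s) => ({ tempF: (s.temp * 9) / 5 + 32 })) // transformations
  .reduce((acc, s) => acc + s.tempF, 0);          // aggregation: sum of tempF

console.log(result);
```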

Stream Queries

To start streaming JSON objects and their changes, use:

stream("collection")  

The stream can then be followed by one or both of:

.filter( (o) -> o.prop == 'value' ) # filter matching objects
.map( (o) -> {ab: o.a + o.b} ) # transform to another object

where filter and map work the same way as they do on JavaScript arrays (see the JavaScript Array documentation). Arbitrary JavaScript or CoffeeScript functions can be supplied to filter, map and the other query functions.

This documentation generally uses the CoffeeScript arrow (->) syntax for functions, but you can always use JavaScript syntax instead:

.filter( function(sensor) { return sensor.temp > 50; } )

The query can then be followed by an aggregate function:

.sum("prop") # or .max("prop"), .min("prop"), .avg("prop"), .count()
.stats("prop") # gives: max, min, avg, count, sum
.aggregateBy("group by prop", Math.max, "prop") # aggregate per group
.hll("prop") # count distinct values (estimated with HyperLogLog)
.topK("prop", 10) # top 10 most common values of the property
.reduce( (a, b) -> c ) # combine two objects into one
.reduceBy(Num.avg, "prop") # reduce the values of a property

See Function Reference for other functions.

Finally, aggregations can be followed by one or more filter/map/flatMap steps.

Historical Queries

Every time you push an update, the history of changes is saved. You can analyze this historical data by starting the query with:

history("collection", "prop") 

which returns the past values of a property across all documents in the collection.

The time period can be limited using:

.filter( Time.past(7, 'days') )
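Conceptually, Time.past(n, unit) keeps only readings whose timestamp falls within the last n units. The sketch below models that with plain history entries of the form {temp, t}, as in the tutorial output, and assumes t is a timestamp in milliseconds since the Unix epoch. pastFilter and UNIT_MS are hypothetical helpers, not Streametry's Time object.

```javascript
// Length of each unit in milliseconds (hypothetical table).
const UNIT_MS = {
  second: 1000,
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
};

// Returns a predicate that accepts entries newer than `amount` units before `now`.
function pastFilter(amount, unit, now = Date.now()) {
  const key = unit.endsWith("s") ? unit.slice(0, -1) : unit; // "days" -> "day"
  const span = UNIT_MS[key];
  if (span === undefined) throw new Error(`unknown time unit: ${unit}`);
  const cutoff = now - amount * span;
  return (entry) => entry.t >= cutoff;
}

const now = Date.UTC(2014, 5, 19);
const history = [
  { temp: 20, t: now - 8 * UNIT_MS.day }, // too old
  { temp: 26, t: now - 2 * UNIT_MS.day }, // inside the 7-day window
];
console.log(history.filter(pastFilter(7, "days", now)));
```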

Historical queries can end with aggregates just like stream queries, and they support an additional aggregator:

.rollup(duration, "time unit")
# Example: .rollup(5, "minute") rolls up to 5-minute intervals

Historical queries are also live queries, so the results are updated as new data comes in.

Limits

The stream and history steps can be followed by an optional limit:

.limit(10)

to reduce the number of items returned.

Output Throttling

To receive events less frequently, specify the rate at which events are sent by adding the following anywhere in the query:

.rate( secondsBetweenEvents )

This is particularly useful in a browser or mobile context, where frequent updates degrade UI performance or bandwidth is limited.
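A simple way to picture .rate(secondsBetweenEvents) is a throttle that forwards at most one event per interval and, when several events arrive within an interval, keeps only the latest. Whether Streametry drops intermediate events or coalesces them this way is an assumption. The Throttle class below is a hypothetical local model that takes explicit timestamps so its behaviour is easy to follow.

```javascript
// Hypothetical throttle: forwards at most one event per interval,
// keeping only the most recent event that arrived in between.
class Throttle {
  constructor(secondsBetweenEvents, emit) {
    this.intervalMs = secondsBetweenEvents * 1000;
    this.emit = emit;
    this.lastEmit = -Infinity; // time of the last forwarded event
    this.pending = undefined;  // latest event waiting to be forwarded
  }

  // Called for every incoming event with the current time in milliseconds.
  push(event, nowMs) {
    this.pending = event;
    this.tick(nowMs);
  }

  // Forward the pending event if the interval has elapsed.
  tick(nowMs) {
    if (this.pending !== undefined && nowMs - this.lastEmit >= this.intervalMs) {
      this.emit(this.pending);
      this.pending = undefined;
      this.lastEmit = nowMs;
    }
  }
}

const sent = [];
const throttle = new Throttle(1, (e) => sent.push(e));
throttle.push({ temp: 20 }, 0);   // forwarded immediately
throttle.push({ temp: 21 }, 200); // held back
throttle.push({ temp: 22 }, 400); // replaces the held event
throttle.tick(1000);              // one second later: {temp: 22} is forwarded
console.log(JSON.stringify(sent));
```

In a live setting, tick would be driven by a timer such as setInterval rather than called by hand.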

Passing arguments

You can pass arguments to queries as a JSON object:

.args( {"myArg": 20} )

and then use those arguments in queries:

.filter( (sensor) -> sensor.temp > myArg )

Accessing keys

The filter and map functions can access the key of each object through their second argument:

.filter( (sensor, key) -> key == 'sensor1' )
.map( (sensor, key) -> {id: key, temp: sensor.temp} )    
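When a collection is viewed as a map from keys to objects, passing the key as a second argument is the same idea as iterating over Object.entries. The local sketch below applies the two example functions to a plain object keyed by sensor name. filterWithKey and mapWithKey are hypothetical helpers, and keeping mapped results under their original keys is an assumption about the result shape.

```javascript
// A snapshot of the "sensors" collection, keyed like the HTTP API.
const sensors = {
  sensor1: { temp: 21, city: "NY" },
  sensor2: { temp: 26, city: "SF" },
};

// filter/map variants that pass (value, key) to the callback.
const filterWithKey = (coll, fn) =>
  Object.fromEntries(Object.entries(coll).filter(([key, value]) => fn(value, key)));
const mapWithKey = (coll, fn) =>
  Object.fromEntries(Object.entries(coll).map(([key, value]) => [key, fn(value, key)]));

const onlySensor1 = filterWithKey(sensors, (sensor, key) => key === "sensor1");
const withIds = mapWithKey(sensors, (sensor, key) => ({ id: key, temp: sensor.temp }));
console.log(JSON.stringify(onlySensor1));
console.log(JSON.stringify(withIds));
```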

Nested fields

Nested fields of JSON objects can be accessed just like in JavaScript and can be used in all functions:

sensor.location.city == "NY"

Query Examples

Continuing the sensor examples:

View the maximum temperature in each US city:

stream("sensors")
 .filter( (sensor) -> sensor.country == "US" )
 .aggregateBy("city", Math.max, "temp")
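The aggregateBy step behaves much like a SQL GROUP BY: objects are grouped by one property and an aggregate function is folded over another. The sketch below reproduces the example locally with Math.max. The sample sensors (including the country field) and the aggregateBy helper are illustrative assumptions, and the shape of the result object is a guess.

```javascript
// Group objects by `groupProp` and fold `fn` over the values of `prop`.
function aggregateBy(objects, groupProp, fn, prop) {
  const groups = new Map();
  for (const o of objects) {
    const group = o[groupProp];
    groups.set(group, groups.has(group) ? fn(groups.get(group), o[prop]) : o[prop]);
  }
  return Object.fromEntries(groups);
}

const sensors = [
  { temp: 21, city: "NY", country: "US" },
  { temp: 24, city: "NY", country: "US" },
  { temp: 26, city: "SF", country: "US" },
  { temp: 30, city: "Paris", country: "FR" },
];

const maxByCity = aggregateBy(
  sensors.filter((s) => s.country === "US"),
  "city",
  Math.max,
  "temp"
);
console.log(maxByCity);
```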

Convert temperatures to Fahrenheit and compute statistics:

stream("sensors")
 .filter( (sensor) -> sensor.country == "US" )
 .map( (s) -> {tempF: (s.temp*9)/5+32})
 .stats("tempF")     

View statistics (min/max/avg/count) for each day in the past 7 days:

history("sensors", "temp")
 .filter( Time.past(7, "days") )
 .rollup(1, "day")

For more examples, check the dropdown in the Query section of the PlayGround.

HTTP API

Url format

The URL consists of four main parts:

http(s):// host / app / collection / key

Part  Name        Description
1     host        The address of the server (host:port)
2     app         Name of an application chosen by the user
3     collection  Name of a collection chosen by the user
4     key         Key in the collection; values are accessed via keys
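Client code that builds or inspects these URLs can split the path with the standard WHATWG URL class. The helper below is a hypothetical sketch, not part of any Streametry library. It maps the path segments to app, collection and key, treating the key as optional so that collection URLs such as the ones used for bulk inserts and deletes also parse.

```javascript
// Split a Streametry-style URL into its four parts (hypothetical helper).
function parseStreametryUrl(url) {
  const { host, pathname } = new URL(url);
  const [app, collection, key] = pathname
    .split("/")
    .filter((part) => part.length > 0)
    .map(decodeURIComponent);
  return { host, app, collection, key };
}

console.log(parseStreametryUrl("http://localhost:8181/met/sensors/A"));
// host is "localhost:8181", app "met", collection "sensors", key "A"
```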

Store data

curl http://host/met/sensors/s1 -X POST -d '{temp: 20, city: "NY"}'

Lookup data

Data lookup is performed via the HTTP GET method:

curl http://host/met/sensors/s1

Update data

The PUT method updates individual fields and leaves the other fields intact:

curl http://host/met/sensors/s1 -X PUT -d '{temp: 21}'

Continuous updates

You can keep the HTTP connection open to send multiple updates:

curl http://host/met/sensors/a -X PUT -T -

and then type the updates one per line:

{temp: 20}
{temp: 22}   

Delete

To delete a single value from a collection run:

curl http://host/met/sensors/s1 -X DELETE 

To delete the whole collection, run:

curl http://host/met/sensors -X DELETE 

Deleting a collection also deletes its history.

Query

curl http://host/met -X POST -d 'stream("sensors")
    .filter( (sensor) -> sensor.temp > 0 )'

Historical Query

curl http://host/met -X POST -d 'history("sensors", "temp")
     .filter( Time.past(1, "hour") )'

Bulk inserts

Storing multiple items at the same time is faster than storing them one by one. Post the data to the collection's URL with all keys and values in one JSON object:

curl http://host/met/sensors -X POST -d '{s1: {temp: 20}, 
                                          s2: {temp: 25} }'

Bulk updates work the same way:

curl http://host/met/sensors -X PUT -d '{s1: {temp: 20}, 
                                         s2: {temp: 25} }'
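A bulk request carries the same information as one request per key. The sketch below spells out that equivalence against an in-memory map, assuming that a bulk POST replaces each listed object and a bulk PUT merges fields into each one, mirroring the single-key behaviour. bulkPost and bulkPut are hypothetical names, and the performance benefit described above is not modelled here.

```javascript
// In-memory model of one collection: key -> object.
const collection = new Map([["s1", { temp: 18, city: "NY" }]]);

// Bulk POST: every listed key gets its new object (replacing the old one).
function bulkPost(coll, items) {
  for (const [key, value] of Object.entries(items)) {
    coll.set(key, { ...value });
  }
}

// Bulk PUT: every listed key has the given fields merged in (assumed shallow).
function bulkPut(coll, items) {
  for (const [key, fields] of Object.entries(items)) {
    coll.set(key, { ...(coll.get(key) ?? {}), ...fields });
  }
}

bulkPut(collection, { s1: { temp: 20 }, s2: { temp: 25 } });
console.log(Object.fromEntries(collection));
```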

JavaScript API

To store and query data from a web page, include the Streametry JavaScript library, which comes with the server and is located in the web/js/ directory:

<script src="http://host/web/js/streametry.min.js"></script>

Then connect to the Streametry server using:

var streametry = new Streametry(appName, 'http://host:port'); 

For a working example, see web/embed.html on the server or the JS Fiddle example.

Query

Queries have the same general stream-to form as in other APIs:

stream(collection) -> [filter, transform, aggregate] -> to( callback )

In the JavaScript API this is:

var query = streametry.stream( collection )
                      .filter( ).map( ).aggregateBy( )
                      .to( callback );

For example:

var query = streametry.stream("sensors")
    .filter( function (sensor) { return sensor.temp > 20; } )
    .to( function (sensors) {
        // do something with the sensors result object
        console.log(sensors);
    });

Historical Queries

Historical queries have the form:

var query = streametry.history( collection, field ) ... .to( callback )

Closing Queries

To stop receiving data, call:

query.close();

Store

To store a value in a collection, use set:

 streametry.set(collection, key, value [, callback ]);

For example:

streametry.set("sensors", "s1", {"temp": 20});

Lookup

To get a value from a collection, use get:

 streametry.get(collection, key, callback );

For example:

streametry.get("sensors", "s1", function(sensor) {
    console.log("temperature is " + sensor.temp);
});    

Disconnect

To disconnect from the server call:

streametry.close();

Java API

Java Library

Include the Streametry client library in your classpath:

./lib/streametry-client.jar

The library is located in the lib folder of the server and requires Java 1.6+. See also the Javadoc.

Connect

To connect to a Streametry server, use:

Streametry sm = Streametry.client.connect("myApp", "host:8181");

Set/Get data

The Streametry interface provides a number of methods for setting and getting data:

.get 
.set
.setAll 
.update
.updateAll
.delete
.size

For example:

// set a value for 'sensor1' in collection 'sensors'
sm.set("sensors", "sensor1", new Json("temp", 30));

Refer to the Javadoc for descriptions of these methods.

Queries

The main query API is as follows:

DataStream stream = sm.stream("sensors")
    .query("filter( (sensor) -> sensor.temp > 10 )")
    .to( Handler );

where Handler is called asynchronously with results as they arrive. It is also possible to process results synchronously:

DataStream stream = sm.stream("sensors")
    .query("filter( (sensor) -> sensor.temp > 10 )");

for(Json result: stream) 
    System.out.println("new result: " + result);

Historical Queries

Historical queries start with history:

DataStream stream = sm.history("sensors", "temp");

Close and Disconnect

Data streams and the connection have a .close() method.

Example

// Connect
Streametry sm = Streametry.client.connect("myApp", "host:8181");

// and then call methods on the Streametry interface:

// push some data
sm.set("sensors", "sensor1", new Json("temp", 25));

// start streaming data
DataStream stream = sm.stream("sensors")
    .query("filter( (sensor) -> sensor.temp > 10 )");

for (Json result : stream) // process results synchronously
    System.out.println("new result: " + result);

// or process results asynchronously:
stream.to( handler );

Monitoring

Streametry publishes metrics about itself and can be monitored by subscribing to system data streams. These data streams can be used just like any other stream - they can be filtered/aggregated and pushed to a browser.

The following system data streams are supported:

sys.health

Publishes CPU, memory, disk usage and other metrics about Streametry:

stream("sys.health")

It is also possible to get the history of health metrics:

history("sys.health", "diskUsage")

sys.queries

Currently running queries.

sys.cluster

Shows status of all nodes in the cluster.

sys.collections

All collections and the number of objects in each.

sys.conf

Current configuration that was read from the config file.

sys.logs

The logs stream lets you view the Streametry log in real time without logging into the remote machine. Messages are published to this stream as soon as they are logged, and you can filter them by severity or message content.

Function Reference

filter

Filters objects matching a condition. For example, filter by city and temperature:

.filter( (sensor) -> sensor.city == 'NY' and sensor.temp > 50 ) 

or:

.filter( (sensor) -> sensor.city == 'NY' and (sensor.temp < 50 or sensor.temp > 100) ) 

map

Transforms each object in the stream to a different object. For example, converting temperature from Celsius to Fahrenheit:

.map( (sensor) -> {tempFahrenheit: (sensor.temp * 9) / 5 + 32 } )

flatMap

Transforms each object into another object and emits each of its nested objects as a separate item in the stream:

.flatMap( (sensor) -> sensor.readings )

For example if we have a sensor with multiple readings in nested objects:

{"readings": {"temp": {"val": 20},  "humidity": {"val": 50}} }

then the above flatMap will produce two json objects:

{"temp": {"val": 20}}
{"humidity": {"val": 50}}
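The example above can be reproduced with the standard Array.prototype.flatMap, assuming flatMap emits one single-entry object per property of the returned object, which is what the example output shows. flatMapEntries is a hypothetical local helper, not Streametry code.

```javascript
// Emit one single-entry object per property of the object returned by fn.
function flatMapEntries(objects, fn) {
  return objects.flatMap((o) =>
    Object.entries(fn(o)).map(([name, value]) => ({ [name]: value }))
  );
}

const sensors = [{ readings: { temp: { val: 20 }, humidity: { val: 50 } } }];
const out = flatMapEntries(sensors, (sensor) => sensor.readings);
out.forEach((o) => console.log(JSON.stringify(o)));
```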

count

Counts the number of objects returned:

.count()

min

Computes the minimum value of a property:

.min( property )

max

Computes the maximum value of a property:

.max( property )

sum

Computes the sum of all values of a property:

.sum( property )

avg

Computes the average of all values of a property:

.avg( property )

stats

Returns min/max/count/sum/avg together in one object:

 .stats( property )

statsEx

Same as stats but also returns sample standard deviation:

 .statsEx( property )
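For reference, the sample standard deviation divides the summed squared deviations from the mean by n - 1 rather than n. The sketch below computes it in one pass with Welford's algorithm, a numerically stable choice; whether statsEx uses this exact method is not documented here, and statsEx in the code is only a local helper name.

```javascript
// One-pass (Welford) computation of stats plus sample standard deviation.
function statsEx(values) {
  let count = 0, mean = 0, m2 = 0, sum = 0;
  let min = Infinity, max = -Infinity;
  for (const x of values) {
    count += 1;
    sum += x;
    min = Math.min(min, x);
    max = Math.max(max, x);
    const delta = x - mean;
    mean += delta / count;
    m2 += delta * (x - mean); // running sum of squared deviations
  }
  const stdDev = count > 1 ? Math.sqrt(m2 / (count - 1)) : NaN;
  return { count, min, max, sum, avg: mean, stdDev };
}

console.log(statsEx([2, 4, 4, 4, 5, 5, 7, 9]));
```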

reduce

Reduce combines a collection of JSON objects into one. Its only parameter is an associative reduce function of the form:

.reduce( (a, b) -> c )

The function takes two objects, a and b, and produces a third object, c, which is then passed back to the reduce function together with the next object. The following example computes a result object in which the temperatures of all sensors are summed:

.reduce( (a, b) -> {temp: a.temp + b.temp} )
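Locally, this matches Array.prototype.reduce called without an initial value: the first two objects are combined, and each result is then combined with the next object. The sample sensors below are assumed data. Associativity matters because partial results can then be combined in any grouping, for example ((a, b), c) or (a, (b, c)), and still give the same answer.

```javascript
const sensors = [
  { temp: 20, city: "NY" },
  { temp: 25, city: "SF" },
  { temp: 27, city: "NY" },
];

// Same function as in the query: combine two objects into one.
const sumTemps = (a, b) => ({ temp: a.temp + b.temp });

// Without an initial value, reduce starts by combining the first two objects.
const total = sensors.reduce(sumTemps);
console.log(total); // { temp: 72 }
```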

reduceBy

Computes an aggregate value for a property:

.reduceBy(Function, property)

where Function is one of Math.max, Math.min, Num.sum, Num.avg, Num.stats, or a user-defined reduce function that combines two property values. The function must be associative. For example, to compute the product of all property values:

.reduceBy( (a, b) -> a * b, "property")
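reduceBy can be thought of as extracting one property from every object and then reducing those values with a two-argument function. The sketch below is a local approximation with a hypothetical reduceBy helper whose argument order follows the query syntax. Num.sum, Num.avg and Num.stats are Streametry-specific and are not reproduced, so it uses Math.max and the product function from the example.

```javascript
// Extract `prop` from every object and fold the values with `fn`.
function reduceBy(objects, fn, prop) {
  // Wrap fn so it only ever sees two arguments: Array.prototype.reduce also
  // passes the index and the array, which would break Math.max.
  return objects.map((o) => o[prop]).reduce((a, b) => fn(a, b));
}

const sensors = [{ temp: 2 }, { temp: 3 }, { temp: 4 }];
console.log(reduceBy(sensors, Math.max, "temp"));        // 4
console.log(reduceBy(sensors, (a, b) => a * b, "temp")); // 24
```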

aggregateBy

Groups objects by a property and computes an aggregate based on another property:

.aggregateBy("group by prop", Function, "prop")

where Function is the same as in reduceBy.

hyperLogLog

Estimates the number of distinct values of a property using HyperLogLog:

.hll("city")
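For readers unfamiliar with HyperLogLog, the sketch below shows the idea behind an estimated distinct count: hash each value, use the top bits of the hash to pick a register, and keep in each register the longest run of leading zeros seen in the remaining bits. It is a compact textbook version (32-bit hash, small-range correction only), not Streametry's implementation; the hash function and the number of registers are arbitrary choices for illustration.

```javascript
// 32-bit FNV-1a hash of a string, followed by a MurmurHash3-style finalizer
// to spread similar inputs across all bits.
function hash32(str) {
  let h = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    h ^= str.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  h ^= h >>> 16;
  h = Math.imul(h, 0x85ebca6b);
  h ^= h >>> 13;
  h = Math.imul(h, 0xc2b2ae35);
  h ^= h >>> 16;
  return h >>> 0;
}

class HyperLogLog {
  constructor(b = 10) {
    this.b = b;      // number of index bits
    this.m = 1 << b; // number of registers
    this.registers = new Uint8Array(this.m);
  }

  add(value) {
    const h = hash32(String(value));
    const index = h >>> (32 - this.b);  // top b bits pick a register
    const rest = (h << this.b) >>> 0;   // remaining 32 - b bits, left-aligned
    // Rank = 1-based position of the first 1-bit in the remaining bits.
    const rank = Math.min(Math.clz32(rest) + 1, 32 - this.b + 1);
    if (rank > this.registers[index]) this.registers[index] = rank;
  }

  count() {
    const m = this.m;
    const alpha = 0.7213 / (1 + 1.079 / m);
    let sum = 0;
    let zeros = 0;
    for (const r of this.registers) {
      sum += 2 ** -r;
      if (r === 0) zeros++;
    }
    let estimate = (alpha * m * m) / sum;
    if (estimate <= 2.5 * m && zeros > 0) {
      estimate = m * Math.log(m / zeros); // linear counting for small cardinalities
    }
    return Math.round(estimate);
  }
}

const cities = new HyperLogLog();
["NY", "SF", "NY", "LA"].forEach((c) => cities.add(c));
console.log(cities.count()); // estimated number of distinct cities
```

With 1024 registers the typical relative error is around 3%, while memory stays fixed at 1024 bytes regardless of how many values are added.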

topK

Computes the K most common values of a property:

.topK("city", 10)
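The exact version of topK simply counts every value and sorts by frequency, as in the sketch below. Streametry may well use an approximate streaming algorithm to bound memory; that is not stated here, so treat this as a description of the result rather than the method. topK in the code is a local helper, and ties are broken by first appearance.

```javascript
// Exact top-K: count occurrences of `prop` and return the K most common values.
function topK(objects, prop, k) {
  const counts = new Map();
  for (const o of objects) {
    counts.set(o[prop], (counts.get(o[prop]) ?? 0) + 1);
  }
  return [...counts.entries()]
    .sort((a, b) => b[1] - a[1]) // stable sort keeps first-seen order for ties
    .slice(0, k)
    .map(([value, count]) => ({ value, count }));
}

const sensors = [
  { city: "NY" }, { city: "SF" }, { city: "NY" },
  { city: "LA" }, { city: "SF" }, { city: "NY" },
];
console.log(topK(sensors, "city", 2));
```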

percentile

Estimates a percentile for a property. Percentile is in the range 0-100:

.percentile("temp", 50)
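An exact percentile can be computed by sorting the values; the sketch below uses the nearest-rank definition. Streametry says it estimates the percentile, so its answers may differ slightly, and its estimation method is not documented here. The percentile helper is hypothetical.

```javascript
// Exact nearest-rank percentile of `prop` (p in the range 0-100).
function percentile(objects, prop, p) {
  if (p < 0 || p > 100) throw new RangeError("percentile must be in 0-100");
  const values = objects.map((o) => o[prop]).sort((a, b) => a - b);
  if (values.length === 0) return undefined;
  const rank = Math.max(1, Math.ceil((p / 100) * values.length));
  return values[rank - 1];
}

const readings = [15, 20, 35, 40, 50].map((temp) => ({ temp }));
console.log(percentile(readings, "temp", 50)); // 35
```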

rollup

Rollup splits historical data into periods of the specified length and computes statistics for each period. To compute statistics for every day, run:

.rollup(1, "day") 

Other available time periods are "second", "minute" and "hour".
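Rollup can be pictured as bucketing each timestamped reading into a fixed-length period and computing statistics per bucket. The sketch below assumes entries shaped like the tutorial's history output ({temp, t} with t in milliseconds since the Unix epoch) and aligns periods to the epoch, which for days means UTC midnight; both are assumptions. rollup in the code is a local helper returning count, min, max, sum and avg per period.

```javascript
const UNIT_MS = { second: 1000, minute: 60000, hour: 3600000, day: 86400000 };

// Group entries into periods of `duration` units and compute stats for each.
function rollup(entries, prop, duration, unit) {
  if (!(unit in UNIT_MS)) throw new Error(`unknown time unit: ${unit}`);
  const periodMs = duration * UNIT_MS[unit];
  const buckets = new Map();
  for (const e of entries) {
    const start = Math.floor(e.t / periodMs) * periodMs; // period start time
    const b = buckets.get(start) ?? { t: start, count: 0, min: Infinity, max: -Infinity, sum: 0 };
    b.count += 1;
    b.min = Math.min(b.min, e[prop]);
    b.max = Math.max(b.max, e[prop]);
    b.sum += e[prop];
    buckets.set(start, b);
  }
  return [...buckets.values()]
    .sort((a, b) => a.t - b.t)
    .map((b) => ({ ...b, avg: b.sum / b.count }));
}

const day = UNIT_MS.day;
const history = [
  { temp: 20, t: 0 },
  { temp: 24, t: 2 * UNIT_MS.hour },
  { temp: 30, t: day + UNIT_MS.hour },
];
console.log(rollup(history, "temp", 1, "day"));
```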