Introduction
Streametry lets you store and analyze metrics in real time. Push events with metrics as soon as they occur and then process them using flexible queries. All queries are continuous, so you get new results as soon as something changes.
Data is organized into "applications" which roughly resemble databases within a server. Each application can have one or more "collections" where you can store similar items. Each item in a collection is inserted and retrieved using a unique key.
Every time you push an update, a history of metric changes is saved automatically. You can then analyze this historical data and combine it with real-time updates.
Getting Started
If you are using a self-hosted server, run: ./bin/streametry
To learn the basics, try the Tutorial or point your browser at:
http://localhost:8181
and follow the links to the PlayGround to see live examples and practice running queries.
Quick Start
From the command line:
Push some data to the collection "sensors" for a sensor named "sen":
curl http://localhost:8181/met/sensors/sen -XPOST -d '{temp: 20}'
Get data back:
curl http://localhost:8181/met/sensors/sen
should return:
{"temp": 20}
Run a simple query using CoffeeScript syntax:
curl http://localhost:8181/met -XPOST -d 'stream("sensors") .filter( (sensor) -> sensor.temp > 0 )'
To learn more try the full Tutorial.
Embed
To stream data to a webpage, use the JavaScript API. See the embedding examples web/embed.html and web/embed-chart.html on the server.
Configure
To configure the port for the HTTP API, edit the config file ./conf/conf.json under the web section.
Logging
Log files are located in the logs/ subdirectory, and the logging level can be set in the config file.
Tutorial
The following tutorial will guide you through storing and querying data via the HTTP API. You will need curl available to run the example code and a running Streametry server. Replace host with the hostname:port where the server is running.
We will use temperature sensors as an example. Imagine we have two sensors, called A and B, that publish temperature readings once in a while. The JSON form of their data is:
{temp: 20, city: "NY"}
1. Store initial data
Initially the sensors publish their current temperature reading and their location. Run these commands to store the initial data:
curl http://host/met/sensors/A -X POST -d '{temp: 20, city: "NY"}'
curl http://host/met/sensors/B -X POST -d '{temp: 25, city: "SF"}'
2. Lookup
If we want to see the current state of a sensor, we run:
curl http://host/met/sensors/A
which should return:
{"temp": 20, "city": "NY"}
3. Push some updates
As the sensors read more data, they can push more updates. The following commands will update the temp field but will leave the city field intact:
curl http://host/met/sensors/A -X PUT -d '{temp: 21}'
curl http://host/met/sensors/B -X PUT -d '{temp: 26}'
4. Lookup updated data
curl http://host/met/sensors/A
should return:
{"temp": 21, "city": "NY"}
5. Run a query
Now let's say we want to find all sensors with a temperature above a certain threshold. We can run a simple query using CoffeeScript syntax:
curl http://host/met -X POST -d 'stream("sensors")
.filter( (sensor) -> sensor.temp > 25 )'
should return:
{"B": {"temp": 26, "city": "SF"}}
because currently only sensor B has a temperature above 25.
Don't close the query yet! While the query is still running, open another terminal window and update the temperature for sensor A to 27. You will notice that the query returns additional results as soon as you do.
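For example, from the second terminal (reusing the update command from step 3):
curl http://host/met/sensors/A -X PUT -d '{temp: 27}'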
6. Run an aggregate query
Now we would like to see what the current average, minimum, and maximum temperatures are. Run the following query:
curl http://host/met -X POST -d 'stream("sensors")
.filter( (sensor) -> sensor.temp > 25 )
.stats("temp")'
which should give you something like this:
{"stats": {"count": 2, "min": 26.0, "max": 27.0, "avg": 26.5}}
7. Run a historical query
While we were pushing all that data, Streametry was automatically saving each change in temperature. Assuming the sensors have been publishing temperatures for a while, we might want to look at the temperature changes over the past hour:
curl http://host/met -X POST -d 'history("sensors", "temp")
.filter( Time.past(1, "hour") )'
should return all the readings with timestamps, in the following form:
{"B": {"temp": 26.0, "t": 1403176288126}}
Having all the readings and timestamps is convenient for plotting them on a chart.
8. Run an aggregate query on historical data
Finally, we would like to ask questions about past temperature readings, such as what the average was over the past day:
curl http://host/met -X POST -d 'history("sensors", "temp")
.filter( Time.past(1, "day") )
.avg("temp")'
Historical queries are live as well, so as new data comes in the average will be updated.
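For example, pushing another reading from a second terminal while the query is still running will cause the average to be recomputed:
curl http://host/met/sensors/A -X PUT -d '{temp: 30}'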
Conclusion
These steps should give you an initial idea of how you can publish and query your own data. Have a look at the documentation for more types of queries you can run and how you can publish query results to a web browser.
The URL in these examples consists of four main parts: http://host/met/sensors/A
Position | Name | Description |
---|---|---|
1 | host | The address of the server |
2 | met | Name of an application chosen by the user (meteo) |
3 | sensors | Name of a collection |
4 | A | Key in the collection. Name of the sensor in this example. |
Querying
Queries consist of multiple steps, and data flows through all of them in sequence. The general structure of a query is:
stream -> filters -> transformations -> aggregations -> result
however, everything after stream is optional.
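For example, a query over the sensors collection from the Tutorial that chains a filter, a transformation, and an aggregation:
stream("sensors")
  .filter( (sensor) -> sensor.city == "NY" )
  .map( (s) -> {tempF: (s.temp*9)/5+32} )
  .stats("tempF")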
Stream Queries
To start streaming JSON objects and their changes, type:
stream("collection")
Afterwards the query can be followed by one or both of:
.filter( (o) -> o.prop == 'value' ) # filter matching objects
.map( (o) -> {ab: o.a + o.b} ) # transform to another object
where the filter and map functions work the same way as they would on JavaScript arrays (Documentation). Arbitrary JavaScript or CoffeeScript functions can be supplied to filter, map, and the rest of the query functions.
In this documentation we generally use the arrow -> syntax for functions; however, you can always use the JavaScript syntax:
filter( function(sensor) { return sensor.temp > 50; } )
The query can then be followed by an aggregate function:
.sum("prop"); # or max, min, avg, count
.stats("prop"); #gives: max, min, avg, count, sum
.aggregateBy("group by prop", Math.max, "prop")
.hll("prop") # Count distinct values (estimated with HyperLogLog)
.topK("prop", 10) # Top 10 values for property
.reduce( (a, b) -> c ) # combine two objects
.reduceBy(Num.avg, "prop") # reduce by property
See Function Reference for other functions.
Finally, aggregations can be followed by one or more filter/map/flatMap steps.
Historical Queries
Every time you push an update, a history of changes is saved. You can then analyze this historical data by starting the query with:
history("collection", "prop")
which will return past values of some property in the collection. These values can come from all documents in that collection.
The time period can be limited using:
.filter( Time.past(7, 'days') )
Historical queries can end with aggregates just like stream queries and they have additional aggregators:
.rollup(duration, "time unit")
# Example: rollup(5, 'min') # rollup to 5 min intervals
Historical queries are also live queries, so as new data keeps coming in, the results will be updated.
Limits
The stream and history steps can be followed by an optional limit:
.limit(10)
to reduce the number of items returned.
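For example, to stream at most ten items from the sensors collection:
stream("sensors")
  .limit(10)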
Output Throttling
To receive events less frequently, you can specify the rate at which events are sent. Anywhere in the query add:
.rate( secondsBetweenEvents )
This is particularly useful in browser or mobile contexts where frequent updates degrade UI performance or bandwidth is limited.
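For example, to receive results at most once every 5 seconds:
stream("sensors")
  .filter( (sensor) -> sensor.temp > 0 )
  .rate(5)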
Passing arguments
You can pass arguments to queries as a JSON object:
.args( {"myArg": 20} )
and then use those arguments in queries:
.filter( (sensor) -> sensor.temp > myArg )
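Putting the two pieces together, a complete query might look like the sketch below (the exact position of .args in the chain is an assumption; the individual steps are the ones shown above):
stream("sensors")
  .args( {"myArg": 20} )   # position of .args in the chain assumed
  .filter( (sensor) -> sensor.temp > myArg )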
Accessing keys
The filter and map functions can access the keys of objects using a second argument:
.filter( (sensor, key) -> key == 'sensor1' )
.map( (sensor, key) -> {id: key, temp: sensor.temp} )
Nested fields
Nested fields of JSON objects can be accessed just like in JavaScript and can be used in all functions:
sensor.location.city == "NY"
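For example, used in a filter (assuming the sensor documents contain a nested location object):
stream("sensors")
  .filter( (sensor) -> sensor.location.city == "NY" )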
Query Examples
Continuing the sensors examples:
View sensors with maximum temperature in each city:
stream("sensors")
.filter( (sensor) -> sensor.country == "US" )
.aggregateBy("city", Math.max, "temp")
Converting temperature to Fahrenheit:
stream("sensors")
.filter( (sensor) -> sensor.country == "US" )
.map( (s) -> {tempF: (s.temp*9)/5+32})
.stats("tempF")
View statistics (min/max/avg/count) for each day in the past 7 days:
history("sensors", "temp")
.filter( Time.past(7, "days") )
.rollup(1, "day")
For more examples, check the dropdown in the PlayGround's Query section.
HTTP API
Url format
The URL consists of four main parts:
http(s)://host/app/collection/key
Part | Name | Description |
---|---|---|
1 | host | The address of the server (host:port) |
2 | app | Name of an application chosen by the user |
3 | collection | Name of a collection chosen by the user |
4 | key | Key in the collection. Values are accessed via keys |
Store data
curl http://host/met/sensors/s1 -X POST -d '{temp: 20, city: "NY"}'
Lookup data
Data lookup is performed via the HTTP GET method:
curl http://host/met/sensors/s1
Update data
The PUT method updates individual fields:
curl http://host/met/sensors/s1 -X PUT -d '{temp: 21}'
Continuous updates
You can keep the HTTP connection open for sending multiple updates:
curl http://host/met/sensors/a -X PUT -T -
and then add the updates one by one, each on a new line:
{temp: 20}
{temp: 22}
Delete
To delete a single value from a collection run:
curl http://host/met/sensors/s1 -X DELETE
And to delete the whole collection run:
curl http://host/met/sensors -X DELETE
Deleting a collection also deletes the history for that collection.
Query
curl http://host/met -X POST -d 'stream("sensors")
.filter( (sensor) -> sensor.temp > 0 )'
Historical Query
curl http://host/met -X POST -d 'history("sensors", "temp")
.filter( Time.past(1, "hour") )'
Bulk inserts
Storing multiple items at the same time is faster than storing them one by one. Post data to the collection's URL with all keys and values in one JSON object:
curl http://host/met/sensors -X POST -d '{s1: {temp: 20},
s2: {temp: 25} }'
Bulk updates work the same way:
curl http://host/met/sensors -X PUT -d '{s1: {temp: 20},
s2: {temp: 25} }'
JavaScript API
To store and query data from a webpage, include the Streametry JavaScript library, which comes with the server and is located in the web/js/ directory:
<script src="http://host/web/js/streametry.min.js"></script>
Then connect to the Streametry server using:
var streametry = new Streametry(appName, 'http://host:port');
For a working example see embed.html on the server or the following JS Fiddle.
Query
Queries have the same general form as in the other APIs, ending with a to( callback ) step:
stream(collection) -> [filter, transform, aggregate] -> to( callback )
In the JavaScript API this is:
var query = streametry.stream( collection )
.filter( ).map( ).aggregateBy( )
.to( callback );
For example:
var query = streametry.stream("sensors")
.filter( function (sensor) { return sensor.temp > 20; } )
.to( function (sensors) {
// do something with sensors result object
console.log(sensors);
});
Historical Queries
Historical queries have the form:
var query = streametry.history( collection, field ) ... .to( callback )
Closing Queries
To stop receiving data simply call:
query.close();
Store
To store a value in a collection use set:
streametry.set(collection, key, value [, callback ]);
For example:
streametry.set("sensors", "s1", {"temp": 20});
Lookup
To get a value from a collection use get:
streametry.get(collection, key, callback );
For example:
streametry.get("sensors", "s1", function(sensor) {
console.log("temperature is " + sensor.temp);
});
Disconnect
To disconnect from the server call:
streametry.close();
Java API
Java Library
Include the Streametry Client library on your classpath:
./lib/streametry-client.jar
The library is located in the lib folder of the server and requires Java 1.6+. See also: Javadoc
Connect
To connect to a Streametry server, add:
Streametry sm = Streametry.client.connect("myApp", "host:8181");
Set/Get data
The Streametry interface provides a number of methods for setting and getting data:
.get
.set
.setAll
.update
.updateAll
.delete
.size
For example:
// set a value for 'sensor1' in collection 'sensors'
sm.set("sensors", "sensor1", new Json("temp", 30));
Refer to the Javadoc documentation of these methods to see the descriptions.
Queries
The main query API is as follows:
DataStream stream = sm.stream("sensors")
.query("filter( (sensor) -> sensor.temp > 10 )")
.to( Handler );
Where the Handler will be called asynchronously with results as they come in. It is also possible to process results synchronously:
DataStream stream = sm.stream("sensors")
    .query("filter( (sensor) -> sensor.temp > 10 )");
for (Json result : stream)
    System.out.println("new result: " + result);
Historical Queries
Historical queries start with history:
DataStream stream = sm.history("sensors", "temp")
Close and Disconnect
Data streams and the connection have a .close()
method.
Example
// Connect
Streametry sm = Streametry.client.connect("myApp", "host:8181");
// and then call methods on the Streametry interface:
// push some data
sm.set("sensors", "sensor1", new Json("temp", 25));
// start streaming data
DataStream stream = sm.stream("sensors")
    .query("filter( (sensor) -> sensor.temp > 10 )");
for (Json result : stream) // process results synchronously
    System.out.println("new result: " + result);
// or process results asynchronously:
stream.to( handler );
Monitoring
Streametry publishes metrics about itself and can be monitored by subscribing to system data streams. These data streams can be used just like any other stream - they can be filtered/aggregated and pushed to a browser.
The following system data streams are supported:
sys.health
Publishes CPU, memory, disk usage, and other Streametry metrics:
stream("sys.health")
It is also possible to get the history of health metrics:
history("sys.health", "diskUsage")
sys.queries
Currently running queries.
sys.cluster
Shows status of all nodes in the cluster.
sys.collections
All collections and the number of objects in each.
sys.conf
Current configuration that was read from the config file.
sys.logs
The logs stream lets you view the Streametry log in real time without logging in to the remote machine. As soon as messages are logged, they are published to this stream. You can then filter based on severity or message content.
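For example, a sketch of filtering the log stream by message content (the message field name here is hypothetical):
stream("sys.logs")
  .filter( (log) -> log.message.indexOf("ERROR") >= 0 )   # "message" is a hypothetical field name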
Function Reference
filter
Filters objects matching a condition. For example, filter by city and temperature:
.filter( (sensor) -> sensor.city == 'NY' and sensor.temp > 50 )
or:
.filter( (sensor) -> sensor.city == 'NY' and (sensor.temp < 50 or sensor.temp > 100) )
map
Transforms each object in the stream to a different object. For example, converting temperature from Celsius to Fahrenheit:
.map( (sensor) -> {tempFahrenheit: (sensor.temp * 9) / 5 + 32 } )
flatMap
Transforms each object into another object and flattens its nested objects into a stream of separate objects:
.flatMap( (sensor) -> sensor.readings )
For example, if we have a sensor with multiple readings in nested objects:
{"readings": {"temp": {"val": 20}, "humidity": {"val": 50}} }
then the above flatMap will produce two JSON objects:
{"temp": {"val": 20} }
{"humidity": {"val": 50} }
count
Count number of objects returned:
.count()
min
Compute minimum value for a property:
.min( property )
max
Compute maximum value for a property:
.max( property )
sum
Compute a sum of all property values:
.sum( property )
avg
Compute an average of all property values:
.avg( property )
stats
Returns min/max/count/sum/avg together in one object:
.stats( property )
statsEx
Same as stats but also returns the sample standard deviation:
.statsEx( property )
reduce
Reduce combines a collection of JSON objects into one. The only parameter is an associative reduce function of the form:
.reduce( (a, b) -> c )
It takes two objects and produces a third object c. Object c will be passed to the reduce function again with the next object as an argument. The following example computes a result object where the temperatures of all sensors are summed up:
.reduce( (a, b) -> {temp: a.temp + b.temp} )
reduceBy
Compute an aggregate value for a property:
.reduceBy(Function, property)
where Function is one of: Math.max, Math.min, Num.sum, Num.avg, Num.stats, or a user-defined reduce function for two property values. The function needs to be associative.
For example, to compute a product of all property values:
.reduceBy( (a, b) -> a * b, "property")
aggregateBy
Groups objects by a property and computes an aggregate based on another property:
.aggregateBy("group by prop", Function, "prop")
where Function is the same as in reduceBy.
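For example, the maximum temperature per city (the same query used in the Query Examples section):
.aggregateBy("city", Math.max, "temp")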
hyperLogLog
Count distinct values (estimated) for a property:
.hll("city")
topK
Computes top K most common values of a property:
.topK("city", 10)
percentile
Estimates a percentile for a property. Percentile is in the range 0-100:
.percentile("temp", 50)
rollup
Rollup splits historical data into specified periods and computes statistics for each period. To compute statistics for every day run:
.rollup(1, "day")
Other available time periods are: "second", "minute", "hour"