Querying

Queries consist of multiple steps, and data flows through them in sequence. The general structure of a query is:

stream -> filters -> transformations -> aggregations -> result

however, everything after stream is optional.

Stream Queries

To start streaming JSON objects and their changes, type:

stream("collection")  

Afterwards the query can be followed by one or both of:

.filter( (o) -> o.prop == 'value' ) # filter matching objects
.map( (o) -> {ab: o.a + o.b} ) # transform to another object

where the filter and map functions work the same way as they do on JavaScript arrays (see the JavaScript array documentation). Arbitrary JavaScript or CoffeeScript functions can be supplied to filter, map, and the other query functions.

In this documentation we generally use the CoffeeScript arrow -> syntax for functions; however, you can always use the JavaScript syntax:

.filter( function(sensor) { return sensor.temp > 50; } )
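
Because filter and map behave like their JavaScript array counterparts, a query step can be tried out on a plain array first. The following is a minimal sketch in plain JavaScript, not the query engine itself; the sample array and its field values are made up for illustration.

```javascript
// Hypothetical sample data standing in for objects in a collection.
const sampleObjects = [
  { prop: "value", a: 1, b: 2 },
  { prop: "other", a: 3, b: 4 },
  { prop: "value", a: 5, b: 6 },
];

// The same two steps as in the query above, using standard Array methods.
const result = sampleObjects
  .filter((o) => o.prop === "value") // keep matching objects
  .map((o) => ({ ab: o.a + o.b }));  // transform to another object

console.log(result);
```

Only the first and third objects pass the filter, so the mapped output contains two objects.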

The query can then be followed by an aggregate function:

.sum("prop") # or max, min, avg, count
.stats("prop") # gives: max, min, avg, count, sum
.aggregateBy("group by prop", Math.max, "prop")
.hll("prop") # Count distinct values (estimated with HyperLogLog)
.topK("prop", 10) # Top 10 values for property
.reduce( (a, b) -> c ) # combine two objects
.reduceBy(Num.avg, "prop") # reduce by property

See Function Reference for other functions.
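
As a rough illustration of what stats reports, the sketch below computes the same five values over a plain JavaScript array. The helper name `statsOf` and the sample readings are hypothetical; this is not how the engine computes them internally, only what the numbers mean.

```javascript
// Hypothetical helper: computes max, min, avg, count and sum of one
// property over a plain array of objects.
function statsOf(items, prop) {
  const values = items.map((item) => item[prop]);
  const sum = values.reduce((acc, v) => acc + v, 0);
  return {
    max: Math.max(...values),
    min: Math.min(...values),
    avg: values.length > 0 ? sum / values.length : NaN,
    count: values.length,
    sum: sum,
  };
}

// Made-up sample readings.
const readings = [{ temp: 10 }, { temp: 20 }, { temp: 60 }];
const summary = statsOf(readings, "temp");
console.log(summary);
```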

Finally, aggregations can be followed by one or more filter/map/flatMap steps.

Historical Queries

Every time you push an update, a history of changes is saved. You can then analyze this historical data by starting the query with:

history("collection", "prop") 

which returns the past values of the given property in the collection. These values can come from all documents in that collection.

The time period can be limited using:

.filter( Time.past(7, 'days') )

Historical queries can end with aggregates just like stream queries, and they have additional aggregators:

.rollup(duration, "time unit")
# Example: rollup(5, 'min') # rollup to 5 min intervals

Historical queries are also live queries, so the results are updated as new data keeps coming in.
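
To make the idea of rolling up to fixed intervals concrete, here is a minimal sketch in plain JavaScript. It assumes each historical point has the shape {time, value} with time in milliseconds, and it averages each bucket; both the point shape and the choice of averaging are assumptions made for illustration, as are the names `rollupSketch` and `UNIT_MS`.

```javascript
// Hypothetical unit table; the unit names accepted by the real rollup may differ.
const UNIT_MS = { sec: 1000, min: 60 * 1000, hour: 3600 * 1000, day: 86400 * 1000 };

// Group points into fixed-width time buckets and average each bucket.
function rollupSketch(points, duration, unit) {
  const width = duration * UNIT_MS[unit];
  const buckets = new Map();
  for (const { time, value } of points) {
    const start = Math.floor(time / width) * width; // bucket start in ms
    const bucket = buckets.get(start) || { start: start, sum: 0, count: 0 };
    bucket.sum += value;
    bucket.count += 1;
    buckets.set(start, bucket);
  }
  return [...buckets.values()]
    .sort((x, y) => x.start - y.start)
    .map((b) => ({ start: b.start, avg: b.sum / b.count, count: b.count }));
}

// Made-up points: two fall in the first 5 min bucket, one in the next.
const points = [
  { time: 0, value: 10 },
  { time: 60000, value: 20 },
  { time: 300000, value: 40 },
];
const rolled = rollupSketch(points, 5, "min");
console.log(rolled);
```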

Limits

The stream and history steps can be followed by an optional limit:

.limit(10)

to reduce the number of items returned.

Output Throttling

To receive events less frequently, you can specify the rate at which events are sent. Add the following anywhere in the query:

.rate( secondsBetweenEvents )

This is particularly useful in browser or mobile contexts, where frequent updates degrade UI performance or bandwidth is limited.
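
The effect of a minimum interval between events can be sketched in plain JavaScript as follows. This is only an illustration of the idea, assuming that events arriving too soon are simply dropped; the real rate step may behave differently (for example, by delivering the latest value later). The name `makeRateLimiter` and the injectable clock are hypothetical.

```javascript
// Forward an event only if at least `secondsBetweenEvents` seconds have
// passed since the last forwarded one. `now` is injectable so the
// behaviour can be checked without waiting for real time to pass.
function makeRateLimiter(secondsBetweenEvents, onEvent, now = Date.now) {
  let lastSent = -Infinity;
  return function push(event) {
    const t = now();
    if (t - lastSent >= secondsBetweenEvents * 1000) {
      lastSent = t;
      onEvent(event);
      return true;
    }
    return false; // too soon, event dropped
  };
}

// Usage with a fake clock (milliseconds).
let clock = 0;
const delivered = [];
const push = makeRateLimiter(2, (e) => delivered.push(e), () => clock);
push("a");    // t = 0 ms: delivered
clock = 500;
push("b");    // t = 500 ms: dropped
clock = 2000;
push("c");    // t = 2000 ms: delivered
console.log(delivered);
```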

Passing Arguments

You can pass arguments to queries as a JSON object:

.args( {"myArg": 20} )

and then use those arguments in queries:

.filter( (sensor) -> sensor.temp > myArg )

Accessing Keys

The filter and map functions can access an object's key through their second argument:

.filter( (sensor, key) -> key == 'sensor1' )
.map( (sensor, key) -> {id: key, temp: sensor.temp} )    
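
For comparison, the same key-aware filter and map can be sketched over a plain JavaScript object. The `collection` object below is made-up sample data, and this only mirrors the semantics described above. Note that Object.entries yields [key, value] pairs, whereas the query functions receive the object first and the key second.

```javascript
// Hypothetical collection: keys map to sensor documents.
const collection = {
  sensor1: { temp: 55 },
  sensor2: { temp: 42 },
};

// Object.entries gives [key, sensor] pairs that can be filtered and mapped.
const selected = Object.entries(collection)
  .filter(([key, sensor]) => key === "sensor1")
  .map(([key, sensor]) => ({ id: key, temp: sensor.temp }));

console.log(selected);
```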

Nested Fields

Nested fields of JSON objects can be accessed just like in JavaScript and can be used in all functions:

sensor.location.city == "NY"

Query Examples

Continuing the sensors example:

View the maximum temperature in each city for US sensors:

stream("sensors")
 .filter( (sensor) -> sensor.country == "US" )
 .aggregateBy("city", Math.max, "temp")
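
For intuition, the same per-city maximum can be computed over a plain JavaScript array. This is a sketch under assumptions: the helper `aggregateBySketch` is hypothetical and only mimics the grouping described above, and the sample sensors are made up.

```javascript
// Group items by `groupProp` and combine the `valueProp` values of each
// group pairwise with `fn` (for example Math.max).
function aggregateBySketch(items, groupProp, fn, valueProp) {
  const groups = new Map();
  for (const item of items) {
    const key = item[groupProp];
    const value = item[valueProp];
    groups.set(key, groups.has(key) ? fn(groups.get(key), value) : value);
  }
  return groups;
}

// Made-up sample sensors.
const sensors = [
  { country: "US", city: "NY", temp: 70 },
  { country: "US", city: "NY", temp: 75 },
  { country: "US", city: "SF", temp: 60 },
  { country: "CA", city: "Toronto", temp: 50 },
];

const maxByCity = aggregateBySketch(
  sensors.filter((sensor) => sensor.country === "US"),
  "city",
  Math.max,
  "temp"
);
console.log(maxByCity);
```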

Convert temperature to Fahrenheit and view its statistics:

stream("sensors")
 .filter( (sensor) -> sensor.country == "US" )
 .map( (s) -> {tempF: (s.temp * 9) / 5 + 32} )
 .stats("tempF")     
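
The conversion used in the map step is the standard Celsius to Fahrenheit formula, assuming temp is stored in Celsius. A quick check in plain JavaScript (the function name `toFahrenheit` is chosen here for illustration):

```javascript
// Celsius to Fahrenheit, written the same way as in the query above.
const toFahrenheit = (tempC) => (tempC * 9) / 5 + 32;

console.log(toFahrenheit(0));   // freezing point of water
console.log(toFahrenheit(100)); // boiling point of water
```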

View statistics (min/max/avg/count) for each day in the past 7 days:

history("sensors", "temp")
 .filter( Time.past(7, "days") )
 .rollup(1, "day")

For more examples, check the dropdown in the PlayGround's Query section.