Querying
Queries consist of multiple steps, and data flows through all of them in sequence. The general structure of a query is:
stream -> filters -> transformations -> aggregations -> result
Everything after stream is optional.
Stream Queries
To start streaming JSON objects and their changes, type:
stream("collection")
Afterwards the query can be followed by one or both of:
.filter( (o) -> o.prop == 'value' ) # filter matching objects
.map( (o) -> {ab: o.a + o.b} ) # transform to another object
where the filter and map functions work the same way as they do on JavaScript arrays (Documentation). Arbitrary JavaScript or CoffeeScript functions can be supplied to filter, map, and the rest of the query functions.
In this documentation we generally use the arrow (->) syntax for functions; however, you can always use JavaScript syntax:
.filter( function(sensor) { return sensor.temp > 50; } )
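Because filter and map follow the semantics of JavaScript arrays, the callbacks can be tried out on a plain array first. The sketch below uses only standard JavaScript, not the query engine, and the sample data is made up for illustration.

```javascript
// Plain JavaScript arrays, not the query engine: these are the same
// callbacks you would pass to .filter(...) and .map(...) in a query.
const sensors = [
  { temp: 72, a: 1, b: 2 },
  { temp: 45, a: 3, b: 4 },
  { temp: 51, a: 5, b: 6 },
];

// Keep only objects whose temp is above 50.
const hot = sensors.filter(function (sensor) { return sensor.temp > 50; });

// Transform each remaining object into a new one.
const sums = hot.map(function (o) { return { ab: o.a + o.b }; });

console.log(sums); // [ { ab: 3 }, { ab: 11 } ]
```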
The query can then be followed by an aggregate function:
.sum("prop") # or max, min, avg, count
.stats("prop") # gives: max, min, avg, count, sum
.aggregateBy("group by prop", Math.max, "prop")
.hll("prop") # Count distinct values (estimated with HyperLogLog)
.topK("prop", 10) # Top 10 values for property
.reduce( (a, b) -> c ) # combine two objects
.reduceBy(Num.avg, "prop") # reduce by property
See Function Reference for other functions.
Finally, aggregations can be followed by one or more filter/map/flatMap steps.
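To make the aggregate steps more concrete, here is a rough plain-JavaScript emulation of what stats and aggregateBy compute over a fixed array. The helper functions are hypothetical stand-ins written for this illustration; they are not the engine's implementation, and the real steps run incrementally over a live stream.

```javascript
// Hypothetical stand-in for .stats("prop"): summary of one numeric property.
function stats(items, prop) {
  const values = items.map((o) => o[prop]);
  const sum = values.reduce((a, b) => a + b, 0);
  return {
    max: Math.max(...values),
    min: Math.min(...values),
    avg: values.length ? sum / values.length : NaN,
    count: values.length,
    sum: sum,
  };
}

// Hypothetical stand-in for .aggregateBy(groupProp, fn, valueProp):
// group items by groupProp, then fold each group's values pairwise with fn.
function aggregateBy(items, groupProp, fn, valueProp) {
  const groups = new Map();
  for (const item of items) {
    const group = item[groupProp];
    const value = item[valueProp];
    groups.set(group, groups.has(group) ? fn(groups.get(group), value) : value);
  }
  return Object.fromEntries(groups);
}

const readings = [
  { city: "NY", temp: 70 },
  { city: "NY", temp: 80 },
  { city: "LA", temp: 90 },
];

console.log(stats(readings, "temp"));
// { max: 90, min: 70, avg: 80, count: 3, sum: 240 }
console.log(aggregateBy(readings, "city", Math.max, "temp"));
// { NY: 80, LA: 90 }
```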
Historical Queries
Every time you push an update, a history of changes is saved. You can then analyze this historical data by starting the query with:
history("collection", "prop")
which returns past values of the named property. These values can come from all documents in that collection.
The time period can be limited using:
.filter( Time.past(7, 'days') )
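Conceptually, a time filter like this is a predicate that keeps entries newer than a cutoff. The sketch below is a plain-JavaScript illustration under stated assumptions: the past helper, the unit table, and the time field (milliseconds since the epoch) are all hypothetical, and the shape of real history entries may differ.

```javascript
// Hypothetical unit table; only units that appear in this documentation.
const MS_PER_UNIT = {
  min: 60 * 1000,
  day: 24 * 60 * 60 * 1000,
  days: 24 * 60 * 60 * 1000,
};

// Hypothetical stand-in for a time filter: returns a predicate that keeps
// entries whose `time` (assumed field name) lies within the last
// `amount` units before `now`.
function past(amount, unit, now = Date.now()) {
  const cutoff = now - amount * MS_PER_UNIT[unit];
  return (entry) => entry.time >= cutoff && entry.time <= now;
}

// Fixed "now" so the example is reproducible.
const now = Date.UTC(2024, 0, 8);
const entries = [
  { time: Date.UTC(2024, 0, 7), value: 21 },   // 1 day before now
  { time: Date.UTC(2023, 11, 25), value: 19 }, // 14 days before now
];

const recent = entries.filter(past(7, "days", now));
console.log(recent.length); // 1
```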
Historical queries can end with aggregates just like stream queries, and they have additional aggregators:
.rollup(duration, "time unit")
# Example: rollup(5, 'min') # rollup to 5 min intervals
Historical queries are also live queries, so the results are updated as new data comes in.
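One way to picture a rollup is as bucketing timestamped values into fixed-width intervals and summarizing each bucket. The following is a minimal sketch in plain JavaScript, assuming entries of the form { time, value } with time in milliseconds; the function and field names are hypothetical and the engine's actual output format is not shown here.

```javascript
const MINUTE_MS = 60 * 1000;

// Hypothetical stand-in for a rollup: groups entries into intervals of
// `duration * unitMs` milliseconds and computes min/max/avg/count per interval.
function rollup(entries, duration, unitMs) {
  const width = duration * unitMs;
  const buckets = new Map();
  for (const { time, value } of entries) {
    // Start of the interval this entry falls into.
    const start = Math.floor(time / width) * width;
    const b = buckets.get(start) ||
      { start, min: Infinity, max: -Infinity, sum: 0, count: 0 };
    b.min = Math.min(b.min, value);
    b.max = Math.max(b.max, value);
    b.sum += value;
    b.count += 1;
    buckets.set(start, b);
  }
  return [...buckets.values()]
    .sort((a, b) => a.start - b.start)
    .map((b) => ({
      start: b.start, min: b.min, max: b.max,
      avg: b.sum / b.count, count: b.count,
    }));
}

const samples = [
  { time: 0, value: 10 },
  { time: 4 * MINUTE_MS, value: 20 },
  { time: 5 * MINUTE_MS, value: 30 },
];

// Roll up to 5 minute intervals: two buckets, starting at 0 and 300000.
console.log(rollup(samples, 5, MINUTE_MS));
```

Since historical queries are live, a real rollup would also need to update the most recent bucket as new values arrive; this sketch only covers a fixed batch.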
Limits
The stream and history steps can be followed by an optional limit:
.limit(10)
to reduce the number of items returned.
Output Throttling
To receive events less frequently, you can specify the rate at which events are sent. Anywhere in the query, add:
.rate( secondsBetweenEvents )
This is particularly useful in a browser or mobile context, where frequent updates degrade UI performance or bandwidth is limited.
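The effect is similar to throttling a listener on the client. The sketch below shows one simple throttling policy in plain JavaScript: deliver an event only if enough time has passed since the last delivered one, and drop the rest. The rateLimit helper is hypothetical, and whether the engine drops intermediate events or delivers the latest state at the end of each interval is not specified here.

```javascript
// Hypothetical helper: wraps a listener so it is invoked at most once per
// `secondsBetweenEvents`. Events arriving sooner are dropped.
// `clock` is injectable so the behaviour can be checked without real timers.
function rateLimit(listener, secondsBetweenEvents, clock = Date.now) {
  let last = -Infinity;
  return function (event) {
    const now = clock();
    if (now - last >= secondsBetweenEvents * 1000) {
      last = now;
      listener(event);
    }
  };
}

// Demo with a fake clock (milliseconds).
let fakeNow = 0;
const received = [];
const limited = rateLimit((e) => received.push(e), 2, () => fakeNow);

limited("a");                 // delivered: first event
fakeNow = 500;  limited("b"); // dropped: only 0.5 s since "a"
fakeNow = 2000; limited("c"); // delivered: 2 s since "a"

console.log(received); // [ 'a', 'c' ]
```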
Passing arguments
You can pass arguments to queries as a JSON object:
.args( {"myArg": 20} )
and then use those arguments in queries:
.filter( (sensor) -> sensor.temp > myArg )
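Arguments make it possible to reuse one query with different values. In plain JavaScript the closest analogue is a closure over the argument object, sketched below. The makeTempFilter helper is hypothetical, and how the engine actually binds argument names inside query functions is not described here.

```javascript
// Hypothetical helper: builds the filter callback from an args object,
// so the threshold is supplied from outside rather than hard-coded.
function makeTempFilter(args) {
  const { myArg } = args;
  return (sensor) => sensor.temp > myArg;
}

const sensorList = [{ temp: 15 }, { temp: 20 }, { temp: 25 }];

const above20 = sensorList.filter(makeTempFilter({ myArg: 20 }));
console.log(above20); // [ { temp: 25 } ]
```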
Accessing keys
The filter and map functions can access the keys of objects through their second argument:
.filter( (sensor, key) -> key == 'sensor1' )
.map( (sensor, key) -> {id: key, temp: sensor.temp} )
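For comparison, the same (value, key) callbacks can be applied to a plain keyed object using Object.entries. This is only an emulation in standard JavaScript, assuming the collection behaves like a map from key to object; the sample keys and values are made up.

```javascript
// A keyed collection modelled as a plain object: key -> sensor.
const sensorsByKey = {
  sensor1: { temp: 72 },
  sensor2: { temp: 45 },
};

// Object.entries yields [key, value] pairs; the callbacks below take
// (sensor, key) in the same order as the query functions.
const pairs = Object.entries(sensorsByKey);

const byKey = (sensor, key) => key === "sensor1";
const withId = (sensor, key) => ({ id: key, temp: sensor.temp });

const onlySensor1 = pairs
  .filter(([key, sensor]) => byKey(sensor, key))
  .map(([key, sensor]) => withId(sensor, key));

console.log(onlySensor1); // [ { id: 'sensor1', temp: 72 } ]
```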
Nested fields
Nested fields of JSON objects can be accessed just like in JavaScript and can be used in all functions:
sensor.location.city == "NY"
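In plain JavaScript, reading a nested field throws a TypeError when an intermediate object is missing, so it can be worth guarding the access when not every object has the nested field. The sketch below uses standard JavaScript only and made-up data; whether the query engine guards such accesses itself is not stated here.

```javascript
const places = [
  { temp: 72, location: { city: "NY" } },
  { temp: 45, location: { city: "LA" } },
  { temp: 60 }, // no location field at all
];

// Guarded access: objects without `location` are skipped instead of throwing.
const inNY = places.filter((s) => s.location && s.location.city === "NY");

console.log(inNY.length); // 1
```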
Query Examples
Continuing the sensors example:
View sensors with maximum temperature in each city:
stream("sensors")
.filter( (sensor) -> sensor.country == "US" )
.aggregateBy("city", Math.max, "temp")
Converting temperature to Fahrenheit:
stream("sensors")
.filter( (sensor) -> sensor.country == "US" )
.map( (s) -> {tempF: (s.temp*9)/5+32})
.stats("tempF")
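The map step above applies the usual Celsius to Fahrenheit formula, F = C * 9 / 5 + 32, which assumes temp is stored in Celsius. A quick check of the arithmetic in plain JavaScript (the function name is only for illustration):

```javascript
// Same expression as in the map step, as a standalone function.
function toFahrenheit(celsius) {
  return (celsius * 9) / 5 + 32;
}

console.log(toFahrenheit(0));   // 32
console.log(toFahrenheit(100)); // 212
console.log(toFahrenheit(-40)); // -40

// Applied to objects the way the map step would be.
const converted = [{ temp: 20 }, { temp: 35 }].map((s) => ({ tempF: toFahrenheit(s.temp) }));
console.log(converted); // [ { tempF: 68 }, { tempF: 95 } ]
```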
View statistics (min/max/avg/count) for each day in the past 7 days:
history("sensors", "temp")
.filter( Time.past(7, "days") )
.rollup(1, "day")
For more examples, check the dropdown in PlayGround's Query section.