Edgent is designed to accelerate your development of event-driven, flow-graph style analytic applications running on edge devices. It does this through its combination of API, connectors, basic analytics, utilities, and openness!

Let's have some fun with a shallow but broad view into what you can do in a few lines of code... an introduction to Edgent's capabilities via a series of terse code fragments.

See the Getting Started Guide for a step-by-step introduction, and information about full samples and recipes.

Let's start with a complete application that periodically samples a sensor and publishes its values to an Enterprise IoT Hub in less than 10 lines of code:

public class ImpressiveEdgentExample {
  public static void main(String[] args) {
    DirectProvider provider = new DirectProvider();
    Topology top = provider.newTopology();

    IotDevice iotConnector = IotpDevice.quickstart(top, "edgent-intro-device-2");
    // open https://quickstart.internetofthings.ibmcloud.com/#/device/edgent-intro-device-2

    // ingest -> transform -> publish
    TStream<Double> readings = top.poll(new SimulatedTemperatureSensor(), 1, TimeUnit.SECONDS);
    TStream<JsonObject> events = readings.map(JsonFunctions.valueOfNumber("temp"));
    iotConnector.events(events, "readingEvents", QoS.FIRE_AND_FORGET);

    provider.submit(top);
  }
}

Ok, that was 11 lines and it omitted the imports, but there are only 7 lines in main()!

That leveraged the IotpDevice connector to the IBM Watson IoT Platform and the platform's Quickstart feature. Quickstart requires no account or device preregistration, and you can open a browser to see the data being published. It's a great way to get started quickly.

Hopefully that had enough of a wow factor to encourage you to keep reading!

Connectors, Ingest and Sink

Edgent applications need to create streams of data from external entities, termed ingest, and sink streams of data to external entities.
There are primitives for those operations and a collection of connectors to common external entities. More connector contributions are welcome!

Connectors are just code that makes it easier for an Edgent application to integrate with an external entity. They use Edgent ingest primitives such as Topology.poll() and Topology.events(), and TStream.sink(), like any other Edgent code. A connector may provide Supplier and Consumer functions, for ingest and sink respectively, that an application can use directly with the Edgent API.

OK... fewer words, more code!

You've already seen publishing using the IotpDevice connector.

Want to receive IotDevice device commands? Simple!

    TStream<JsonObject> cmds = iotConnector.commands();
    cmds.sink(cmd -> System.out.println("I should handle received cmd: "+cmd));

    TStream<JsonObject> xyzCmds = iotConnector.commands("xyzCmds");

There's an IotGateway device model too.

Don't want no stinkin IotDevice model and just want to pub/sub to an MQTT server? No worries! Use the MqttStreams connector:

    //IotDevice iotConnector = IotpDevice.quickstart(top, "edgent-intro-device-2");
    MqttStreams iotConnector = new MqttStreams(top, "ssl://myMqttServer:8883", "my-device-client-id");


    //iotConnector.events(events, "readingEvents", QoS.FIRE_AND_FORGET);
    iotConnector.publish(events.asString(), "readingEvents", QoS.FIRE_AND_FORGET, false);

    TStream<String> xyzTopicMsgs = iotConnector.subscribe("xyzTopic", QoS.FIRE_AND_FORGET);

Want to connect to Kafka? Use the KafkaProducer and KafkaConsumer connectors with similar ease.

There's a JdbcStreams connector too.

Want to sink a TStream to rolling text files? Use the FileStreams connector.

    new File("/tmp/MY-DEMO-FILES").mkdir();
    FileStreams.textFileWriter(events.asString(), () -> "/tmp/MY-DEMO-FILES/READINGS");

    // tail -f /tmp/MY-DEMO-FILES/.READINGS

Or watch for, ingest and process text files? Csv can be useful if your input lines are comma-separated values:

    String watchedDir = "/some/directory/path";
    List<String> csvFieldNames = ...
    TStream<String> pathnames = FileStreams.directoryWatcher(top, () -> watchedDir, null);
    TStream<String> lines = FileStreams.textFileReader(pathnames);
    TStream<JsonObject> parsedLines = lines.map(line -> Csv.toJson(Csv.parseCsv(line), csvFieldNames));

Want to sink to a command's stdin? Use the CommandStreams connector:

    TStream<MyEvent> events = ...
    ProcessBuilder cmd = new ProcessBuilder("cat").redirectOutput(new File("/dev/stdout"));
    CommandStreams.sink(events.asString(), cmd);

Or ingest a command's stdout/err?

    ProcessBuilder cmd = new ProcessBuilder("date", "-R");
    TStream<List<String>> readings = CommandStreams.periodicSource(top, cmd, 1, TimeUnit.SECONDS);

    TStream<JsonObject> events = readings
          .flatMap(list -> list)
          .map(line -> {
              // wrap each output line in a JsonObject; the "date" property name is illustrative
              JsonObject jo = new JsonObject();
              jo.addProperty("date", line);
              return jo;
          });

    // also note TStream support for a fluent programming style
    // and use of TStream.flatMap() to transform an input list to
    // an output list and then add each output list item as a separate
    // tuple to the output stream

Want to sink to a log via SLF4J or another logging system? Just do it!

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    private static final Logger logger = LoggerFactory.getLogger(MyClass.class);

    readings.sink(reading -> logger.info("reading: {}", reading));

Want to publish to Elasticsearch? See EDGENT-368 for a full code example.

More on Ingest

You've seen how to periodically poll a function to get some data.
That's just one of the methods defined in Topology for ingesting data - for creating source streams.

Also note that the tuples (a.k.a. events, data, objects) in a TStream can be any type. There's no special Edgent tuple type hierarchy.

Want readings from multiple sensors in a single stream tuple?

    SimulatedTemperatureSensor tempSensor = new SimulatedTemperatureSensor();
    SimpleSimulatedSensor pressureSensor = new SimpleSimulatedSensor();

    TStream<JsonObject> events = top.poll( () -> {
          JsonObject jo = new JsonObject();
          jo.addProperty("temp", tempSensor.get());
          jo.addProperty("pressure", pressureSensor.get());
          return jo;
        }, 1, TimeUnit.SECONDS);

Want to define a class or use an existing one for a tuple?

    public class SensorReading {
        double temp;
        double pressure;
        public SensorReading(double temp, double pressure) {
            this.temp = temp;
            this.pressure = pressure;
        }
    }

    SimulatedTemperatureSensor tempSensor = new SimulatedTemperatureSensor();
    SimpleSimulatedSensor pressureSensor = new SimpleSimulatedSensor();

    TStream<SensorReading> readings = top.poll(
        () -> new SensorReading(tempSensor.get(), pressureSensor.get()),
        1, TimeUnit.SECONDS);

Simulated Sensors

Edgent provides some simple simulated sensors that can be helpful to get going. You've already seen SimulatedTemperatureSensor and SimpleSimulatedSensor - included in the Edgent Samples source release bundle. There are additional ones in the same Java package.

Sensor library

Wondering if Edgent has a sensor library? It does not because there seems to be little value in supplying one.

Using Edgent's stream ingest primitives (e.g., Topology.poll() and Topology.events()), it's trivial for you to call the sensor's APIs to get readings and compose them into a stream data object of your choosing to be added to a TStream.
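
As a rough illustration, here's a plain-Java sketch of wrapping a sensor's API in a supplier function that returns one composed reading per call. FakeBarometer and its readHectopascals() method are invented stand-ins for a real sensor driver, and the sketch uses java.util.function.Supplier rather than any Edgent type. A function of this shape is the kind of thing you would hand to a polling ingest primitive.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sensor driver; stands in for whatever API your sensor provides.
final class FakeBarometer {
    private double next = 1000.0;
    double readHectopascals() { return next++; }   // returns 1000.0, 1001.0, ...
}

final class SensorSupplierSketch {
    // Wraps the driver call in a Supplier that yields one reading object per call.
    static Supplier<Map<String, Object>> readingSupplier(String sensorId, FakeBarometer sensor) {
        return () -> {
            Map<String, Object> reading = new LinkedHashMap<>();
            reading.put("id", sensorId);
            reading.put("pressure", sensor.readHectopascals());
            return reading;
        };
    }
}
```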


Let's get back to our original ImpressiveEdgentExample and explore making it smarter - push more analytics out to the edge!

Want to only publish readings with values less than 5 or more than 30?

    // add this after the poll()
    readings = readings.filter(tuple -> tuple < 5d || tuple > 30d);

Your filter predicate function can do whatever it needs to do!

Or use the Range utility:

    // an open range excludes its endpoints, so readings of exactly 5 or 30 also pass this filter
    Range<Double> range = Ranges.open(5d, 30d);
    readings = readings.filter(reading -> !range.contains(reading));

    // or, equivalently
    readings = readings.filter(Ranges.outsideOf(Ranges.open(5d, 30d)));

That alone isn't a very compelling use of Range but consider a larger context. A Range has a simple String representation (e.g., the above is "(5d,30d)") so that value could be read from an application configuration file on startup, or received from a device command to create a dynamically configurable application.

Filters.deadband offers a more sophisticated filter. There's a nice graphic depiction of the filter behavior in the method's javadoc.

    Range<Double> range = Ranges.open(5d, 30d);
    readings = Filters.deadband(readings, reading -> reading, range);
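
To make the behavior concrete, here's a plain-Java sketch of deadband filtering over a list, written from the description in the Filters.deadband javadoc as understood here: a value passes if it is the first value, if it is outside the band, or if it is the first in-band value after an out-of-band one. DeadbandSketch is a made-up name and this is not Edgent's implementation; the javadoc remains the authority.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of deadband semantics over an in-memory list.
final class DeadbandSketch {
    // The band is the open interval (lo, hi), matching Ranges.open in the fragment above.
    static List<Double> deadband(List<Double> readings, double lo, double hi) {
        List<Double> out = new ArrayList<>();
        boolean first = true;
        boolean lastInBand = false;
        for (double r : readings) {
            boolean inBand = r > lo && r < hi;
            // pass: the first value, any out-of-band value,
            // or the first in-band value following an out-of-band one
            if (first || !inBand || !lastInBand) {
                out.add(r);
            }
            first = false;
            lastInBand = inBand;
        }
        return out;
    }
}
```

For the readings 10, 12, 35, 36, 20, 21, 3, 15 and a band of (5, 30), the sketch keeps 10, 35, 36, 20, 3, 15: the repeated in-band values 12 and 21 are suppressed.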

There's also a Filters.deadtime filter that can come in handy.


Want to split a TStream into multiple TStreams - e.g., to handle different categories of tuples differently? TStream.split() is essentially an efficient group of multiple filters.

    List<TStream<Double>> twoStreams = readings.split(2, d -> range.test(d) ? 0 : 1);
    TStream<Double> inRange = twoStreams.get(0);
    TStream<Double> outOfRange = twoStreams.get(1);

Your split function can yield as many output streams as you need. There's also a form of split() that works with Enum identifiers.
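
If it helps to picture what split does, here's a plain-Java sketch that routes list items into n output lists by the index the splitter function returns. SplitSketch is a made-up name. Discarding on a negative index and wrapping the index modulo n reflect the TStream.split() javadoc as understood here, so treat those two details as assumptions and check the javadoc before relying on them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical illustration of split semantics over an in-memory list.
final class SplitSketch {
    static <T> List<List<T>> split(List<T> tuples, int n, ToIntFunction<T> splitter) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T t : tuples) {
            int idx = splitter.applyAsInt(t);
            if (idx < 0) {
                continue;                   // assumption: a negative index discards the tuple
            }
            outputs.get(idx % n).add(t);    // assumption: the index wraps modulo n
        }
        return outputs;
    }
}
```

Splitting 2, 10, 40, 29 with a splitter that returns 0 for values strictly between 5 and 30 and 1 otherwise yields 10, 29 in the first list and 2, 40 in the second.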


Want to convert a value that's in Centigrade to Fahrenheit?

    readings = readings.map(reading -> (reading * 1.8) + 32);

Your map function can do whatever it needs to do! E.g., a tuple could be a video image frame and the map function could generate face detection events.

We've already seen converting a numeric value to a JsonObject using JsonFunctions:

    TStream<JsonObject> events = readings.map(JsonFunctions.valueOfNumber("temp"));

What about scoring against a model? Edgent doesn't have anything special for that at least at this time.
But it's easy to integrate the use of some scoring model system into an Edgent application.
Imagine a package defined something like:

    public class Model {
      // load model from a File
      public Model(File f) { ... }
      // score String s against the model
      // return a confidence score between 0.0 and 1.0
      public Double score(String s) { ... }
    }

An Edgent application might use such a model in this manner:

    final Model model = new Model(...);

    TStream<String> events = ...
    TStream<JsonObject> scoredEvents = events.map(event -> {
        Double score = model.score(event);
        JsonObject jo = new JsonObject();
        jo.addProperty("event", event);
        jo.addProperty("score", score);
        return jo;
    });
    TStream<JsonObject> interestingEvents =
        scoredEvents.filter(jo -> jo.getAsJsonPrimitive("score").getAsDouble() > 0.75);

OK, maybe that one was a bit too large a code fragment for this introduction.

Windowing and aggregation

Want to do signal smoothing - create a continuously aggregated average over the last 10 readings? Here, each time a tuple is added to the window, a new aggregated value is computed and added to the output stream.

    TWindow<Double,Integer> window = readings.last(10, Functions.unpartitioned());
    TStream<Double> smoothed = window.aggregate((List<Double> list, Integer partition) -> {
          double avg = 0.0;
          for (Double d : list) avg += d;
          if (list.size() > 0) avg /= list.size();
          return avg;
        });
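
To see what continuous aggregation produces, here's a plain-Java sketch of a count-based sliding window over a list: every new reading yields one output, the average of the most recent readings. SlidingAverageSketch is a made-up name, and this only mimics the behavior described above; it is not how Edgent implements TWindow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration: one averaged output per input reading.
final class SlidingAverageSketch {
    static List<Double> slidingAverages(List<Double> readings, int windowSize) {
        Deque<Double> window = new ArrayDeque<>();
        List<Double> out = new ArrayList<>();
        for (double r : readings) {
            window.addLast(r);
            if (window.size() > windowSize) {
                window.removeFirst();       // evict the oldest reading
            }
            double sum = 0.0;
            for (double d : window) {
                sum += d;
            }
            out.add(sum / window.size());   // emit on every insertion
        }
        return out;
    }
}
```

With a window of 2, the readings 1, 2, 3, 4 produce 1.0, 1.5, 2.5, 3.5: four inputs, four outputs.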

Want a window over the last 10 seconds instead?

    // TWindow<Double,Integer> window = readings.last(10, Functions.unpartitioned());
    TWindow<Double,Integer> window = readings.last(10, TimeUnit.SECONDS, Functions.unpartitioned());

Or want to do data reduction - reduce the readings to one average value every window batch? Once the window is full, the batch of tuples is aggregated, the result is added to the output stream and the window is cleared. The next aggregation isn't generated until the window is full again.

    TStream<Double> averages = window.batch((List<Double> list, Integer partition) -> {
          double avg = 0.0;
          for (Double d : list) avg += d;
          if (list.size() > 0) avg /= list.size();
          return avg;
        });
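
For contrast with continuous aggregation, here's a plain-Java sketch of batch behavior over a list with a count-based window: nothing is emitted until the batch is full, then one average is emitted and the batch is cleared. BatchAverageSketch is a made-up name and the sketch only mimics the behavior described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: one averaged output per full batch.
final class BatchAverageSketch {
    static List<Double> batchAverages(List<Double> readings, int batchSize) {
        List<Double> batch = new ArrayList<>();
        List<Double> out = new ArrayList<>();
        for (double r : readings) {
            batch.add(r);
            if (batch.size() == batchSize) {
                double sum = 0.0;
                for (double d : batch) {
                    sum += d;
                }
                out.add(sum / batch.size());
                batch.clear();              // start collecting the next batch
            }
        }
        return out;                         // a trailing partial batch emits nothing
    }
}
```

With a batch size of 2, the readings 1, 2, 3, 4, 5 produce just 1.5 and 3.5; the final 5 waits for a partner that never arrives. That's the data reduction: five inputs, two outputs.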

Or use Aggregations for simple statistics:

    TStream<Double> averages = window.batch((List<Double> list, Integer partition) ->
          Aggregations.aggregate(list, Statistic2.MEAN));

Want to compute several basic statistics and a regression for an aggregation? Use Aggregations.aggregateN():

    TWindow<Double,Integer> window = readings.last(10, Functions.unpartitioned());
    TStream<ResultMap> aggResults = window.batch((List<Double> list, Integer partition) ->
                       Aggregations.aggregateN(list, Statistic2.MIN, Statistic2.MAX,
                                     Statistic2.SUM, Statistic2.STDDEV,
                                     Statistic2.COUNT, Regression2.SLOPE));
    TStream<JsonObject> joAggResults = aggResults.map(ResultMap.toJsonObject()); // optional

There's also support for multi-variable aggregations - independent statistic aggregations for multiple variables in a list of tuples, e.g., the temperature and pressure variables in each tuple.

If the objects in the window are JsonObjects, JsonAnalytics can be handy.


Want to run an expensive computation on multiple tuples in parallel?
Easy with PlumbingStreams.parallel()!

    TStream<SensorReading> readings = ...
    // 3 parallel channels - i.e., can process 3 tuples simultaneously on separate threads
    TStream<MyData> analyzed = PlumbingStreams.parallel(readings,
                   3, PlumbingStreams.roundRobinSplitter(3),
                   (input, channel) ->
                     input.map(reading -> myExpensiveComputation(reading)));

See PlumbingStreams.parallelBalanced for a load balanced form that assigns a tuple to any idle channel.
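
To picture round-robin channels, here's a plain-Java sketch using java.util.concurrent instead of Edgent: each channel is a single-threaded executor, and tuple i is handed to channel i % width. ParallelChannelsSketch is a made-up name. The sketch collects results in input order purely for simplicity, which says nothing about the ordering Edgent's parallel() provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration of round-robin parallel channels.
final class ParallelChannelsSketch {
    static <T, R> List<R> parallel(List<T> tuples, int width, Function<T, R> work) {
        List<ExecutorService> channels = new ArrayList<>();
        for (int i = 0; i < width; i++) {
            channels.add(Executors.newSingleThreadExecutor());  // one thread per channel
        }
        try {
            List<Future<R>> pending = new ArrayList<>();
            int next = 0;
            for (T t : tuples) {
                Callable<R> task = () -> work.apply(t);
                pending.add(channels.get(next++ % width).submit(task));  // round-robin assignment
            }
            List<R> results = new ArrayList<>();
            for (Future<R> f : pending) {
                try {
                    results.add(f.get());   // wait for each result, in input order
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
            return results;
        } finally {
            for (ExecutorService channel : channels) {
                channel.shutdown();
            }
        }
    }
}
```

Round-robin is simple but blind to how long each tuple takes: a slow tuple holds up everything queued behind it on the same channel, which is the situation a load balanced form is meant to address.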

There are a variety of useful features in PlumbingStreams.

Wrap up

We touched on a lot, but not all, of Edgent. Hopefully you're convinced that the combination of Edgent's API, connectors, and other features is powerful and easy to use.

See the full Edgent APIs Javadoc and Getting Started Guide for more information, including pointers to more introductory material, samples, and recipes.