Streaming analytics
With streaming analytics, you can constantly perform statistical analysis while moving within a data stream.
In this topic:
You can use streaming analytics to manage, monitor, and record real-time analytics of live streaming data so you can quickly extract the necessary information from big volumes of data to react to changing conditions in real time. Businesses generate continuous data from the following sources:
Log files generated by customers using mobile or web applications, e-commerce purchases, and in-game player activity.
Telemetry, such as data from connected devices, sensors, and instrumentation in data centers.
Data collected from social networks, financial trading systems, and geospatial services.
Once collected, the streaming data values from these sources will be processed sequentially and incrementally on a record-by-record basis or a time-based sliding window. Ingesting a window of values allows for both processing and analysis of the data, such as through correlating, aggregating, filtering, and sampling. The following figure is an example of a time-based sliding window.

Companies use this information to gain insights into their business and customer activity, such as service usage for billing rates, server activity, website clicks, and geo-locations of devices, people, or physical goods. For example, businesses can track changes in public sentiment on their brands and products by continuously monitoring and analyzing social media streams, and then quickly respond as needed.
The Internet of Things (IoT) also creates large stores of streaming data. Smart objects, such as cars, appliances, and electronic devices, produce data points throughout their operations, activities, and behaviors. Businesses can analyze these points in the data streams to reduce operating costs, improve product reliability, or optimize usage models. For example, you can monitor equipment performance based on its data output. Continuous pattern detection finds anomalies referred to as data gaps. These gaps help to pinpoint when to buy material, plan modifications, and staff personnel.
IoT devices and communication protocols, including text data and transmissions from both legacy and modern equipment sensors, for example, create streaming data of various formats. These multiple formats must be normalized, cleansed, and standardized to process individual events in-memory. Data must be continually corrected and assessed in windows before analysis.
Before you can use streaming analytics, you must ingest the data into PDI as it is received. Within PDI, you can also send event messages to trigger a process of Extract, Transform, and Load (ETL) alerts.
Get started
Think of Pentaho Data Integration (PDI) as a set of pipes. Data flows through them like water.
PDI is designed for continuous processing. The flow does not stop, even with large sources.
Pipe “size” maps to record volume and memory use. Performance depends on which steps change flow rate.
You can build a transformation that always waits for new data. Steps keep running and wait for records.
Input steps ingest stream records into PDI. You then process the data to refine it.
After processing, you can push results back to the stream. You can also retain them for analysis.
Data ingestion
Data is ingested into PDI by pulling messages from a stream into a transformation through a specified window.
A consumer step in a parent transformation pulls data into PDI. It then runs a child sub-transformation.
The child sub-transformation executes based on the window parameters. The window creates a continuous stream of records in near real time.
In the consumer step, you define how many messages to accept. You also define the data formats.
You can set up the step to collect events. You can also monitor alerts. You can also track consumption.
You can stream records back from the child transformation. Select a step in the child transformation.
The parent transformation then passes records downstream. It can include other steps.
Consumer steps that ingest streaming data into PDI include:
AMQP Consumer: Advanced Message Queuing Protocol (AMQP) brokers
JMS Consumer: Apache ActiveMQ Java Messaging Service server or IBM MQ middleware
Kafka Consumer: Kafka server
Kinesis Consumer: Amazon Kinesis Data Streams service
MQTT Consumer: Message Queuing Telemetry Transport (MQTT) broker or clients
In PDI, the data stream window is defined by duration (milliseconds) or number of rows.
The window is created when either limit is reached. For example:
Duration is
1000ms and rows is1000.PDI creates a window every
1000ms or every1000rows.
If you set duration or number of rows to 0, PDI ignores that parameter. For example:
Duration is
1000ms and rows is0.PDI creates a window every
1000ms only.
You can also set the maximum number of concurrent batches.
Set this only when the consumer step cannot keep pace with the stream.
Ensure your environment has enough CPU and memory. Otherwise, errors can occur.
Depending on your setup, you can run the transformation in PDI. You can also run it on Spark.
Spark runs in the Adaptive Execution Layer (AEL). Set the engine in the Run Options dialog box.
Spark executes the child transformation by duration only. It does not use the row limit.
If you use the Spark engine on streaming data, your transformation uses native Spark Streaming.
PDI does not report execution results in this case. Review results in Spark on your cluster.
Before using a consumer step with big data, connect Pentaho to a cluster. See Connecting to a Hadoop cluster with the PDI client.
Data processing
After ingestion through windowing, you can process windows in the child transformation.
Use the child transformation to adjust the data. Use it to handle event alerts.
After processing, you can load windowed data to outputs. You can also publish it back to the stream.
Producer steps that publish data back to the stream include:
AMQP Producer: Advanced Message Queuing Protocol (AMQP) brokers
JMS Producer: Apache ActiveMQ Java Messaging Service server or IBM MQ middleware
Kafka Producer: Kafka server
Kinesis Producer: Amazon Kinesis Data Streams service
MQTT Producer: Message Queuing Telemetry Transport (MQTT) broker or clients
You can also capture data for analysis using the streaming window.
You can create streaming Pentaho Data Services from output steps in the child transformation.
You can build dashboards with CTools using those services as sources. See App Builder, CDE, and CTools.
Once started, streaming transformations run continuously. You can stop them using:
The Stop option in the PDI client
The Abort step in the parent or child transformation
Restarting the Pentaho or Spark execution engine
Stopping or aborting a continuous transformation can cause data loss.
Plan changes carefully. Changing stream flow affects what is ingested.
If you use Kafka, you can control when offsets are committed.
Use this to retain data if message flow is interrupted.
Last updated
Was this helpful?

