Apache Storm - Trident


Advertisements

Trident is an extension of Storm. Like Storm, Trident was also developed by Twitter. The main reason behind developing Trident is to provide a high-level abstraction on top of Storm along with stateful stream processing and low latency distributed querying.

Trident uses spout and bolt, but these low-level components are auto-generated by Trident before execution. Trident has functions, filters, joins, grouping, and aggregation.

Trident processes streams as a series of batches which are referred as transactions. Generally the size of those small batches will be on the order of thousands or millions of tuples, depending on the input stream. This way, Trident is different from Storm, which performs tuple-by-tuple processing.

Batch processing concept is very similar to database transactions. Every transaction is assigned a transaction ID. The transaction is considered successful, once all its processing complete. However, a failure in processing one of the transaction's tuples will cause the entire transaction to be retransmitted. For each batch, Trident will call beginCommit at the beginning of the transaction, and commit at the end of it.

Trident Topology

Trident API exposes an easy option to create Trident topology using “TridentTopology” class. Basically, Trident topology receives input stream from spout and do ordered sequence of operation (filter, aggregation, grouping, etc.,) on the stream. Storm Tuple is replaced by Trident Tuple and Bolts are replaced by operations. A simple Trident topology can be created as follow −

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident tuple is a named list of values. The TridentTuple interface is the data model of a Trident topology. The TridentTuple interface is the basic unit of data that can be processed by a Trident topology.

Trident Spout

Trident spout is similar to Storm spout, with additional options to use the features of Trident. Actually, we can still use the IRichSpout, which we have used in Storm topology, but it will be non-transactional in nature and we won’t be able to use the advantages provided by Trident.

The basic spout having all the functionality to use the features of Trident is "ITridentSpout". It supports both transactional and opaque transactional semantics. The other spouts are IBatchSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.

In addition to these generic spouts, Trident has many sample implementation of trident spout. One of them is FeederBatchSpout spout, which we can use to send named list of trident tuples easily without worrying about batch processing, parallelism, etc.

FeederBatchSpout creation and data feeding can be done as shown below −

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Operations

Trident relies on the “Trident Operation” to process the input stream of trident tuples. Trident API has a number of in-built operations to handle simple-to-complex stream processing. These operations range from simple validation to complex grouping and aggregation of trident tuples. Let us go through the most important and frequently used operations.

Filter

Filter is an object used to perform the task of input validation. A Trident filter gets a subset of trident tuple fields as input and returns either true or false depending on whether certain conditions are satisfied or not. If true is returned, then the tuple is kept in the output stream; otherwise, the tuple is removed from the stream. Filter will basically inherit from the BaseFilter class and implement the isKeep method. Here is a sample implementation of filter operation −

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Filter function can be called in the topology using “each” method. “Fields” class can be used to specify the input (subset of trident tuple). The sample code is as follows −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Function

Function is an object used to perform a simple operation on a single trident tuple. It takes a subset of trident tuple fields and emits zero or more new trident tuple fields.

Function basically inherits from the BaseFunction class and implements the execute method. A sample implementation is given below −

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Just like Filter operation, Function operation can be called in a topology using the each method. The sample code is as follows −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Aggregation

Aggregation is an object used to perform aggregation operations on an input batch or partition or stream. Trident has three types of aggregation. They are as follows −

  • aggregate − Aggregates each batch of trident tuple in isolation. During the aggregate process, the tuples are initially repartitioned using the global grouping to combine all partitions of the same batch into a single partition.

  • partitionAggregate − Aggregates each partition instead of the entire batch of trident tuple. The output of the partition aggregate completely replaces the input tuple. The output of the partition aggregate contains a single field tuple.

  • persistentaggregate − Aggregates on all trident tuple across all batch and stores the result in either memory or database.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Aggregation operation can be created using either CombinerAggregator, ReducerAggregator, or generic Aggregator interface. The "count” aggregator used in the above example is one of the build-in aggregators. It is implemented using “CombinerAggregator”. The implementation is as follows −

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Grouping

Grouping operation is an inbuilt operation and can be called by the groupBy method. The groupBy method repartitions the stream by doing a partitionBy on the specified fields, and then within each partition, it groups tuples together whose group fields are equal. Normally, we use “groupBy” along with “persistentAggregate” to get the grouped aggregation. The sample code is as follows −

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merging and Joining

Merging and joining can be done by using “merge” and “join” method respectively. Merging combines one or more streams. Joining is similar to merging, except the fact that joining uses trident tuple field from both sides to check and join two streams. Moreover, joining will work under batch level only. The sample code is as follows −

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

State Maintenance

Trident provides a mechanism for state maintenance. State information can be stored in the topology itself, otherwise you can store it in a separate database as well. The reason is to maintain a state that if any tuple fails during processing, then the failed tuple is retried. This creates a problem while updating the state because you are not sure whether the state of this tuple has been updated previously or not. If the tuple has failed before updating the state, then retrying the tuple will make the state stable. However, if the tuple has failed after updating the state, then retrying the same tuple will again increase the count in the database and make the state unstable. One needs to perform the following steps to ensure a message is processed only once −

  • Process the tuples in small batches.

  • Assign a unique ID to each batch. If the batch is retried, it is given the same unique ID.

  • The state updates are ordered among batches. For example, the state update of the second batch will not be possible until the state update for the first batch has completed.

Distributed RPC

Distributed RPC is used to query and retrieve the result from the Trident topology. Storm has an inbuilt distributed RPC server. The distributed RPC server receives the RPC request from the client and passes it to the topology. The topology processes the request and sends the result to the distributed RPC server, which is redirected by the distributed RPC server to the client. Trident's distributed RPC query executes like a normal RPC query, except for the fact that these queries are run in parallel.

When to Use Trident?

As in many use-cases, if the requirement is to process a query only once, we can achieve it by writing a topology in Trident. On the other hand, it will be difficult to achieve exactly once processing in the case of Storm. Hence Trident will be useful for those use-cases where you require exactly once processing. Trident is not for all use cases, especially high-performance use-cases because it adds complexity to Storm and manages the state.

Working Example of Trident

We are going to convert our call log analyzer application worked out in the previous section to Trident framework. Trident application will be relatively easy as compared to plain storm, thanks to its high-level API. Storm will be basically required to perform any one of Function, Filter, Aggregate, GroupBy, Join and Merge operations in Trident. Finally we will start the DRPC Server using the LocalDRPC class and search some keyword using the execute method of LocalDRPC class.

Formatting the call information

The purpose of the FormatCall class is to format the call information comprising “Caller number” and “Receiver number”. The complete program code is as follows −

Coding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

The purpose of the CSVSplit class is to split the input string based on “comma (,)” and emit every word in the string. This function is used to parse the input argument of distributed querying. The complete code is as follows −

Coding: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

This is the main application. Initially, the application will initialize the TridentTopology and feed caller information using FeederBatchSpout. Trident topology stream can be created using the newStream method of TridentTopology class. Similarly, Trident topology DRPC stream can be created using the newDRCPStream method of TridentTopology class. A simple DRCP server can be created using LocalDRPC class. LocalDRPC has execute method to search some keyword. The complete code is given below.

Coding: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Building and Running the Application

The complete application has three Java codes. They are as follows −

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

The application can be built by using the following command −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

The application can be run by using the following command −

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Output

Once the application is started, the application will output the complete details about the cluster startup process, operations processing, DRPC Server and client information, and finally, the cluster shutdown process. This output will be displayed on the console as shown below.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends
Advertisements