Heron Going Exactly-Once

Last year, in May 2016, Twitter open sourced Heron, its next-generation streaming engine. Heron has been running in production at Twitter for over three years, powering real-time processes, workflows and analytics. It was developed to replace Storm, with the goals of scaling better, improving developer productivity, being easier to debug, using resources more efficiently and being easier to operate in a multi-tenant cluster environment.

Most streaming systems provide processing guarantees for the data they handle. These guarantees fall into three categories:

  • At-most-once: The system processes data on a best-effort basis. Some of the data injected into the system may be lost due to process, machine or network failures.

  • At-least-once: Data injected into the system is guaranteed to be processed at least once. However, in the presence of failures, some data may be processed more than once.

  • Exactly-once: The system ensures that each piece of data it receives is processed exactly once, even in the presence of failures, leading to accurate results.
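The difference between the three guarantees is easiest to see on a concrete failure. The following toy Java simulation (not Heron code; all names are made up for illustration) sends five numbers to a summing sink and injects two failures: one message is lost before it reaches the sink, and one acknowledgement is lost after the sink has already applied its message. Its exactly-once mode uses retries plus a deduplicating sink, a common textbook technique; Heron's approach, as the rest of this post describes, is instead built on checkpointed state.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of delivery guarantees (illustrative only, not Heron code).
// Failure 1: the message at index dropAt is lost on its first attempt.
// Failure 2: the ack for index ackLostAt is lost on its first attempt,
//            after the sink has already applied the message.
class DeliveryGuarantees {
  enum Mode { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

  static int sum(List<Integer> events, int dropAt, int ackLostAt, Mode mode) {
    int total = 0;
    Set<Integer> applied = new HashSet<>(); // message ids already applied
    for (int id = 0; id < events.size(); id++) {
      int attempts = 0;
      boolean acked = false;
      while (!acked) {
        attempts++;
        boolean delivered = !(id == dropAt && attempts == 1);
        if (delivered) {
          // Exactly-once here = retries plus a sink that ignores duplicates.
          if (mode != Mode.EXACTLY_ONCE || applied.add(id)) {
            total += events.get(id);
          }
        }
        acked = delivered && !(id == ackLostAt && attempts == 1);
        if (mode == Mode.AT_MOST_ONCE) {
          break; // fire and forget: never retry
        }
      }
    }
    return total;
  }

  public static void main(String[] args) {
    List<Integer> events = List.of(1, 2, 3, 4, 5); // true total is 15
    for (Mode m : Mode.values()) {
      System.out.println(m + ": " + sum(events, 1, 3, m));
    }
  }
}
```

With these inputs the three modes report 13 (the lost message is never resent), 19 (the message whose ack was lost is applied twice) and 15.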

Until now, most production Heron applications have relied on at-most-once or at-least-once guarantees. As Heron's adoption expands across enterprises and industries, there is a growing need to support exactly-once. Streamlio collaborated with Twitter and Microsoft to develop and incorporate exactly-once into Heron. In this blog post, we outline the requirements and give a preview of this upcoming feature.

Heron Overview

To better understand exactly-once support, we first give an overview of Heron's concepts and data model. A streaming job in Heron is called a Topology. A topology is a directed acyclic graph whose nodes are the data computing elements and whose edges represent the streams of data flowing between them. There are two types of nodes: Spouts, which connect to a data source and inject its data into a stream as tuples, and Bolts, which process incoming tuples and emit outgoing tuples. A Heron topology can be loosely thought of as a logical plan. To run it on physical machines, the developer specifies the number of instances of each spout and bolt. These instances are packed into containers, which run on a cluster of machines. An example Heron topology and its physical plan are shown in Figures 1 and 2.
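As a rough illustration of the step from logical plan to physical plan, the sketch below expands each component into its instances and deals them into containers round-robin. It is hypothetical: `RoundRobinPackingSketch` is not a Heron class, and a real packing algorithm would also have to respect each container's resource limits, which this sketch ignores entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: expand a logical plan (component -> parallelism)
// into named instances and assign them to containers round-robin.
// Resource requirements are deliberately ignored.
class RoundRobinPackingSketch {
  static List<List<String>> pack(Map<String, Integer> parallelism, int numContainers) {
    List<List<String>> containers = new ArrayList<>();
    for (int c = 0; c < numContainers; c++) {
      containers.add(new ArrayList<>());
    }
    int next = 0; // global instance counter drives the round-robin
    for (Map.Entry<String, Integer> e : parallelism.entrySet()) {
      for (int i = 0; i < e.getValue(); i++) {
        containers.get(next % numContainers).add(e.getKey() + "_" + i);
        next++;
      }
    }
    return containers;
  }

  public static void main(String[] args) {
    Map<String, Integer> plan = new LinkedHashMap<>(); // keeps insertion order
    plan.put("word", 2);
    plan.put("consumer", 2);
    System.out.println(pack(plan, 2));
  }
}
```

With `word` and `consumer` each at parallelism 2 and two containers, `main()` prints `[[word_0, consumer_0], [word_1, consumer_1]]`, so each container ends up with one instance of each component.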

Figure 1: An example Heron topology

Figure 2: Physical plan for the example Heron topology

Based on the anticipated use cases, we identified the following initial requirements for supporting exactly-once:

  • Adding exactly-once support should not affect the performance or functionality of at-most-once and at-least-once topologies already in production.

  • The latency overhead of exactly-once topologies should be negligible compared to running the same topologies without exactly-once guarantees.

  • Writing exactly-once topologies should be easy for topology developers. Heron should provide a minimal set of low-level APIs for stateful processing and exactly-once, on which higher-level APIs, for instance built-in stateful window support, can be built.

  • The state store backing exactly-once should provide an abstraction that allows different types of storage to be plugged in.

State and Stateful Components

To support exactly-once, each spout and bolt needs a notion of State. State represents the results of a computation accumulated over a period of time. For example, the state of a bolt might be the retweet counts for a set of tweets. State is defined as a key/value interface that extends java.util.Map.


import java.io.Serializable;
import java.util.Map;

/**
 * State represents the interface as seen by stateful bolts and
 * spouts. In Heron, state gives a notional Key/Value interface
 * along with the ability to iterate over the key/values.
 */
public interface State<K extends Serializable,
                       V extends Serializable> extends Map<K, V> {
}
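Because State adds nothing beyond java.util.Map, a backing implementation can be very small. Here is a minimal in-memory sketch, assuming no persistence at all; `HashMapState` is a hypothetical name, not a Heron class.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Same shape as the State interface above.
interface State<K extends Serializable, V extends Serializable> extends Map<K, V> {
}

// Hypothetical in-memory State: since State only extends Map,
// a HashMap subclass satisfies it without any extra methods.
class HashMapState<K extends Serializable, V extends Serializable>
    extends HashMap<K, V> implements State<K, V> {
  private static final long serialVersionUID = 1L;
}

class HashMapStateDemo {
  public static void main(String[] args) {
    State<String, Integer> state = new HashMapState<>();
    state.put("nathan", 3);
    state.merge("nathan", 1, Integer::sum); // all Map default methods work
    System.out.println(state.get("nathan")); // prints 4
  }
}
```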

To add state to a spout or bolt, the component implements the IStatefulComponent interface, which provides the hooks for saving and retrieving state.


/**
 * Defines the interface for a component to save its internal state
 * in the State interface.
 */
public interface IStatefulComponent<K extends Serializable,
                                    V extends Serializable>
                                                     extends IComponent {

  /**
   * Initializes the state of the spout/bolt to that of a previous
   * checkpoint. This method is invoked before the component starts
   * processing, including when it is executed as part of a recovery run.
   * If there was no prior state associated with the component, the
   * state will be empty.
   *
   * Stateful spouts/bolts are expected to hold on to the state
   * variable to save their internal state.
   *
   * Note that initState(State<K, V> state) is called before spout open()
   * and bolt prepare().
   *
   * @param state the previously saved state of the component.
   */
  void initState(State<K, V> state);

  /**
   * This is a hook for the component to perform some actions just
   * before the framework saves its state.
   *
   * @param checkpointId the ID of the checkpoint
   */
  void preSave(String checkpointId);
}
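The order of the two hooks matters: initState() runs before open()/prepare(), and preSave() runs just before the framework saves the state. The toy driver below mimics that order outside of Heron so the sequence can be followed end to end, including a recovery in which a fresh instance receives the saved state. The driver, `MapState` and `CountingComponent` are hypothetical, and IComponent is left out to keep the example self-contained.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified copies of the interfaces above (IComponent omitted).
interface State<K extends Serializable, V extends Serializable> extends Map<K, V> {
}

interface IStatefulComponent<K extends Serializable, V extends Serializable> {
  void initState(State<K, V> state);

  void preSave(String checkpointId);
}

// Hypothetical in-memory State used by the driver.
class MapState<K extends Serializable, V extends Serializable>
    extends HashMap<K, V> implements State<K, V> {
  private static final long serialVersionUID = 1L;
}

// Toy component that counts processed items and logs its lifecycle calls.
class CountingComponent implements IStatefulComponent<String, Integer> {
  final List<String> calls = new ArrayList<>();
  private State<String, Integer> state;
  private int count;

  @Override
  public void initState(State<String, Integer> s) {
    calls.add("initState");
    state = s;
  }

  // Plays the role of open()/prepare(): the state is already available here.
  void open() {
    calls.add("open");
    count = state.getOrDefault("count", 0);
  }

  void process() {
    count++;
  }

  @Override
  public void preSave(String checkpointId) {
    calls.add("preSave:" + checkpointId);
    state.put("count", count); // copy working value into the state
  }

  int count() {
    return count;
  }
}

// Hypothetical driver following the documented call order.
class LifecycleDemo {
  // Stand-in for persisting the state to a state store.
  static Map<String, Integer> checkpoint(State<String, Integer> state) {
    return new HashMap<>(state);
  }

  public static void main(String[] args) {
    CountingComponent first = new CountingComponent();
    State<String, Integer> state = new MapState<>();
    first.initState(state);  // before open()
    first.open();
    first.process();
    first.process();
    first.preSave("ckpt-1"); // just before the save
    Map<String, Integer> saved = checkpoint(state);

    // Recovery: a fresh instance receives a State rebuilt from the checkpoint.
    State<String, Integer> restored = new MapState<>();
    restored.putAll(saved);
    CountingComponent recovered = new CountingComponent();
    recovered.initState(restored);
    recovered.open();
    System.out.println(first.calls + " -> recovered count " + recovered.count());
  }
}
```

Running it prints `[initState, open, preSave:ckpt-1] -> recovered count 2`: the recovered instance resumes from the value that preSave() copied into the state.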

State Storage

State Storage provides an abstract interface for storing and retrieving state, which can be implemented for different types of storage systems.


import java.util.Map;

public interface IStatefulStorage {
  /**
   * Initialize the stateful storage with the given config.
   */
  void init(Map<String, Object> conf);

  /**
   * Clean up the state storage backend.
   */
  void close();

  // Store the checkpoint
  boolean store(Checkpoint checkpoint);

  // Retrieve the checkpoint
  boolean restore(Checkpoint checkpoint);

  // Dispose of checkpoints for the topology; deleteAll removes all of them
  boolean dispose(String topologyName, String oldestCheckpointId,
                  boolean deleteAll);
}
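To show what an implementation of this contract might involve, here is an in-memory backend, useful at most for tests since nothing outlives the JVM. It is a sketch under loud assumptions: Heron's Checkpoint type is replaced by a hypothetical `SimpleCheckpoint`, the interface is a simplified copy named `SimpleStatefulStorage`, and dispose() is assumed to delete checkpoints older than oldestCheckpointId, with checkpoint IDs that sort lexicographically in creation order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Heron's Checkpoint type: one instance's
// serialized state within one checkpoint of one topology.
class SimpleCheckpoint {
  final String topologyName;
  final String checkpointId;
  final String instanceKey; // e.g. component name plus task id
  byte[] data;              // filled in by restore()

  SimpleCheckpoint(String topologyName, String checkpointId, String instanceKey, byte[] data) {
    this.topologyName = topologyName;
    this.checkpointId = checkpointId;
    this.instanceKey = instanceKey;
    this.data = data;
  }
}

// Simplified copy of the IStatefulStorage contract (assumption).
interface SimpleStatefulStorage {
  void init(Map<String, Object> conf);
  void close();
  boolean store(SimpleCheckpoint checkpoint);
  boolean restore(SimpleCheckpoint checkpoint);
  boolean dispose(String topologyName, String oldestCheckpointId, boolean deleteAll);
}

// In-memory backend. Assumes checkpoint ids sort in creation order.
class InMemoryStorage implements SimpleStatefulStorage {
  // topology -> checkpoint id (sorted) -> instance key -> serialized state
  private final Map<String, TreeMap<String, Map<String, byte[]>>> data = new HashMap<>();

  public void init(Map<String, Object> conf) { }

  public void close() {
    data.clear();
  }

  public boolean store(SimpleCheckpoint cp) {
    data.computeIfAbsent(cp.topologyName, t -> new TreeMap<>())
        .computeIfAbsent(cp.checkpointId, id -> new HashMap<>())
        .put(cp.instanceKey, cp.data.clone()); // defensive copy
    return true;
  }

  public boolean restore(SimpleCheckpoint cp) {
    TreeMap<String, Map<String, byte[]>> byId = data.get(cp.topologyName);
    if (byId == null || !byId.containsKey(cp.checkpointId)) {
      return false;
    }
    byte[] bytes = byId.get(cp.checkpointId).get(cp.instanceKey);
    if (bytes == null) {
      return false;
    }
    cp.data = bytes.clone();
    return true;
  }

  // Drops every checkpoint strictly older than oldestCheckpointId, or all of them.
  public boolean dispose(String topologyName, String oldestCheckpointId, boolean deleteAll) {
    TreeMap<String, Map<String, byte[]>> byId = data.get(topologyName);
    if (byId == null) {
      return true;
    }
    if (deleteAll) {
      data.remove(topologyName);
    } else {
      byId.headMap(oldestCheckpointId, false).clear(); // view clears backing map
    }
    return true;
  }
}
```

A durable backend would keep the same shape but write the bytes to a file system or distributed store; the sorted map is what makes pruning old checkpoints a single headMap(...).clear() call here.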

Stateful Topology

Now that we have defined the notion of state and the methods each component implements to save and recover it, let us look at how a simple stateful word count topology can be written. In this topology, TestWordSpout emits words from a fixed list and passes them on to ConsumerBolt, which counts them.

The state of TestWordSpout is its current offset in the word array. If the spout fails, it resumes from the offset recorded in its last checkpoint.


public static class TestWordSpout extends BaseRichSpout
                            implements IStatefulComponent<String, Integer> {

  private SpoutOutputCollector collector;
  private String[] words;

  // working copy of the state, updated on every tuple
  private Map<String, Integer> posMap;

  // state handed over by the framework; saved at each checkpoint
  private State<String, Integer> posState;

  @Override
  public void open(Map<String, Object> conf, TopologyContext context,
                   SpoutOutputCollector acollector) {
    collector = acollector;
    words = new String[]{"nathan", "mike", "jackson", "golda", "bertels"};

    // initState() runs before open(), so posState already holds the
    // restored (or empty) state
    posMap = new HashMap<String, Integer>(posState);
  }

  @Override
  public void nextTuple() {
    int offset = posMap.getOrDefault("current", 0);
    final String word = words[offset];
    posMap.put("current", (offset + 1) % words.length);
    collector.emit(new Values(word));
  }

  // receive the state: restored from the last checkpoint, or empty
  @Override
  public void initState(State<String, Integer> state) {
    posState = state;
  }

  // copy the working state into the framework's state before it is saved
  @Override
  public void preSave(String checkpointId) {
    posState.putAll(posMap);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

In ConsumerBolt, the state is the count of each word seen so far. This state is saved at every checkpoint and restored after a failure. The bolt accumulates counts in countMap and copies them into countState in preSave(), just before the state is saved. On recovery, the restored state is first handed to the bolt as countState through initState(), and prepare() then copies it into countMap.


public static class ConsumerBolt extends BaseRichBolt
                            implements IStatefulComponent<String, Integer> {

  private OutputCollector collector;

  // working copy of the word counts, updated on every tuple
  private Map<String, Integer> countMap;

  // state handed over by the framework; saved at each checkpoint
  private State<String, Integer> countState;

  @Override
  public void prepare(Map<String, Object> conf, TopologyContext topologyContext,
                      OutputCollector outputCollector) {
    collector = outputCollector;
    // initState() runs before prepare(), so countState is already set
    countMap = new HashMap<String, Integer>(countState);
  }

  @Override
  public void execute(Tuple tuple) {
    String key = tuple.getString(0);
    countMap.merge(key, 1, Integer::sum);
  }

  // receive the state: restored from the last checkpoint, or empty
  @Override
  public void initState(State<String, Integer> state) {
    countState = state;
  }

  // copy the working counts into the framework's state before it is saved
  @Override
  public void preSave(String checkpointId) {
    countState.putAll(countMap);
  }

  // this bolt is a sink and emits no tuples
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  }
}
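Why does saving spout and bolt state together give exactly-once counts? The single-threaded toy below (not Heron code; names are hypothetical) reuses the spout's word array, checkpoints the spout offset and the bolt counts at the same point in the stream, crashes, rolls both back and replays. Being single-threaded, it has no tuples in flight, so its snapshots are trivially consistent; in a real distributed topology, making the saved spout and bolt states line up is the framework's job, and this sketch does not model how Heron does it.

```java
import java.util.HashMap;
import java.util.Map;

// Single-threaded toy of the word count topology with checkpoints.
// crashAfter < 1 means no crash.
class WordCountRecovery {
  static final String[] WORDS = {"nathan", "mike", "jackson", "golda", "bertels"};

  static Map<String, Integer> run(int totalTuples, int checkpointEvery, int crashAfter) {
    int offset = 0;                                // spout state
    Map<String, Integer> counts = new HashMap<>(); // bolt state
    int savedOffset = 0;
    Map<String, Integer> savedCounts = new HashMap<>();
    int processed = 0;      // tuples reflected in the current state
    int savedProcessed = 0;
    boolean crashed = false;

    while (processed < totalTuples) {
      String word = WORDS[offset];                 // spout emits
      offset = (offset + 1) % WORDS.length;
      counts.merge(word, 1, Integer::sum);         // bolt counts
      processed++;
      if (processed % checkpointEvery == 0) {
        // snapshot both components at the same point in the stream
        savedOffset = offset;
        savedCounts = new HashMap<>(counts);
        savedProcessed = processed;
      }
      if (!crashed && processed == crashAfter) {
        crashed = true;
        // in-memory state is lost: roll spout and bolt back together
        offset = savedOffset;
        counts = new HashMap<>(savedCounts);
        processed = savedProcessed;
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, Integer> clean = run(23, 5, -1);
    Map<String, Integer> recovered = run(23, 5, 17);
    System.out.println(clean.equals(recovered)); // true
  }
}
```

run(23, 5, -1) (no crash) and run(23, 5, 17) (crash two tuples after the checkpoint at 15) return the same counts: the replayed tuples 16 and 17 are counted only once because the bolt's counts were rolled back to the same checkpoint as the spout's offset.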

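Finally, the topology is assembled from TestWordSpout and ConsumerBolt. The fields grouping on "word" sends every occurrence of a given word to the same ConsumerBolt instance, so each instance holds the complete count for the words it receives. The configuration also sets the resources each component needs, marks the topology as stateful and sets the checkpoint interval.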


// parallelism (instances per component) is set elsewhere in main(),
// and args[0] is the topology name
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), parallelism);
builder.setBolt("consumer", new ConsumerBolt(), parallelism)
        .fieldsGrouping("word", new Fields("word"));

Config conf = new Config();
conf.setNumStmgrs(parallelism);

// Set the config for resources here
conf.setComponentRam("word", ByteAmount.fromGigabytes(2));
conf.setComponentRam("consumer", ByteAmount.fromGigabytes(3));
conf.setContainerCpuRequested(6);

// For stateful processing
conf.put(Config.TOPOLOGY_STATEFUL, true);
conf.put(Config.TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL, 30);

HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());

With the inclusion of exactly-once support, Heron provides at-most-once, at-least-once and exactly-once guarantees seamlessly in a single streaming engine. This gives developers the flexibility to choose the appropriate level of guarantee for each application. We look forward to releasing exactly-once to open source soon for wider use, and we welcome collaborators and contributors to join the growing Heron (www.heron.io) community.

Sanjeev Kulkarni