Alex Rodrigues

Data science, distributed systems and big data

Going Event-driven With Kafka in Two Weeks - Part II

In the first part, I described the motivations and requirements for the transition to an event-driven architecture. In this post, I am going to talk about how to perform distributed counting and how Kafka partitioning comes in handy for this kind of task.

Online group-by operations

As mentioned in part one, each consumer will receive messages from a set of partitions, in such a way that each partition is consumed by only one consumer. The subscriber process can then run one consumer stream per thread and co-operate with instances running on other machines by assigning every consumer to the same consumer group. There is no gain in having more threads in the consumer cluster than the total number of partitions, as each partition is delivered through at most one consumer stream.
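As a rough sketch, the subscriber set-up with the 0.8 high-level consumer API looks something like the snippet below. The ZooKeeper address, group id, topic name and thread count are placeholders, not the real configuration.

Sketch of a consumer group with one stream per thread.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SaleEventSubscriber {
  public void start(int numThreads) {
      Properties props = new Properties();
      props.put("zookeeper.connect", "zookeeper-host:2181");  // placeholder address
      props.put("group.id", "sale-counters");                 // same group on every instance

      ConsumerConnector connector =
          kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

      // request as many streams as this process has consumer threads
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
      topicCountMap.put("sale-events", numThreads);            // placeholder topic name

      Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
          connector.createMessageStreams(topicCountMap);

      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
      for (final KafkaStream<byte[], byte[]> stream : streamsByTopic.get("sale-events")) {
          executor.submit(new Runnable() {
              @Override
              public void run() {
                  // each thread consumes from its own stream, as in the parsing snippet below
              }
          });
      }
  }
}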

Each consumer thread receives messages from its partitions. Each message is parsed into a POJO that will be used as the key of a counting hash map.

Parsing a message from a Kafka stream.
ConsumerIterator<byte[], byte[]> it = stream.iterator();

while (it.hasNext()) {
  // each message payload is a delimiter-separated string
  String value = new String(it.next().message());
  final String[] tokens = value.split(";", -1);

  // ... build the POJO from the tokens and update the counters
}

The example above shows how to parse a message, assuming it is a delimiter-separated string (here split on ';'). To keep the new system compatible with the existing log-based solution, and for debugging purposes, we first kept the old CSV-style message format. However, other serialization options exist, such as Kryo, Avro and Thrift.

The parsed POJO is the counter key, and it has a custom hashCode implementation. hashCode is invoked by Java's HashMap, which uses the returned value to locate the entry internally. When more than one entry shares the same hashCode, the equals method is used to determine equality with the query key.

To aggregate the counts by a set of attributes, we rely on the hashCode and equals methods. They must behave consistently, considering two objects equal if and only if the group-by attributes have identical values in both. The hashCode calculation has to take these values into account, as shown in the snippet below.

Example of a GroupBy by hour and category.
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;

public class SaleEventByHourAndCategory extends SaleEvent {
  static final TimeZone GMT_TZ = TimeZone.getTimeZone("GMT");

  protected Long getHour() {
      // consider moving this to a Util class
      Calendar calendar = Calendar.getInstance(GMT_TZ);
      calendar.setTime(new Date(getUnixTimestamp()));

      // truncate the timestamp to the hour
      calendar.set(Calendar.MINUTE, 0);
      calendar.set(Calendar.SECOND, 0);
      calendar.set(Calendar.MILLISECOND, 0);

      return Long.valueOf(calendar.getTime().getTime());
  }

  @Override
  public int hashCode() {
      return new HashCodeBuilder()
          .append(getHour())
          .append(getCategoryId())
          .toHashCode();
  }

  @Override
  public boolean equals(Object obj) {
      if (!(obj instanceof SaleEventByHourAndCategory)) {
          return false;
      }
      if (this == obj) {
          return true;
      }

      final SaleEventByHourAndCategory otherObject = (SaleEventByHourAndCategory) obj;

      return new EqualsBuilder()
          .append(getHour(), otherObject.getHour())
          .append(getCategoryId(), otherObject.getCategoryId())
          .isEquals();
  }
}

The hashCode and equals implementations use the Apache Commons Lang library.

Counter Map Implementations

Each consumer thread counts the events in a shared data structure, to avoid wasting memory with duplicated entries across per-thread counter maps. ConcurrentHashMap is the native JVM implementation; it reduces contention by splitting the key space into segments, each guarded by its own lock that controls write access.

An alternative implementation is NonBlockingHashMap, written by Azul Systems' Dr. Cliff Click. It does not guarantee full consistency, returning only the result of the last completed update operation. It is lock-free and leverages compare-and-swap operations to allow multiple concurrent updates. A benchmark comparing both implementations can be found here.
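A minimal sketch of the shared counting structure is shown below, using the SaleEventByHourAndCategory key from above. Since both maps implement ConcurrentMap, NonBlockingHashMap could be swapped in for the ConcurrentHashMap without changing the rest of the code.

Sketch of a counter map shared by all consumer threads.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class SharedCounters {
  // single map shared by every consumer thread of the process
  private final ConcurrentMap<SaleEventByHourAndCategory, AtomicLong> counts =
      new ConcurrentHashMap<SaleEventByHourAndCategory, AtomicLong>();

  public void increment(SaleEventByHourAndCategory key) {
      AtomicLong counter = counts.get(key);
      if (counter == null) {
          // first event seen for this (hour, category) pair
          AtomicLong fresh = new AtomicLong();
          AtomicLong previous = counts.putIfAbsent(key, fresh);
          counter = (previous != null) ? previous : fresh;
      }
      counter.incrementAndGet();
  }
}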

Aggregated Result Flushing

The computed aggregations, whether counts, sums or averages, are held in each subscriber process's memory and must be flushed to persistent storage. We decided to stick with MySQL to avoid porting all the existing visualization tools. With multiple instances of the subscriber process, each instance holds only a partial result of the aggregation. If there are two processes with 20 threads each, each one holds the counts for the messages received from half the partitions. Because each process only knows part of the result, flushing must be done with an UPDATE statement, or an INSERT when the key does not exist yet. To avoid the existence check, an INSERT ON DUPLICATE KEY UPDATE is used.
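A sketch of such a flush over JDBC is shown below. The table and column names are made up for illustration, the key getters are assumed to return numeric values, and each flush is assumed to carry the counts accumulated since the previous flush, so the update adds them to the stored total.

Sketch of flushing partial counts with INSERT ... ON DUPLICATE KEY UPDATE.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

public class CountFlusher {
  // hypothetical table and column names; the real schema is not shown here
  private static final String UPSERT_SQL =
      "INSERT INTO sale_counts_by_hour_category (hour, category_id, sale_count) "
    + "VALUES (?, ?, ?) "
    + "ON DUPLICATE KEY UPDATE sale_count = sale_count + VALUES(sale_count)";

  public void flush(Connection connection,
                    Map<SaleEventByHourAndCategory, Long> partialCounts) throws SQLException {
      connection.setAutoCommit(false);
      PreparedStatement statement = connection.prepareStatement(UPSERT_SQL);
      try {
          for (Map.Entry<SaleEventByHourAndCategory, Long> entry : partialCounts.entrySet()) {
              statement.setLong(1, entry.getKey().getHour());
              statement.setLong(2, entry.getKey().getCategoryId());
              statement.setLong(3, entry.getValue());
              statement.addBatch();
          }
          // execute the whole batch in one transaction
          statement.executeBatch();
          connection.commit();
      } finally {
          statement.close();
      }
  }
}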

In the first experiments, the subscriber processes flushed their in-memory counts to the same database. However, some of these subscribers were consuming from a broker in the Europe region and had to upload their results to a MySQL instance in a US datacenter. To optimize this flow, all the subscribers now also act as producers and publish their partial results to a separate topic, which is read by a single flusher process.

This change of approach improved the throughput of the MySQL ingestion, because fewer write transactions fail. The flusher process executes transaction after transaction, in batches of 2000 queries, with a consistent per-batch execution time. The partial results are published to the US broker using Kryo for serialization and Snappy for compression, in an attempt to reduce both the bandwidth required and the latency.
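A rough sketch of how a subscriber could publish its partial results with the 0.8 producer API is shown below. The broker address, the topic name and the PartialCounts holder class are placeholders, not the actual ones used.

Sketch of publishing Kryo-serialized, Snappy-compressed partial results.
import java.util.Properties;

import org.xerial.snappy.Snappy;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class PartialResultPublisher {
  // Kryo instances are not thread-safe, so keep one per publishing thread
  private final Kryo kryo = new Kryo();
  private final Producer<String, byte[]> producer;

  public PartialResultPublisher(String brokerList) {
      Properties props = new Properties();
      props.put("metadata.broker.list", brokerList);                      // e.g. the US broker
      props.put("serializer.class", "kafka.serializer.DefaultEncoder");   // raw byte[] payloads
      props.put("key.serializer.class", "kafka.serializer.StringEncoder");
      producer = new Producer<String, byte[]>(new ProducerConfig(props));
  }

  public void publish(PartialCounts partialCounts) throws Exception {
      // serialize the partial counts with Kryo
      Output output = new Output(1024, -1);
      kryo.writeObject(output, partialCounts);
      byte[] serialized = output.toBytes();
      output.close();

      // compress with Snappy and publish to the partial results topic
      byte[] compressed = Snappy.compress(serialized);
      producer.send(new KeyedMessage<String, byte[]>("partial-results", compressed));
  }
}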

In this post, I’ve covered how to perform group-by aggregation operations in memory and periodically flush them to a persistent engine such as MySQL. In the next post, I’ll cover infrastructure matters and the deployment of the Kafka stack on AWS.
