Apache Kafka

Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics. It is based on Apache Flink’s universal Kafka connector and provides exactly-once processing semantics.

Dependency

To use the Kafka I/O Module, include the following dependency in your pom.xml:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>stateful-functions-kafka-io</artifactId>
    <version>1.1-SNAPSHOT</version>
    <scope>provided</scope>
</dependency>

Kafka Ingress Builder

A KafkaIngressBuilder declares an ingress spec for consuming from a Kafka cluster.

It accepts the following arguments:

  1. The ingress identifier associated with this ingress
  2. The topic name / list of topic names
  3. The address of the bootstrap servers
  4. A KafkaIngressDeserializer for deserializing data from Kafka
  5. Properties for the Kafka consumer

package com.ververica.statefun.docs.io.kafka;

import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.IngressIdentifier;
import com.ververica.statefun.sdk.io.IngressSpec;
import com.ververica.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class IngressSpecs {

  public static final IngressIdentifier<User> ID =
      new IngressIdentifier<>(User.class, "ververica", "input-ingress");

  public static final IngressSpec<User> kafkaIngress =
      KafkaIngressBuilder.forIdentifier(ID)
          .withKafkaAddress("localhost:9092")
          .withTopic("my-topic")
          .withDeserializer(UserDeserializer.class)
          .withProperty(ConsumerConfig.GROUP_ID_CONFIG, "greetings")
          .build();
}

Please refer to the Kafka consumer configuration documentation for the full list of available properties.
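
Any standard Kafka consumer property can be forwarded to the underlying consumer through additional `withProperty` calls. As a sketch (this spec fragment is hypothetical; `AUTO_OFFSET_RESET_CONFIG` and `MAX_POLL_RECORDS_CONFIG` are ordinary Kafka consumer settings, not Stateful Functions options):

```java
package com.ververica.statefun.docs.io.kafka;

import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.IngressSpec;
import com.ververica.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TunedIngressSpecs {

  // A variant of the ingress spec above with extra consumer properties chained on.
  public static final IngressSpec<User> tunedKafkaIngress =
      KafkaIngressBuilder.forIdentifier(IngressSpecs.ID)
          .withKafkaAddress("localhost:9092")
          .withTopic("my-topic")
          .withDeserializer(UserDeserializer.class)
          .withProperty(ConsumerConfig.GROUP_ID_CONFIG, "greetings")
          // Start from the earliest offset when the group has no committed offset.
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
          // Cap the number of records returned per poll.
          .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500")
          .build();
}
```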

Kafka Deserializer

The Kafka ingress needs to know how to turn the binary data in Kafka into Java objects. The KafkaIngressDeserializer allows users to specify such a schema. The T deserialize(ConsumerRecord<byte[], byte[]> record) method gets called for each Kafka message, passing the key, value, and metadata from Kafka.

package com.ververica.statefun.docs.io.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.kafka.KafkaIngressDeserializer;
import java.io.IOException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserDeserializer implements KafkaIngressDeserializer<User> {

  private static final Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);

  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public User deserialize(ConsumerRecord<byte[], byte[]> input) {
    try {
      return mapper.readValue(input.value(), User.class);
    } catch (IOException e) {
      LOG.debug("Failed to deserialize record", e);
      return null;
    }
  }
}

Kafka Egress Spec

A KafkaEgressBuilder declares an egress spec for writing data out to a Kafka cluster.

It accepts the following arguments:

  1. The egress identifier associated with this egress
  2. The address of the bootstrap servers
  3. A KafkaEgressSerializer for serializing data into Kafka
  4. The fault tolerance semantics
  5. Properties for the Kafka producer

package com.ververica.statefun.docs.io.kafka;

import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.EgressIdentifier;
import com.ververica.statefun.sdk.io.EgressSpec;
import com.ververica.statefun.sdk.kafka.KafkaEgressBuilder;

public class EgressSpecs {

  public static final EgressIdentifier<User> ID =
      new EgressIdentifier<>("ververica", "output-egress", User.class);

  public static final EgressSpec<User> kafkaEgress =
      KafkaEgressBuilder.forIdentifier(ID)
          .withKafkaAddress("localhost:9092")
          .withSerializer(UserSerializer.class)
          .build();
}

Please refer to the Kafka producer configuration documentation for the full list of available properties.

Kafka Egress and Fault Tolerance

With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees. You can choose among three different modes of operation through the KafkaEgressBuilder.

  • KafkaEgressBuilder#withNoProducerSemantics: Nothing is guaranteed. Produced records can be lost or duplicated.
  • KafkaEgressBuilder#withAtLeastOnceProducerSemantics: Stateful Functions guarantees that no records will be lost, but they can be duplicated.
  • KafkaEgressBuilder#withExactlyOnceProducerSemantics: Stateful Functions uses Kafka transactions to provide exactly-once semantics.
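
As a sketch, the egress spec from above could enable exactly-once semantics like this (a hypothetical fragment; the exact signature of `withExactlyOnceProducerSemantics` may differ between versions, and `TRANSACTION_TIMEOUT_CONFIG` is a standard Kafka producer setting that typically needs to be raised when transactions are in play, since a transaction must outlive the checkpoint interval):

```java
package com.ververica.statefun.docs.io.kafka;

import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.EgressSpec;
import com.ververica.statefun.sdk.kafka.KafkaEgressBuilder;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ExactlyOnceEgressSpecs {

  // An egress spec with exactly-once producer semantics enabled.
  public static final EgressSpec<User> exactlyOnceKafkaEgress =
      KafkaEgressBuilder.forIdentifier(EgressSpecs.ID)
          .withKafkaAddress("localhost:9092")
          .withSerializer(UserSerializer.class)
          .withExactlyOnceProducerSemantics()
          // Kafka transactions must not time out before a checkpoint completes,
          // so the timeout usually needs to be raised from its broker-side default.
          .withProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000")
          .build();
}
```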

Kafka Serializer

The Kafka egress needs to know how to turn Java objects into binary data. The KafkaEgressSerializer allows users to specify such a schema. The ProducerRecord<byte[], byte[]> serialize(T out) method gets called for each message, allowing users to set a key, value, and other metadata.

package com.ververica.statefun.docs.io.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.kafka.KafkaEgressSerializer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserSerializer implements KafkaEgressSerializer<User> {

  private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class);

  private static final String TOPIC = "user-topic";

  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public ProducerRecord<byte[], byte[]> serialize(User user) {
    try {
      byte[] key = user.getUserId().getBytes(StandardCharsets.UTF_8);
      byte[] value = mapper.writeValueAsBytes(user);

      return new ProducerRecord<>(TOPIC, key, value);
    } catch (JsonProcessingException e) {
      LOG.info("Failed to serialize user", e);
      return null;
    }
  }
}