I/O Module

Stateful Functions’ I/O modules allow functions to receive messages from, and send messages to, external systems. Based on the concepts of ingress (input) and egress (output) points, and built on top of the Apache Flink® connector ecosystem, I/O modules let functions interact with the outside world through message passing.

Ingress

An Ingress is an input point where data is consumed from an external system and forwarded to zero or more functions. An IngressIdentifier and an IngressSpec define it.

An ingress identifier, similar to a function type, uniquely identifies an ingress by specifying its input type, a namespace, and a name.

package com.ververica.statefun.docs.io.ingress;

import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.IngressIdentifier;

public final class Identifiers {

  public static final IngressIdentifier<User> INGRESS =
      new IngressIdentifier<>(User.class, "ververica", "user-ingress");
}

An ingress spec defines how to connect to the external system; the details are specific to each individual I/O module. Each identifier-spec pair is bound to the system inside a stateful function module.

package com.ververica.statefun.docs.io.ingress;

import com.ververica.statefun.docs.io.MissingImplementationException;
import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.IngressIdentifier;
import com.ververica.statefun.sdk.io.IngressSpec;
import com.ververica.statefun.sdk.spi.StatefulFunctionModule;
import java.util.Map;

public class ModuleWithIngress implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    IngressSpec<User> spec = createIngress(Identifiers.INGRESS);
    binder.bindIngress(spec);
  }

  private IngressSpec<User> createIngress(IngressIdentifier<User> identifier) {
    throw new MissingImplementationException("Replace with your specific ingress");
  }
}

Router

A router is a stateless operator that takes each record from an ingress and routes it to zero or more functions.

package com.ververica.statefun.docs.io.ingress;

import com.ververica.statefun.docs.FnUser;
import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.Router;

public class UserRouter implements Router<User> {

  @Override
  public void route(User message, Downstream<User> downstream) {
    downstream.forward(FnUser.TYPE, message.getUserId(), message);
  }
}

Routers are bound to the system via a stateful function module. Unlike other components, an ingress may have any number of routers.

package com.ververica.statefun.docs.io.ingress;

import com.ververica.statefun.docs.io.MissingImplementationException;
import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.IngressIdentifier;
import com.ververica.statefun.sdk.io.IngressSpec;
import com.ververica.statefun.sdk.io.Router;
import com.ververica.statefun.sdk.spi.StatefulFunctionModule;
import java.util.Map;

public class ModuleWithRouter implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    IngressSpec<User> spec = createIngressSpec(Identifiers.INGRESS);
    Router<User> router = new UserRouter();

    binder.bindIngress(spec);
    binder.bindIngressRouter(Identifiers.INGRESS, router);
  }

  private IngressSpec<User> createIngressSpec(IngressIdentifier<User> identifier) {
    throw new MissingImplementationException("Replace with your specific ingress");
  }
}
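To make the routing semantics concrete, the following is a minimal, self-contained sketch of how records might flow from an ingress through its routers. None of the types below come from the SDK: Downstream, Router, and User are simplified stand-ins, function types are plain strings, and the names fn-user, fn-greeter, and fn-audit are hypothetical. It also assumes that every router bound to an ingress sees every record.

```java
import java.util.ArrayList;
import java.util.List;

/** Self-contained model of ingress routing; these are stand-ins, not SDK types. */
final class RoutingSketch {

  /** Stand-in for the SDK's Downstream; only forward(type, id, message) is modelled. */
  interface Downstream<T> {
    void forward(String functionType, String id, T message);
  }

  /** Stand-in for the SDK's Router interface. */
  interface Router<T> {
    void route(T message, Downstream<T> downstream);
  }

  /** Minimal stand-in for the User model used in this section. */
  static final class User {
    private final String userId;

    User(String userId) {
      this.userId = userId;
    }

    String getUserId() {
      return userId;
    }
  }

  /** Forwards a record to two functions, or to none when it has no usable id. */
  static final class UserRouter implements Router<User> {
    @Override
    public void route(User message, Downstream<User> downstream) {
      String id = message.getUserId();
      if (id == null || id.isEmpty()) {
        return; // zero targets: this router drops the record
      }
      downstream.forward("fn-user", id, message);
      downstream.forward("fn-greeter", id, message);
    }
  }

  /** A second router on the same ingress; sends every record to one audit function. */
  static final class AuditRouter implements Router<User> {
    @Override
    public void route(User message, Downstream<User> downstream) {
      downstream.forward("fn-audit", "global", message);
    }
  }

  /** Passes one record to every bound router and collects the "type/id" targets. */
  static List<String> dispatch(User record, List<Router<User>> routers) {
    List<String> targets = new ArrayList<>();
    Downstream<User> downstream = (type, id, message) -> targets.add(type + "/" + id);
    for (Router<User> router : routers) {
      router.route(record, downstream);
    }
    return targets;
  }

  /** Routes a single user record through both routers. */
  static List<String> demo(String userId) {
    List<Router<User>> routers = List.of(new UserRouter(), new AuditRouter());
    return dispatch(new User(userId), routers);
  }

  public static void main(String[] args) {
    System.out.println(demo("u-1"));
    System.out.println(demo(""));
  }
}
```

In this model a record with a user id reaches three function instances, while a record with an empty id is dropped by UserRouter and still reaches the audit function through the second router.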

Egress

Egress is the opposite of ingress; it is a point that takes messages and writes them to external systems. Each egress is defined using two components, an EgressIdentifier and an EgressSpec.

An egress identifier uniquely identifies an egress based on a namespace, name, and producing type.

package com.ververica.statefun.docs.io.egress;

import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.EgressIdentifier;

public final class Identifiers {

  public static final EgressIdentifier<User> EGRESS =
      new EgressIdentifier<>("ververica", "egress", User.class);
}

An egress spec defines how to connect to the external system; the details are specific to each individual I/O module. Each identifier-spec pair is bound to the system inside a stateful function module.

package com.ververica.statefun.docs.io.egress;

import com.ververica.statefun.docs.io.MissingImplementationException;
import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.io.EgressIdentifier;
import com.ververica.statefun.sdk.io.EgressSpec;
import com.ververica.statefun.sdk.spi.StatefulFunctionModule;
import java.util.Map;

public class ModuleWithEgress implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    EgressSpec<User> spec = createEgress(Identifiers.EGRESS);
    binder.bindEgress(spec);
  }

  private EgressSpec<User> createEgress(EgressIdentifier<User> identifier) {
    throw new MissingImplementationException("Replace with your specific egress");
  }
}

Stateful functions can then message an egress the same way they message another function, passing the egress identifier as the function type.

package com.ververica.statefun.docs.io.egress;

import com.ververica.statefun.docs.models.User;
import com.ververica.statefun.sdk.Context;
import com.ververica.statefun.sdk.StatefulFunction;

/** A simple function that outputs messages to an egress. */
public class FnOutputting implements StatefulFunction {

  @Override
  public void invoke(Context context, Object input) {
    context.send(Identifiers.EGRESS, new User());
  }
}

I/O modules use Java’s Service Provider Interface (SPI) mechanism for discovery. This means that every JAR should contain a file named com.ververica.statefun.sdk.spi.StatefulFunctionModule in the META-INF/services resource directory, listing the fully qualified class name of every module the JAR provides, one per line.

com.ververica.statefun.docs.BasicFunctionModule
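As a rough illustration of how this discovery mechanism works, the sketch below uses the JDK's java.util.ServiceLoader to find a provider through a META-INF/services file. It is not the runtime's actual loading code: FunctionModule and BasicModule are hypothetical stand-ins for StatefulFunctionModule and a user module, and the provider-configuration file is written to a temporary directory only so that the example is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Demonstrates SPI discovery with stand-in types; nothing here comes from the SDK. */
final class SpiDiscoverySketch {

  /** Hypothetical stand-in for the StatefulFunctionModule service interface. */
  public interface FunctionModule {
    String name();
  }

  /** Hypothetical provider; SPI needs a public class with a public no-arg constructor. */
  public static class BasicModule implements FunctionModule {
    public BasicModule() {}

    @Override
    public String name() {
      return "basic";
    }
  }

  /** Registers BasicModule in a services file, then discovers it via ServiceLoader. */
  public static List<String> discover() {
    try {
      // Build <tmp>/META-INF/services/<service interface name>, as a JAR would contain.
      // The temporary directory is not cleaned up in this sketch.
      Path root = Files.createTempDirectory("spi-sketch");
      Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
      Path config = services.resolve(FunctionModule.class.getName());

      // Each line of the file names one provider class.
      Files.write(config, List.of(BasicModule.class.getName()));

      List<String> names = new ArrayList<>();
      URL[] classPath = {root.toUri().toURL()};
      ClassLoader parent = SpiDiscoverySketch.class.getClassLoader();
      try (URLClassLoader loader = new URLClassLoader(classPath, parent)) {
        // ServiceLoader reads the services file and instantiates each listed class.
        for (FunctionModule module : ServiceLoader.load(FunctionModule.class, loader)) {
          names.add(module.name());
        }
      }
      return names;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(discover());
  }
}
```

In a real project the services file is a static resource packaged in the JAR, typically under src/main/resources/META-INF/services in a Maven or Gradle layout, rather than being generated at run time.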