User Defined I/O Modules

An I/O module provides access to data stored in an external system. If no pre-built I/O module exists for a particular system, you can define your own.

Stateful Functions I/O modules are built on top of Apache Flink® connectors. For details on how to build a custom connector, see the official Apache Flink® documentation.

A Two Package Approach

Stateful Functions applications are typically modular, containing many modules multiplexed into a single Flink application. For that reason, I/O modules provide two components: a specification and an implementation. That way, multiple modules can depend on the same I/O type while the implementation only needs to be provided once on the classpath.
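
To make the split concrete, here is a minimal sketch of the idea in plain Java, with no Stateful Functions classes involved. All names (IoType, Provider, ProviderRegistry) are hypothetical stand-ins and the lookup-table design is an assumption made for illustration: the type key plays the role of the specification, the single bound provider plays the role of the implementation, and the registry plays the role of the runtime that connects the two.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for an I/O type: a namespace plus a type name, compared by value. */
final class IoType {
  private final String namespace;
  private final String type;

  IoType(String namespace, String type) {
    this.namespace = Objects.requireNonNull(namespace);
    this.type = Objects.requireNonNull(type);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof IoType)) return false;
    IoType other = (IoType) o;
    return namespace.equals(other.namespace) && type.equals(other.type);
  }

  @Override
  public int hashCode() {
    return Objects.hash(namespace, type);
  }

  @Override
  public String toString() {
    return namespace + "/" + type;
  }
}

/** Hypothetical stand-in for an implementation: turns an instance name into a connection label. */
interface Provider {
  String connect(String instanceName);
}

/** Hypothetical stand-in for the runtime: holds exactly one implementation per I/O type. */
final class ProviderRegistry {
  private final Map<IoType, Provider> providers = new HashMap<>();

  /** Binds the single implementation for a type; a second binding is rejected. */
  void bind(IoType type, Provider provider) {
    if (providers.putIfAbsent(type, provider) != null) {
      throw new IllegalStateException("A provider is already bound for " + type);
    }
  }

  /** Resolves the implementation by type, so callers only need to know the type. */
  String connect(IoType type, String instanceName) {
    Provider provider = providers.get(type);
    if (provider == null) {
      throw new IllegalArgumentException("No provider bound for " + type);
    }
    return provider.connect(instanceName);
  }
}
```

In this sketch, two modules can each construct their own new IoType("ververica", "my-ingress") and resolve the same provider, because the key is compared by value and the provider is bound only once.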

Specifications

Specifications are the user-facing component of an I/O module and only depend on stateful-functions-sdk. They include an ingress or egress type and spec.

Ingress and egress types are similar to function types: they provide a namespace and type associated with a class of I/O components. Specs are what users configure to set properties for a particular instance of an I/O connection. The only required parameter is the ingress or egress identifier; all other properties are system specific.

package com.ververica.statefun.docs.io.custom;

import com.ververica.statefun.sdk.IngressType;
import com.ververica.statefun.sdk.io.IngressIdentifier;
import com.ververica.statefun.sdk.io.IngressSpec;

public class MyIngressSpec<T> implements IngressSpec<T> {

  public static final IngressType TYPE = new IngressType("ververica", "my-ingress");

  private final IngressIdentifier<T> identifier;

  public MyIngressSpec(IngressIdentifier<T> identifier) {
    this.identifier = identifier;
  }

  @Override
  public IngressType type() {
    return TYPE;
  }

  @Override
  public IngressIdentifier<T> id() {
    return identifier;
  }
}

package com.ververica.statefun.docs.io.custom;

import com.ververica.statefun.sdk.EgressType;
import com.ververica.statefun.sdk.io.EgressIdentifier;
import com.ververica.statefun.sdk.io.EgressSpec;

public class MyEgressSpec<T> implements EgressSpec<T> {

  public static final EgressType TYPE = new EgressType("ververica", "my-egress");

  private final EgressIdentifier<T> identifier;

  public MyEgressSpec(EgressIdentifier<T> identifier) {
    this.identifier = identifier;
  }

  @Override
  public EgressType type() {
    return TYPE;
  }

  @Override
  public EgressIdentifier<T> id() {
    return identifier;
  }
}
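
Because every property other than the identifier is system specific, a spec for a real system usually carries a few extra fields of its own. The following is a minimal sketch of that shape, assuming a hypothetical external system addressed by host and port. SketchIdentifier and SketchIngressSpec are illustrative stand-ins written so the example compiles without the SDK on the classpath; the host and port properties and their defaults are assumptions, not part of any Stateful Functions API.

```java
import java.util.Objects;

/** Stand-in for an ingress identifier: a produced type, a namespace, and a name. */
final class SketchIdentifier<T> {
  final Class<T> producedType;
  final String namespace;
  final String name;

  SketchIdentifier(Class<T> producedType, String namespace, String name) {
    this.producedType = Objects.requireNonNull(producedType);
    this.namespace = Objects.requireNonNull(namespace);
    this.name = Objects.requireNonNull(name);
  }
}

/** A spec with one required identifier and two hypothetical system-specific properties. */
final class SketchIngressSpec<T> {
  private final SketchIdentifier<T> identifier;
  private final String host;
  private final int port;

  private SketchIngressSpec(SketchIdentifier<T> identifier, String host, int port) {
    this.identifier = identifier;
    this.host = host;
    this.port = port;
  }

  /** The identifier is the only mandatory argument; everything else has a default. */
  static <T> Builder<T> builder(SketchIdentifier<T> identifier) {
    return new Builder<>(identifier);
  }

  SketchIdentifier<T> id() { return identifier; }
  String host() { return host; }
  int port() { return port; }

  static final class Builder<T> {
    private final SketchIdentifier<T> identifier;
    private String host = "localhost"; // assumed default, for illustration only
    private int port = 9000;           // assumed default, for illustration only

    private Builder(SketchIdentifier<T> identifier) {
      this.identifier = Objects.requireNonNull(identifier, "identifier is required");
    }

    Builder<T> withHost(String host) {
      this.host = Objects.requireNonNull(host, "host");
      return this;
    }

    Builder<T> withPort(int port) {
      if (port < 1 || port > 65535) {
        throw new IllegalArgumentException("port out of range: " + port);
      }
      this.port = port;
      return this;
    }

    SketchIngressSpec<T> build() {
      return new SketchIngressSpec<>(identifier, host, port);
    }
  }
}
```

A builder keeps the required identifier in the factory method and leaves the optional properties to chained setters, so users only state what differs from the defaults.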

Implementations

Implementations map specs to Flink sources and sinks. They depend on stateful-functions-flink-io, your specifications module, and the underlying Flink connector.

Source and Sink Providers

Providers take in the ingress and egress specs and return configured Flink sources and sinks.

package com.ververica.statefun.docs.io.custom.flink;

import com.ververica.statefun.docs.io.custom.MyIngressSpec;
import com.ververica.statefun.docs.io.custom.flink.source.MySourceFunction;
import com.ververica.statefun.flink.io.spi.SourceProvider;
import com.ververica.statefun.sdk.io.IngressSpec;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySourceProvider implements SourceProvider {

  @Override
  public <T> SourceFunction<T> forSpec(IngressSpec<T> ingressSpec) {
    MyIngressSpec<T> spec = asMyIngressSpec(ingressSpec);
    MySourceFunction<T> source = new MySourceFunction<>();

    // configure the source based on the provided spec
    return source;
  }

  private static <T> MyIngressSpec<T> asMyIngressSpec(IngressSpec<T> ingressSpec) {
    if (ingressSpec == null) {
      throw new NullPointerException("Unable to translate a NULL spec");
    }

    if (ingressSpec instanceof MyIngressSpec) {
      return (MyIngressSpec<T>) ingressSpec;
    }

    throw new IllegalArgumentException(String.format("Wrong type %s", ingressSpec.type()));
  }
}

package com.ververica.statefun.docs.io.custom.flink;

import com.ververica.statefun.docs.io.custom.MyEgressSpec;
import com.ververica.statefun.docs.io.custom.flink.sink.MySinkFunction;
import com.ververica.statefun.flink.io.spi.SinkProvider;
import com.ververica.statefun.sdk.io.EgressSpec;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MySinkProvider implements SinkProvider {

  @Override
  public <T> SinkFunction<T> forSpec(EgressSpec<T> egressSpec) {
    MyEgressSpec<T> spec = asMyEgressSpec(egressSpec);
    MySinkFunction<T> sink = new MySinkFunction<>();

    // configure the sink based on the provided spec
    return sink;
  }

  private static <T> MyEgressSpec<T> asMyEgressSpec(EgressSpec<T> egressSpec) {
    if (egressSpec == null) {
      throw new NullPointerException("Unable to translate a NULL spec");
    }

    if (egressSpec instanceof MyEgressSpec) {
      return (MyEgressSpec<T>) egressSpec;
    }

    throw new IllegalArgumentException(String.format("Wrong type %s", egressSpec.type()));
  }
}
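
What the "configure ... based on the provided spec" step does depends entirely on the connector. As a minimal sketch of the pattern, assuming a hypothetical connector addressed by host and port, a provider can validate the spec, cast it to the concrete type, and copy its properties into the source. Everything below (SketchSpec, HostPortSpec, SketchSource, SketchSourceProvider) is an illustrative stand-in in plain Java so that it compiles without Flink or the SDK on the classpath; a real provider would return a Flink SourceFunction instead.

```java
import java.util.Objects;

/** Stand-in for an ingress spec: only the type name is needed here. */
interface SketchSpec {
  String typeName();
}

/** Hypothetical spec with two system-specific properties. */
final class HostPortSpec implements SketchSpec {
  private final String host;
  private final int port;

  HostPortSpec(String host, int port) {
    this.host = Objects.requireNonNull(host, "host");
    this.port = port;
  }

  @Override
  public String typeName() {
    return "ververica/my-ingress";
  }

  String host() { return host; }
  int port() { return port; }
}

/** Stand-in for a connector source; records the endpoint it was configured with. */
final class SketchSource {
  private final String endpoint;

  SketchSource(String endpoint) {
    this.endpoint = endpoint;
  }

  String endpoint() { return endpoint; }
}

/** Translates a spec into a configured source, rejecting specs it does not understand. */
final class SketchSourceProvider {

  SketchSource forSpec(SketchSpec spec) {
    HostPortSpec hostPort = asHostPortSpec(spec);
    // Copy the system-specific properties from the spec into the source.
    return new SketchSource(hostPort.host() + ":" + hostPort.port());
  }

  private static HostPortSpec asHostPortSpec(SketchSpec spec) {
    Objects.requireNonNull(spec, "Unable to translate a NULL spec");
    if (spec instanceof HostPortSpec) {
      return (HostPortSpec) spec;
    }
    throw new IllegalArgumentException(String.format("Wrong type %s", spec.typeName()));
  }
}
```

Failing fast on an unexpected spec type keeps a misconfigured binding from silently producing a source that was configured with the wrong properties.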