User Defined I/O Modules
An I/O module provides access to data which is stored in an external system. If a pre-built I/O module for a particular system does not exist, you can define your own.
Stateful Functions I/O modules are built on top of Apache Flink® connectors. For details on how to build a custom connector, see the official Apache Flink® documentation.
A Two-Package Approach
Stateful Functions applications are typically modular, containing many modules multiplexed into a single Flink application. For that reason, I/O modules provide two components: a specification and an implementation. That way, multiple modules can depend on the same I/O type while the implementation only needs to be provided once on the classpath.
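To make the split concrete, here is a minimal, self-contained sketch in plain Java. It does not use the Stateful Functions SDK: `IoType`, `Spec`, and `ProviderRegistry` are hypothetical stand-ins. Several user modules can declare specs of the same I/O type, and a single provider bound for that type serves all of them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins, not the Stateful Functions SDK: an I/O type is a namespace plus a name.
record IoType(String namespace, String name) {}

// The user-facing "specification" side: a type plus a per-instance identifier.
record Spec(IoType type, String id) {}

// The "implementation" side: one provider per I/O type, bound a single time.
final class ProviderRegistry {
    private final Map<IoType, Function<Spec, String>> providers = new HashMap<>();

    void bind(IoType type, Function<Spec, String> provider) {
        // Sketch choice: a second binding for the same type is treated as an error.
        if (providers.putIfAbsent(type, provider) != null) {
            throw new IllegalStateException("Provider already bound for " + type);
        }
    }

    // Looks up the provider by the spec's type; here a provider just returns a description string.
    String translate(Spec spec) {
        Function<Spec, String> provider = providers.get(spec.type());
        if (provider == null) {
            throw new IllegalStateException("No provider bound for " + spec.type());
        }
        return provider.apply(spec);
    }
}
```

Rejecting a second binding for the same type is this sketch's own choice, made to mirror the "provided once" idea; it is not a statement about how the Stateful Functions runtime handles duplicate bindings.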
Specifications
Specifications are the user-facing component of an I/O module and depend only on stateful-functions-sdk.
They include an ingress or egress type and spec.
Ingress and egress types are similar to function types: they provide a namespace and name associated with a class of I/O components. Specs are what users configure to set properties for a particular instance of an I/O connection. The only required parameter is the ingress or egress identifier; all other properties are system-specific.
package com.ververica.statefun.docs.io.custom;
import com.ververica.statefun.sdk.IngressType;
import com.ververica.statefun.sdk.io.IngressIdentifier;
import com.ververica.statefun.sdk.io.IngressSpec;
public class MyIngressSpec<T> implements IngressSpec<T> {
    // Namespace and name shared by every ingress of this kind
    public static final IngressType TYPE = new IngressType("ververica", "my-ingress");
    // The one required property: the identifier of this particular ingress
    private final IngressIdentifier<T> identifier;
    public MyIngressSpec(IngressIdentifier<T> identifier) {
        this.identifier = identifier;
    }
    @Override
    public IngressType type() {
        return TYPE;
    }
    @Override
    public IngressIdentifier<T> id() {
        return identifier;
    }
}
package com.ververica.statefun.docs.io.custom;
import com.ververica.statefun.sdk.EgressType;
import com.ververica.statefun.sdk.io.EgressIdentifier;
import com.ververica.statefun.sdk.io.EgressSpec;
public class MyEgressSpec<T> implements EgressSpec<T> {
    // Namespace and name shared by every egress of this kind
    public static final EgressType TYPE = new EgressType("ververica", "my-egress");
    // The one required property: the identifier of this particular egress
    private final EgressIdentifier<T> identifier;
    public MyEgressSpec(EgressIdentifier<T> identifier) {
        this.identifier = identifier;
    }
    @Override
    public EgressType type() {
        return TYPE;
    }
    @Override
    public EgressIdentifier<T> id() {
        return identifier;
    }
}
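The specs above carry only the required identifier. A spec for a real system usually adds settings of its own. The sketch below is self-contained, so `SimpleIngressType` and `SimpleIngressIdentifier` are simplified stand-ins for the SDK classes, and the `host` and `pollInterval` properties are hypothetical examples of system-specific settings, validated when the spec is built.

```java
import java.time.Duration;
import java.util.Objects;

// Simplified stand-ins for the SDK's IngressType and IngressIdentifier (hypothetical, for a self-contained sketch).
record SimpleIngressType(String namespace, String type) {}

record SimpleIngressIdentifier<T>(Class<T> producedType, String namespace, String name) {}

// Hypothetical spec for a polling system: the identifier is required,
// host and pollInterval are examples of system-specific settings.
final class PollingIngressSpec<T> {
    static final SimpleIngressType TYPE = new SimpleIngressType("ververica", "my-ingress");

    private final SimpleIngressIdentifier<T> identifier;
    private final String host;
    private final Duration pollInterval;

    PollingIngressSpec(SimpleIngressIdentifier<T> identifier, String host, Duration pollInterval) {
        this.identifier = Objects.requireNonNull(identifier, "identifier");
        this.host = Objects.requireNonNull(host, "host");
        this.pollInterval = Objects.requireNonNull(pollInterval, "pollInterval");
        // Reject settings that could never work, so errors surface when the spec is built.
        if (pollInterval.isZero() || pollInterval.isNegative()) {
            throw new IllegalArgumentException("pollInterval must be positive");
        }
    }

    SimpleIngressType type() { return TYPE; }

    SimpleIngressIdentifier<T> id() { return identifier; }

    String host() { return host; }

    Duration pollInterval() { return pollInterval; }
}
```

Validating in the constructor is one reasonable design: a misconfigured spec fails where the user creates it, rather than later inside the running Flink job.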
Implementations
Implementations map specs to Flink sources and sinks.
They depend on stateful-functions-flink-io, your specifications module, and the underlying Flink connector.
Source and Sink Providers
Source and sink providers take ingress and egress specs, respectively, and return configured Flink sources and sinks.
package com.ververica.statefun.docs.io.custom.flink;
import com.ververica.statefun.docs.io.custom.MyIngressSpec;
import com.ververica.statefun.docs.io.custom.flink.source.MySourceFunction;
import com.ververica.statefun.flink.io.spi.SourceProvider;
import com.ververica.statefun.sdk.io.IngressSpec;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class MySourceProvider implements SourceProvider {
    @Override
    public <T> SourceFunction<T> forSpec(IngressSpec<T> ingressSpec) {
        MyIngressSpec<T> spec = asMyIngressSpec(ingressSpec);
        MySourceFunction<T> source = new MySourceFunction<>();
        // configure the source based on the provided spec
        return source;
    }
    // Narrows the generic spec to MyIngressSpec, rejecting null and specs of any other type
    private static <T> MyIngressSpec<T> asMyIngressSpec(IngressSpec<T> ingressSpec) {
        if (ingressSpec == null) {
            throw new NullPointerException("Unable to translate a NULL spec");
        }
        if (ingressSpec instanceof MyIngressSpec) {
            return (MyIngressSpec<T>) ingressSpec;
        }
        throw new IllegalArgumentException(String.format("Wrong type %s", ingressSpec.type()));
    }
}
package com.ververica.statefun.docs.io.custom.flink;
import com.ververica.statefun.docs.io.custom.MyEgressSpec;
import com.ververica.statefun.docs.io.custom.flink.sink.MySinkFunction;
import com.ververica.statefun.flink.io.spi.SinkProvider;
import com.ververica.statefun.sdk.io.EgressSpec;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class MySinkProvider implements SinkProvider {
    @Override
    public <T> SinkFunction<T> forSpec(EgressSpec<T> egressSpec) {
        MyEgressSpec<T> spec = asMyEgressSpec(egressSpec);
        MySinkFunction<T> sink = new MySinkFunction<>();
        // configure the sink based on the provided spec
        return sink;
    }
    // Narrows the generic spec to MyEgressSpec, rejecting null and specs of any other type
    private static <T> MyEgressSpec<T> asMyEgressSpec(EgressSpec<T> egressSpec) {
        if (egressSpec == null) {
            throw new NullPointerException("Unable to translate a NULL spec");
        }
        if (egressSpec instanceof MyEgressSpec) {
            return (MyEgressSpec<T>) egressSpec;
        }
        throw new IllegalArgumentException(String.format("Wrong type %s", egressSpec.type()));
    }
}
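The providers above leave the actual configuration as a comment. As a hedged illustration of that step, the following self-contained sketch replaces the Flink and SDK types with hypothetical stand-ins (`Spec`, `PollingSpec`, `PollingSource`, `PollingSourceProvider`): the provider checks the spec's concrete type, then copies the system-specific settings into a freshly created source.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for an ingress spec; the real provider receives an IngressSpec<T>.
interface Spec<T> {
    String type();
}

// Hypothetical spec carrying system-specific settings.
record PollingSpec<T>(String host, Duration pollInterval) implements Spec<T> {
    public String type() {
        return "ververica/my-ingress";
    }
}

// Hypothetical source; the real provider returns a Flink SourceFunction<T>.
final class PollingSource<T> {
    final String host;
    final long pollIntervalMillis;

    PollingSource(String host, long pollIntervalMillis) {
        this.host = host;
        this.pollIntervalMillis = pollIntervalMillis;
    }
}

final class PollingSourceProvider {
    // Returns a new, fully configured source for each spec it is given.
    <T> PollingSource<T> forSpec(Spec<T> spec) {
        Objects.requireNonNull(spec, "Unable to translate a NULL spec");
        if (!(spec instanceof PollingSpec<T> pollingSpec)) {
            throw new IllegalArgumentException("Wrong type " + spec.type());
        }
        // Copy the settings from the spec into the source it configures.
        return new PollingSource<>(pollingSpec.host(), pollingSpec.pollInterval().toMillis());
    }
}
```

Creating a new source on every call keeps the provider itself stateless, so a single provider instance can serve every spec of its type.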
Flink I/O Module
Flink I/O modules are Stateful Functions’ top-level entry point for accessing Flink connectors.
They define the relationship between ingress and egress types and their source and sink providers.
They also receive runtime configuration through the globalConfiguration map, which is the union of all configuration in the application's flink-conf.yaml and any command-line arguments passed in the form --key value.
package com.ververica.statefun.docs.io.custom.flink;
import com.ververica.statefun.docs.io.custom.MyEgressSpec;
import com.ververica.statefun.docs.io.custom.MyIngressSpec;
import com.ververica.statefun.flink.io.spi.FlinkIoModule;
import java.util.Map;
public final class MyFlinkIoModule implements FlinkIoModule {
    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        // Route each ingress and egress type to the provider that implements it
        binder.bindSourceProvider(MyIngressSpec.TYPE, new MySourceProvider());
        binder.bindSinkProvider(MyEgressSpec.TYPE, new MySinkProvider());
    }
}
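The globalConfiguration map reaches configure as plain string keys and values. The sketch below is a self-contained illustration, not the runtime's code: it parses --key value arguments, merges them with flink-conf.yaml entries, and reads a hypothetical my-ingress.poll-interval-ms key with a default, as a module might before constructing its providers. The text above only says the map is a union; the rule that command-line values win on conflict is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

final class GlobalConfigurationSketch {
    // Parses arguments of the form --key value into a map (simplified: every key needs a value).
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> parsed = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            String key = args[i];
            if (!key.startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected --key value pairs, got: " + key);
            }
            parsed.put(key.substring(2), args[i + 1]);
        }
        return parsed;
    }

    // Union of flink-conf.yaml entries and command-line entries.
    // ASSUMPTION: command-line values override flink-conf.yaml values on conflict.
    static Map<String, String> union(Map<String, String> flinkConf, Map<String, String> cliArgs) {
        Map<String, String> merged = new HashMap<>(flinkConf);
        merged.putAll(cliArgs);
        return merged;
    }

    // What a module's configure(...) might do: read a hypothetical key, falling back to a default.
    static long pollIntervalMillis(Map<String, String> globalConfiguration) {
        return Long.parseLong(globalConfiguration.getOrDefault("my-ingress.poll-interval-ms", "1000"));
    }
}
```

Reading settings in configure and passing them to provider constructors keeps the providers free of any direct dependency on where the configuration came from.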
I/O modules leverage Java’s Service Provider Interface (SPI) for discovery.
This means that every JAR that provides I/O modules must contain a file named com.ververica.statefun.flink.io.spi.FlinkIoModule in the META-INF/services resource directory, listing the fully qualified class names of all modules it provides.
com.ververica.statefun.docs.io.custom.flink.MyFlinkIoModule
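Java’s `java.util.ServiceLoader` reads these provider-configuration files. Per its documentation, each non-blank line names one provider class, `#` starts a comment, surrounding whitespace is ignored, duplicate names are ignored, and the file is UTF-8 encoded. The hypothetical helper below applies those rules to a file's contents, which can help check what a JAR actually declares; it does not load any classes.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ServicesFileSketch {
    // Applies the provider-configuration file rules from the java.util.ServiceLoader documentation:
    // everything after '#' is a comment, surrounding whitespace and blank lines are ignored,
    // each remaining line is one fully qualified class name, and duplicates are ignored.
    static List<String> providerClassNames(String fileContents) {
        return fileContents.lines()
                .map(line -> {
                    int comment = line.indexOf('#');
                    return comment >= 0 ? line.substring(0, comment) : line;
                })
                .map(String::strip)
                .filter(name -> !name.isEmpty())
                .distinct() // keeps the first occurrence, in file order
                .collect(Collectors.toList());
    }
}
```

At runtime, discovery itself is performed by the JDK's ServiceLoader, so a helper like this is only a diagnostic aid; a class name listed in the file still has to match the module's actual package and class.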