Stateful Functions

Stateful functions are the building blocks of applications; they are atomic units of isolation, distribution, and persistence. As objects, they encapsulate the state of a single entity (e.g., a specific user, device, or session) and encode its behavior. Stateful functions interact with each other and with external systems through message passing.

Defining A Stateful Function

A stateful function is any class that implements the StatefulFunction interface. The following is an example of a simple hello world function.

package com.ververica.statefun.docs;

import com.ververica.statefun.sdk.Context;
import com.ververica.statefun.sdk.StatefulFunction;

public class FnHelloWorld implements StatefulFunction {

  @Override
  public void invoke(Context context, Object input) {
    System.out.println("Hello " + input.toString());
  }
}

Functions process each incoming message through their invoke method. Inputs are untyped and passed through the system as a java.lang.Object, so one function can process multiple types of messages.

The Context provides metadata about the current message and function, and it is how you call other functions or external systems. Functions are invoked based on a function type and a unique identifier.
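Because inputs arrive as java.lang.Object, a function typically branches on the runtime type of each message. The sketch below shows that pattern in plain Java so it runs without the SDK. `Greet`, `Farewell`, and `MultiTypeFunction` are hypothetical names, and this `invoke` returns a String instead of taking a Context purely so its behavior is easy to check.

```java
// Hypothetical message types, defined only for this sketch.
final class Greet {
  final String name;

  Greet(String name) {
    this.name = name;
  }
}

final class Farewell {
  final String name;

  Farewell(String name) {
    this.name = name;
  }
}

// Mirrors the shape of StatefulFunction#invoke minus the Context parameter,
// so it can run without the Stateful Functions SDK on the classpath.
final class MultiTypeFunction {
  String invoke(Object input) {
    if (input instanceof Greet) {
      return "Hello " + ((Greet) input).name;
    } else if (input instanceof Farewell) {
      return "Goodbye " + ((Farewell) input).name;
    }
    // Unrecognized messages are reported rather than silently dropped.
    return "Unsupported message type: " + input.getClass().getSimpleName();
  }
}
```

Ordering the checks from most to least specific type matters once message classes share a common supertype.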

Function Types and Identifiers

In a local environment, the address of an object is the same as a reference to it. But in a distributed system, objects may be spread across multiple machines and may or may not be active at any given moment.

In Stateful Functions, function types and identifiers are used to reference specific stateful functions in the system. A function type is similar to a class in an object-oriented language; it declares what sort of function the address references. The id is a primary key and scopes the function call to a specific instance of the function type.

Suppose a Stateful Functions application tracks metadata about each user account on a website. The system would contain a user stateful function that accepts and responds to inputs about users and tracks relevant information. Stateful Functions creates one virtual instance of this stateful function for every user. Other functions can call the function for a particular user by addressing it with the user function type and that user's id as the instance identifier.

package com.ververica.statefun.docs;

import com.ververica.statefun.sdk.Context;
import com.ververica.statefun.sdk.FunctionType;
import com.ververica.statefun.sdk.StatefulFunction;

/** A simple function that says hello. */
public class FnUser implements StatefulFunction {

  public static final FunctionType TYPE = new FunctionType("ververica", "user");

  @Override
  public void invoke(Context context, Object message) {
    String userId = context.self().id();
    System.out.println("Hello " + userId);
  }
}

package com.ververica.statefun.docs;

import com.ververica.statefun.sdk.Context;
import com.ververica.statefun.sdk.FunctionType;
import com.ververica.statefun.sdk.StatefulFunction;

/** A simple stateful function that sends a message to the user with id "user1" */
public class FnCaller implements StatefulFunction {

  public static final FunctionType TYPE = new FunctionType("ververica", "caller");

  @Override
  public void invoke(Context context, Object input) {
    context.send(FnUser.TYPE, "user1", new MyUserMessage());
  }
}
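Conceptually, an address is the pair of a function type and an id, and every message is routed by that pair. The following plain-Java sketch models this with hypothetical `Address` and `Router` classes (not SDK types): each distinct address gets its own state, so "user1" and "user2" never see each other's data, and the same id under a different type is a different instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical (namespace, type, id) address; value equality makes it a usable map key.
final class Address {
  final String namespace;
  final String type;
  final String id;

  Address(String namespace, String type, String id) {
    this.namespace = namespace;
    this.type = type;
    this.id = id;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Address)) {
      return false;
    }
    Address other = (Address) o;
    return namespace.equals(other.namespace) && type.equals(other.type) && id.equals(other.id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(namespace, type, id);
  }
}

// Hypothetical router that keeps a separate message counter per address.
final class Router {
  private final Map<Address, Integer> seenPerAddress = new HashMap<>();

  // Delivers a message (the payload is ignored in this sketch) and returns
  // how many messages this address has received so far.
  int send(String namespace, String type, String id, Object message) {
    Address target = new Address(namespace, type, id);
    return seenPerAddress.merge(target, 1, Integer::sum);
  }
}
```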

Virtual Functions

Functions are virtual, which means the system can support a practically unbounded number of active functions while requiring only a fixed number of physical objects on the JVM heap. Any function can call any other without triggering an allocation. The system makes it appear as if functions are always available in memory.

Stateful Functions applications deploy on Apache Flink’s horizontally parallel runtime. If the user function above runs on a Flink cluster with a parallelism of 10, then only ten objects will ever be allocated. Even if the application creates a billion user functions for a billion different users, memory usage for function objects remains stable: those billion virtual functions are evenly partitioned across, and run by, the ten underlying objects. A new object is created only the first time a function of that type, regardless of id, is needed on a given parallel worker.
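The sketch below illustrates the idea with a fixed pool of physical objects, each of which keeps per-id state in a map. Ids are assigned to objects with `Math.floorMod(id.hashCode(), parallelism)`; that is a simplification chosen for the example, not Flink's actual key-group assignment, and `PhysicalUserFunction` and `VirtualFunctionPool` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// One physical object that serves many virtual user functions; state is kept per id.
final class PhysicalUserFunction {
  private final Map<String, Integer> visitsById = new HashMap<>();

  // Handles a message for one virtual instance and returns that instance's visit count.
  int invoke(String id) {
    return visitsById.merge(id, 1, Integer::sum);
  }

  int virtualInstances() {
    return visitsById.size();
  }
}

// A fixed pool of physical objects, created lazily, at most one per slot.
final class VirtualFunctionPool {
  private final PhysicalUserFunction[] slots;

  VirtualFunctionPool(int parallelism) {
    slots = new PhysicalUserFunction[parallelism];
  }

  int send(String id) {
    // Simplified partitioning: the same id always lands in the same slot.
    int slot = Math.floorMod(id.hashCode(), slots.length);
    if (slots[slot] == null) {
      slots[slot] = new PhysicalUserFunction();
    }
    return slots[slot].invoke(id);
  }

  int physicalObjects() {
    int count = 0;
    for (PhysicalUserFunction fn : slots) {
      if (fn != null) {
        count++;
      }
    }
    return count;
  }

  int virtualInstances() {
    int total = 0;
    for (PhysicalUserFunction fn : slots) {
      if (fn != null) {
        total += fn.virtualInstances();
      }
    }
    return total;
  }
}
```

Sending messages for 100,000 distinct ids to a pool of 10 yields 100,000 virtual instances but never more than 10 physical objects.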

Sending Delayed Messages

Functions can send messages with a delay so that they arrive after some duration. Functions may even send delayed messages to themselves, which can serve as a callback. Sending a delayed message is non-blocking, so functions continue to process records between the time a delayed message is sent and the time it is received.

package com.ververica.statefun.docs.delay;

import com.ververica.statefun.sdk.Context;
import com.ververica.statefun.sdk.StatefulFunction;
import java.time.Duration;

public class FnDelayedMessage implements StatefulFunction {

  @Override
  public void invoke(Context context, Object input) {
    if (input instanceof Message) {
      System.out.println("Hello");
      context.sendAfter(Duration.ofMinutes(1), context.self(), new DelayedMessage());
    }

    if (input instanceof DelayedMessage) {
      System.out.println("Welcome to the future!");
    }
  }
}
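To see why a delayed message does not block, it helps to model delivery with a timer queue. The following is a hypothetical, single-threaded simulation with a virtual clock (`DelayedMailbox` and `Scheduled` are not SDK classes): `sendAfter` only enqueues the message with a due time and returns, so messages sent later but due earlier are processed first.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// A pending message and the virtual time (in milliseconds) at which it becomes due.
final class Scheduled implements Comparable<Scheduled> {
  final long dueAtMillis;
  final long sequence; // tie-breaker that keeps FIFO order for equal due times
  final String message;

  Scheduled(long dueAtMillis, long sequence, String message) {
    this.dueAtMillis = dueAtMillis;
    this.sequence = sequence;
    this.message = message;
  }

  @Override
  public int compareTo(Scheduled other) {
    int byTime = Long.compare(dueAtMillis, other.dueAtMillis);
    return byTime != 0 ? byTime : Long.compare(sequence, other.sequence);
  }
}

// Hypothetical mailbox for a single function, driven by a virtual clock.
final class DelayedMailbox {
  private final PriorityQueue<Scheduled> queue = new PriorityQueue<>();
  private long nowMillis = 0;
  private long nextSequence = 0;

  void send(String message) {
    sendAfter(Duration.ZERO, message);
  }

  // Non-blocking: records the due time and returns immediately.
  void sendAfter(Duration delay, String message) {
    queue.add(new Scheduled(nowMillis + delay.toMillis(), nextSequence++, message));
  }

  // Advances the virtual clock and returns the messages that became due, in order.
  List<String> advanceBy(Duration elapsed) {
    nowMillis += elapsed.toMillis();
    List<String> delivered = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().dueAtMillis <= nowMillis) {
      delivered.add(queue.poll().message);
    }
    return delivered;
  }
}
```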

Completing Async Requests

When interacting with external systems, such as a database or API, one needs to take care that communication delay with the external system does not dominate the application’s total work. Stateful Functions allows registering a Java CompletableFuture that will resolve to a value at some point in the future. Futures are registered along with a metadata object that provides additional context about the caller.

When the future completes, either successfully or exceptionally, the function that registered it (identified by its function type and id) is invoked with an AsyncOperationResult. An asynchronous result can complete in one of three states:

Success

The asynchronous operation has succeeded, and the produced result can be obtained via AsyncOperationResult#value.

Failure

The asynchronous operation has failed, and the cause can be obtained via AsyncOperationResult#throwable.

Unknown

The stateful function was restarted, possibly on a different machine, before the CompletableFuture was completed; therefore, the status of the asynchronous operation is unknown.

package com.ververica.statefun.docs.async;

import com.ververica.statefun.sdk.AsyncOperationResult;
import com.ververica.statefun.sdk.Context;
import com.ververica.statefun.sdk.StatefulFunction;
import java.util.concurrent.CompletableFuture;

@SuppressWarnings("unchecked")
public class EnrichmentFunction implements StatefulFunction {

  private final QueryService client;

  public EnrichmentFunction(QueryService client) {
    this.client = client;
  }

  @Override
  public void invoke(Context context, Object input) {
    if (input instanceof User) {
      onUser(context, (User) input);
    } else if (input instanceof AsyncOperationResult) {
      onAsyncResult((AsyncOperationResult) input);
    }
  }

  private void onUser(Context context, User user) {
    CompletableFuture<UserEnrichment> future = client.getDataAsync(user.getUserId());
    context.registerAsyncOperation(user, future);
  }

  private void onAsyncResult(AsyncOperationResult<User, UserEnrichment> result) {
    if (result.successful()) {
      User metadata = result.metadata();
      UserEnrichment value = result.value();
      System.out.println(String.format("Successfully completed future: %s %s", metadata, value));
    } else if (result.failure()) {
      System.out.println(String.format("Something has gone terribly wrong %s", result.throwable()));
    } else {
      System.out.println("Not sure what happened, maybe retry");
    }
  }
}
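The three outcomes map naturally onto the state of a CompletableFuture. The following is a plain-Java sketch, not the SDK's implementation: `AsyncStatus` and `SimpleAsyncResult` are hypothetical, and a future that has not completed when `of` is called stands in for the case where the function was restarted before the operation finished.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

enum AsyncStatus { SUCCESS, FAILURE, UNKNOWN }

// Hypothetical three-state result, loosely modeled on the states described above.
final class SimpleAsyncResult<M, T> {
  final M metadata;
  final AsyncStatus status;
  final T value;             // set only on SUCCESS
  final Throwable throwable; // set only on FAILURE

  private SimpleAsyncResult(M metadata, AsyncStatus status, T value, Throwable throwable) {
    this.metadata = metadata;
    this.status = status;
    this.value = value;
    this.throwable = throwable;
  }

  // Snapshots a future's outcome; an incomplete future is reported as UNKNOWN.
  static <M, T> SimpleAsyncResult<M, T> of(M metadata, CompletableFuture<T> future) {
    if (!future.isDone()) {
      return new SimpleAsyncResult<>(metadata, AsyncStatus.UNKNOWN, null, null);
    }
    try {
      return new SimpleAsyncResult<>(metadata, AsyncStatus.SUCCESS, future.join(), null);
    } catch (CompletionException e) {
      // join() wraps the original failure; unwrap it for the caller.
      return new SimpleAsyncResult<>(metadata, AsyncStatus.FAILURE, null, e.getCause());
    } catch (CancellationException e) {
      return new SimpleAsyncResult<>(metadata, AsyncStatus.FAILURE, null, e);
    }
  }
}
```

Keeping the metadata inside the result mirrors the text above: the handler learns which request completed without holding any extra state of its own.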

Function Providers and Dependency Injection

Stateful functions are created across a distributed cluster of nodes. StatefulFunctionProvider is a factory interface for creating a new instance of a stateful function the first time it is activated.

package com.ververica.statefun.docs;

import com.ververica.statefun.docs.dependency.ProductionDependency;
import com.ververica.statefun.docs.dependency.RuntimeDependency;
import com.ververica.statefun.sdk.FunctionType;
import com.ververica.statefun.sdk.StatefulFunction;
import com.ververica.statefun.sdk.StatefulFunctionProvider;

public class CustomProvider implements StatefulFunctionProvider {

  @Override
  public StatefulFunction functionOfType(FunctionType type) {
    // Called once per function type on each parallel worker, not once per id
    RuntimeDependency dependency = new ProductionDependency();
    return new FnWithDependency(dependency);
  }
}

Providers are called once per type on each parallel worker, not for each id. If a stateful function requires custom configuration, it can be defined inside a provider and passed to the function's constructor. This is also where shared physical resources, such as a database connection, can be created and then used by any number of virtual functions. Because dependencies arrive through the constructor, tests can supply mock or test dependencies directly, without the need for complex dependency injection frameworks.

package com.ververica.statefun.docs;

import com.ververica.statefun.docs.dependency.RuntimeDependency;
import com.ververica.statefun.docs.dependency.TestDependency;
import org.junit.Assert;
import org.junit.Test;

public class FunctionTest {

  @Test
  public void testFunctionWithCustomDependency() {
    RuntimeDependency dependency = new TestDependency();
    FnWithDependency function = new FnWithDependency(dependency);

    // Exercise `function` here; the test dependency replaces the production one.
    Assert.assertEquals("It appears math is broken", 2, 1 + 1);
  }
}
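The once-per-type behavior of providers can be sketched as a per-worker registry that asks the provider for an instance only the first time a type is seen. All names here (`SharedConnection`, `LookupFunction`, `WorkerRegistry`) are hypothetical stand-ins rather than SDK types, and the provider is modeled as a plain `java.util.function.Function`. The point is that the provider runs once per type, while the shared resource it captures is reused by every id; a test could pass a fake connection the same way.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Stand-in for a shared physical resource such as a database connection.
final class SharedConnection {
  int queries = 0;

  String query(String id) {
    queries++;
    return "row-for-" + id;
  }
}

// Stand-in for a function whose dependency is passed through its constructor.
final class LookupFunction {
  private final SharedConnection connection;

  LookupFunction(SharedConnection connection) {
    this.connection = connection;
  }

  String invoke(String id) {
    return connection.query(id);
  }
}

// One registry per parallel worker: the provider is called lazily, at most once per type.
final class WorkerRegistry {
  private final Function<String, LookupFunction> provider;
  private final Map<String, LookupFunction> instancesByType = new HashMap<>();
  int providerCalls = 0;

  WorkerRegistry(Function<String, LookupFunction> provider) {
    this.provider = provider;
  }

  String send(String type, String id) {
    LookupFunction fn = instancesByType.computeIfAbsent(type, t -> {
      providerCalls++;
      return provider.apply(t);
    });
    return fn.invoke(id);
  }
}
```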

Stateful Function Modules

Modules define a Stateful Functions application’s top-level entry point and are where everything gets tied together. They offer a single configuration method where stateful functions are bound to the system. This method also receives runtime configuration through globalConfiguration, which is the union of all configurations in the application's flink-conf.yaml and any command-line arguments passed in the form --key value.

package com.ververica.statefun.docs;

import com.ververica.statefun.sdk.spi.StatefulFunctionModule;
import java.util.Map;

public class BasicFunctionModule implements StatefulFunctionModule {

  public void configure(Map<String, String> globalConfiguration, Binder binder) {

    // Bind FnWithDependency's type to a provider that injects its dependency
    binder.bindFunctionProvider(FnWithDependency.TYPE, new CustomProvider());

    // Stateful functions that do not require any configuration
    // can declare their provider using Java 8 lambda syntax
    binder.bindFunctionProvider(FnUser.TYPE, unused -> new FnUser());
  }
}
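The globalConfiguration map can be pictured as file settings merged with `--key value` pairs from the command line. The sketch below is hypothetical: `GlobalConfiguration.merge` is not an SDK method, the file entries are passed in as an already-parsed map, and letting command-line values win on conflicting keys is an assumption that the text above does not state.

```java
import java.util.HashMap;
import java.util.Map;

final class GlobalConfiguration {
  // Merges already-parsed file settings with "--key value" command-line pairs.
  // Assumption: on conflicting keys, the command-line value wins.
  static Map<String, String> merge(Map<String, String> fileConfig, String[] args) {
    Map<String, String> merged = new HashMap<>(fileConfig);
    for (int i = 0; i < args.length; i++) {
      String arg = args[i];
      if (arg.startsWith("--") && i + 1 < args.length) {
        merged.put(arg.substring(2), args[i + 1]);
        i++; // skip the value that was just consumed
      } else {
        throw new IllegalArgumentException("Expected --key value, got: " + arg);
      }
    }
    return merged;
  }
}
```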

Modules leverage Java’s Service Provider Interface (SPI) for discovery. This means that every JAR should contain a file named com.ververica.statefun.sdk.spi.StatefulFunctionModule in the META-INF/services resource directory, listing the fully qualified class names of all modules it provides. For BasicFunctionModule, the file contains:

com.ververica.statefun.docs.BasicFunctionModule
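On the JVM, files under META-INF/services are read by java.util.ServiceLoader. The sketch below shows that discovery loop in generic form; `GreeterModule` is a hypothetical interface standing in for StatefulFunctionModule, and whether the Stateful Functions runtime uses exactly this loop is an assumption. Because no services file lists an implementation of `GreeterModule`, the loop finds nothing; adding a file named after the interface's fully qualified name, containing implementation class names, would change that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical SPI type standing in for StatefulFunctionModule.
interface GreeterModule {
  String name();
}

final class ModuleDiscovery {
  // Instantiates every implementation listed in META-INF/services/<spi binary name>
  // on the classpath, using each class's public no-argument constructor.
  static <S> List<S> discover(Class<S> spi) {
    List<S> modules = new ArrayList<>();
    for (S module : ServiceLoader.load(spi)) {
      modules.add(module);
    }
    return modules;
  }
}
```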