Stateless processors
Stateless transformations do not require state for processing, so they do not need a state store associated with the stream processor. Kafka 0.11.0 and later allows you to materialize the result of a stateless IKTable transformation, which makes the result available through interactive queries. To materialize an IKTable, each of the stateless operations below can be augmented with an optional queryableStoreName argument.
Branch
Branch (or split) an IKStream based on the supplied predicates into one or more IKStream instances.
Predicates are evaluated in order. A record is placed in exactly one output stream, on the first match: if the n-th predicate evaluates to true, the record is placed in the n-th stream. If no predicate matches, the record is dropped.
Branching is useful, for example, to route records to different downstream topics.
IKStream → IKStream[]
IKStream<string, string> stream = ....;
var branches = stream.Branch(
(k,v) => k.StartsWith("A"),
(k,v) => k.StartsWith("B"),
(k,v) => k.StartsWith("C"),
(k,v) => true); // DLQ pattern
// branches[0] contains all records whose keys start with "A"
// branches[1] contains all records whose keys start with "B"
// branches[2] contains all records whose keys start with "C"
// branches[3] contains other records
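The routing use case mentioned above could be sketched as follows. This is a minimal illustration, not a definitive implementation: the topic names ("input-topic", "topic-a", "topic-other") are hypothetical, and the sketch assumes the application's default SerDes handle string keys and values.

```csharp
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.Stream;

var builder = new StreamBuilder();

// "input-topic" is a hypothetical source topic.
IKStream<string, string> stream = builder.Stream<string, string>("input-topic");

// Two predicates produce two output streams, in the same order.
IKStream<string, string>[] branches = stream.Branch(
    (k, v) => k.StartsWith("A"), // branches[0]: keys starting with "A"
    (k, v) => true);             // branches[1]: catch-all for everything else

// Route each branch to its own (hypothetical) downstream topic.
branches[0].To("topic-a");
branches[1].To("topic-other");

Topology topology = builder.Build();
```

Because the catch-all predicate comes last, no record is dropped; removing it would silently discard every record whose key does not start with "A".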
Filter
Evaluates a boolean function for each element and retains those for which the function returns true.
IKStream → IKStream
IKTable → IKTable
IKStream<string, string> stream = ....;
IKTable<string, string> table = ...;
// Keep only the records whose value contains the string "test"
stream.Filter((k, v) => v.Contains("test"));
table.Filter((k, v) => v.Contains("test"));
InverseFilter
Evaluates a boolean function for each element and drops those for which the function returns true.
IKStream → IKStream
IKTable → IKTable
IKStream<string, string> stream = ....;
IKTable<string, string> table = ...;
// Keep only the records whose value does not contain the string "test"
stream.FilterNot((k, v) => v.Contains("test"));
table.FilterNot((k, v) => v.Contains("test"));
FlatMap
Takes one record and produces zero, one, or more records. You can modify the record keys and values, including their types.
IKStream → IKStream
IKStream<string, string> stream = ....;
// Here, we generate two output records for each input record.
// We also change the key and value types.
// Example: ("KEY1", "Hello") -> ("HELLO", 100), ("HELLO", 900)
stream
.FlatMap((k, v) =>
{
List<KeyValuePair<string, long>> results = new List<KeyValuePair<string, long>>();
results.Add(KeyValuePair.Create(v.ToUpper(), 100L));
results.Add(KeyValuePair.Create(v.ToUpper(), 900L));
return results;
});
FlatMapValues
Takes one record and produces zero, one, or more records, while retaining the key of the original record. You can modify the record values and the value type.
FlatMapValues is preferable to FlatMap because it does not cause data re-partitioning. However, unlike FlatMap, it does not let you modify the key or the key type.
IKStream → IKStream
IKStream<string, string> stream = ....;
// Split a word into characters.
stream.FlatMapValues((k,v) => v.ToCharArray());
Foreach
Terminal operation. Performs a stateless action on each record.
You would use Foreach to cause side effects based on the input data (similar to Peek) and then stop further processing of the input data (unlike Peek, which is not a terminal operation).
Note on processing guarantees: Any side effects of an action (such as writing to external systems) are not trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.
IKStream → void
IKStream<string, string> stream = ....;
// Print the contents of the IKStream to the local console
stream.Foreach((k,v) => Console.WriteLine($"{k} {v}"));
GroupByKey
Groups the records by the existing key.
Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned (“keyed”) for subsequent operations.
When to set explicit SerDes: Variants of GroupByKey exist to override the configured default SerDes of your application, which you must do if the key and/or value types of the resulting IKGroupedStream do not match the configured default SerDes.
IKStream → IKGroupedStream
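As a minimal sketch of the above, assuming a hypothetical "input-topic" with string keys and values: the first call relies on the configured default SerDes, while the second shows what overriding them could look like. The generic GroupByKey<KS, VS>() variant is an assumption about the available overloads, so check it against the library version you use. The Count() call is a stateful aggregation, included only to show why grouping comes first.

```csharp
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;
using Streamiz.Kafka.Net.Table;

var builder = new StreamBuilder();

// "input-topic" is a hypothetical source topic.
IKStream<string, string> stream = builder.Stream<string, string>("input-topic");

// Group by the existing key, using the application's default SerDes.
IKGroupedStream<string, string> grouped = stream.GroupByKey();

// Assumed variant: override the default SerDes explicitly for key and value.
IKGroupedStream<string, string> groupedWithSerDes =
    stream.GroupByKey<StringSerDes, StringSerDes>();

// Grouping is the prerequisite for an aggregation such as counting per key.
IKTable<string, long> counts = grouped.Count();
```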
GroupBy
Groups the records by a new key, which may be of a different key type. When grouping a table, you may also specify a new value and value type. GroupBy is a shorthand for SelectKey(…).GroupByKey().
Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned (“keyed”) for subsequent operations.
When to set explicit SerDes: Variants of GroupBy exist to override the configured default SerDes of your application, which you must do if the key and/or value types of the resulting IKGroupedStream or IKGroupedTable do not match the configured default SerDes.
IKStream → IKGroupedStream
IKTable → IKGroupedTable
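A minimal sketch of both forms follows. The topic names are hypothetical, and the new keys are deliberately kept as strings so that default string SerDes would still apply; a different key or value type would need one of the explicit SerDes variants described above.

```csharp
using System.Collections.Generic;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.Stream;
using Streamiz.Kafka.Net.Table;

var builder = new StreamBuilder();

// "input-topic" and "table-topic" are hypothetical source topics.
IKStream<string, string> stream = builder.Stream<string, string>("input-topic");
IKTable<string, string> table = builder.Table<string, string>("table-topic");

// Stream: the selector returns only the new key (here, the upper-cased value).
IKGroupedStream<string, string> groupedStream =
    stream.GroupBy<string>((k, v) => v.ToUpper());

// Table: the selector returns a new key AND a new value (here, swapped).
IKGroupedTable<string, string> groupedTable =
    table.GroupBy<string, string>((k, v) => KeyValuePair.Create(v, k));
```

Since the key changes in both cases, the records are marked for re-partitioning before any subsequent aggregation.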
Map
Takes one record and produces one record. You can modify the record key and value, including their types.
IKStream → IKStream
IKStream<string, string> stream = ....;
// Create a new record that swaps key and value, upper-casing both
stream.Map((k,v) => KeyValuePair.Create(v.ToUpper(), k.ToUpper()));
MapValues
Takes one record and produces one record, while retaining the key of the original record. You can modify the record value and the value type.
MapValues is preferable to Map because it does not cause data re-partitioning. However, unlike Map, it does not let you modify the key or the key type.
IKStream → IKStream
IKTable → IKTable
IKStream<string, string> stream = ....;
IKTable<string, string> table = ...;
// The new value is the length of the string value, so the value type becomes Int32
stream.MapValues((k,v) => v.Length);
table.MapValues((k,v) => v.Length);
Peek
Performs a stateless action on each record, and returns an unchanged stream.
You would use Peek to cause side effects based on the input data (similar to Foreach) and continue processing the input data (unlike Foreach, which is a terminal operation). Peek returns the input stream as-is; if you need to modify the input stream, use Map or MapValues instead.
Peek is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting.
Note on processing guarantees: Any side effects of an action (such as writing to external systems) are not trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.
IKStream → IKStream
IKStream<string, string> stream = ....;
stream.Peek((k,v) => Console.WriteLine($"{k} {v}"));
Print
Terminal operation. Prints the records to standard output (the console).
Calling Print() is the same as calling Foreach((key, value) => Console.WriteLine($"{key} {value}"))
Print is mainly intended for debugging and testing. It tries to flush on every printed record, so it should not be used in production where performance matters.
IKStream → void
IKStream<string, string> stream = ....;
// Print every record of the stream to the console
stream.Print(Printed<string, string>.ToOut());
SelectKey
Assigns a new key – possibly of a new key type – to each record.
Calling SelectKey(…) is the same as calling Map((key, value) => …)
Marks the stream for data re-partitioning: applying a grouping or a join after SelectKey will result in re-partitioning of the records.
IKStream → IKStream
IKStream<string, string> stream = ....;
// Derive a new record key from the record's value.
stream.SelectKey((k,v) => v.Length);
Table to Stream
Get the changelog stream of this table.
IKTable → IKStream
IKTable<string, string> table = ....;
// Get the changelog stream of the table.
// A variant of `ToStream` also exists that lets you
// select a new key for the resulting stream.
IKStream<string, string> stream = table.ToStream();
Repartition
Manually triggers repartitioning of the stream with the desired number of partitions.
The generated topic is treated as an internal topic, so its data is purged automatically, as with any other internal repartition topic. Specifying the number of partitions makes it easy to scale downstream sub-topologies in or out.
IKStream → IKStream
IKStream<string, string> stream = ....;
IKStream<string, string> repartitionedStream = stream.Repartition(Repartitioned<string, string>.NumberOfPartitions(10));
MapAsync
Takes one record and produces one record. You can modify the record key and value, including their types.
This operation is asynchronous and creates a request/response pattern. The asynchronous processing is handled by a dedicated external thread and implements retry behavior.
Use cases: enriching records with data from an HTTP API, a SQL or NoSQL database, and so on.
IKStream → IKStream
IKStream<string, string> stream = ....;
// Create a new record with the upper-cased value as the key and the original value as the value
stream.MapAsync(
async (record, context) =>
await Task.FromResult(new KeyValuePair<string, string>(record.Value.ToUpper(), record.Value)),
RetryPolicy.NewBuilder().NumberOfRetry(10).Build());
MapValuesAsync
Takes one record and produces one record, while retaining the key of the original record. You can modify the record value and the value type.
This operation is asynchronous and creates a request/response pattern. The asynchronous processing is handled by a dedicated external thread and implements retry behavior.
Use cases: enriching records with data from an HTTP API, a SQL or NoSQL database, and so on.
IKStream → IKStream
IKStream<string, string> stream = ....;
// New value type => Int32 which is the length of string value
stream.MapValuesAsync(
async (record, context) =>
await Task.FromResult(record.Value.Length),
RetryPolicy.NewBuilder().NumberOfRetry(10).Build());
FlatMapAsync
Takes one record and produces zero, one, or more records. You can modify the record keys and values, including their types.
This operation is asynchronous and creates a request/response pattern. The asynchronous processing is handled by a dedicated external thread and implements retry behavior.
Use cases: enriching records with data from an HTTP API, a SQL or NoSQL database, and so on.
IKStream → IKStream
IKStream<string, string> stream = ....;
// Here, we generate one output record per character of the input value.
// The key is unchanged; the value type changes from string to char.
// Example: ("KEY1", "co") -> ("KEY1", 'c'), ("KEY1", 'o')
stream
.FlatMapAsync(
async (record, context) =>
await Task.FromResult(record.Value.ToCharArray().Select(c => new KeyValuePair<string,char>(record.Key, c))),
RetryPolicy.NewBuilder().NumberOfRetry(10).Build());
FlatMapValuesAsync
Takes one record and produces zero, one, or more records, while retaining the key of the original record. You can modify the record values and the value type.
This operation is asynchronous and creates a request/response pattern. The asynchronous processing is handled by a dedicated external thread and implements retry behavior.
Use cases: enriching records with data from an HTTP API, a SQL or NoSQL database, and so on.
IKStream → IKStream
IKStream<string, string> stream = ....;
// Split a word into characters.
stream.FlatMapValuesAsync<char>(
async (record, context) =>
await Task.FromResult(record.Value.ToCharArray()),
RetryPolicy.NewBuilder().NumberOfRetry(10).Build());
ForeachAsync
Terminal operation. Performs an asynchronous action on each record of a stream and returns void.
This operation is asynchronous and creates a request/response pattern. The asynchronous processing is handled by a dedicated external thread and implements retry behavior.
Use cases: pushing data asynchronously into a sink system such as a database, an HTTP API, or a JMS broker.
IKStream → void
IKStream<string, string> stream = ....;
// Try to insert new items into a MongoDB collection
stream.ForeachAsync(
async (record, _) =>
{
await database
.GetCollection<Person>("adress")
.InsertOneAsync(new Person()
{
name = record.Key,
address = new Address()
{
city = record.Value
}
});
},
RetryPolicy
.NewBuilder()
.NumberOfRetry(10)
.RetryBackOffMs(100)
.RetriableException<Exception>()
.RetryBehavior(EndRetryBehavior.SKIP)
.Build());
Process
Terminal operation. Applies a Processor to each record. Process(...) allows you to leverage the Processor API from the DSL.
Be careful: if you need to interact with an external system, use ForeachAsync instead.
IKStream → void
var builder = new StreamBuilder();
builder.Stream<string, string>("topic")
.Process(ProcessorBuilder
.New<string, string>()
.Processor((record) =>
{
// what you want ...
})
.Build());
Transform
Applies a Transformer to each record. Transform(...) allows you to leverage the Processor API from the DSL.
Each input record is transformed into zero or one output record (similar to the stateless Map). The Transformer must return null to emit no output. You can modify the record’s key and value, including their types.
Marks the stream for data re-partitioning: applying a grouping or a join after Transform will result in re-partitioning of the records. If possible, use TransformValues(...) instead, which will not cause data re-partitioning.
Be careful: if you need to interact with an external system, use MapAsync(...), MapValuesAsync(...), FlatMapAsync(...) or FlatMapValuesAsync(...) instead.
IKStream → IKStream
var builder = new StreamBuilder();
builder.Stream<string, string>("topic")
.Transform(TransformerBuilder
.New<string, string, string, string>()
.Transformer((record) =>
Record<string, string>.Create(record.Key.ToUpper(), record.Value.ToUpper()))
.Build())
.To("topic-output");
TransformValues
Applies a Transformer to each record, while retaining the key of the original record (even if you change the key in the output Record). TransformValues(...) allows you to leverage the Processor API from the DSL.
Each input record is transformed into zero or one output record (similar to the stateless MapValues). The Transformer must return null to emit no output. You can modify the record’s value, including its type.
TransformValues(...) is preferable to Transform(...) because it will not cause data re-partitioning.
IKStream → IKStream
var builder = new StreamBuilder();
builder.Stream<string, string>("topic")
.TransformValues(TransformerBuilder
.New<string, string, string, string>()
.Transformer((record) => Record<string, string>.Create(record.Value.ToUpper()))
.Build())
.To("topic-output");