Key-Value - Input
In this tutorial we use key-values. A key-value record pairs a key with its corresponding value. This example focuses on taking inputs from a key-value topic and sending them to a non-key-value topic.
Prerequisites
This guide uses a local Fluvio cluster. If you need to install it, please follow the installation instructions here.
Dataflow
Overview
In this example, we will write a dataflow that takes inputs from a key-value topic and transforms them with a filter-map operator to select only the students in a certain class.
Define the types
For this example, we will not be using primitive types. We have the following objects.
types:
  student-info:
    type: object
    properties:
      age:
        type: u32
      teacher:
        type: string
      grade:
        type: f32
  student-in-class:
    type: object
    properties:
      name:
        type: string
      grade:
        type: f32
We have two object types here: student-info, which describes the value of each record in the source topic, and student-in-class, which describes the records written to the sink.
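The transform code in this guide refers to these types as StudentInfo and StudentInClass, so it may help to picture them as Rust structs. The sketch below is a hand-written approximation, assuming the kebab-case type names map to PascalCase structs and that u32, f32, and string map to Rust's u32, f32, and String. The code SDF actually generates may differ, for example in the traits it derives.

```rust
// Hand-written stand-ins for the types declared above.
// Assumption: this only approximates what SDF generates; it is not the generated code.

/// Value of each record in the source topic (`student-info`).
#[derive(Debug, Clone, PartialEq)]
struct StudentInfo {
    age: u32,
    teacher: String,
    grade: f32,
}

/// Record written to the sink topic (`student-in-class`).
#[derive(Debug, Clone, PartialEq)]
struct StudentInClass {
    name: String,
    grade: f32,
}

fn main() {
    // Illustrative values only.
    let info = StudentInfo {
        age: 13,
        teacher: "profX".to_string(),
        grade: 90.1,
    };
    // The sink type keeps the grade and adds the student's name.
    let out = StudentInClass {
        name: "Jerry".to_string(),
        grade: info.grade,
    };
    println!("age={} teacher={} grade={}", info.age, info.teacher, info.grade);
    println!("name={} grade={}", out.name, out.grade);
}
```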
Topic List
The following is our list of topics.
topics:
  student-list:
    schema:
      key:
        type: string
      value:
        type: student-info
  profx-students:
    schema:
      value:
        type: student-in-class
Source
The student-list topic is the source. It maps a primitive string key to a student-info value. In our example, the string key is the name of the student.
Sink
The profx-students topic is the sink. Its records have the type student-in-class defined above.
Transform
We will apply a filter-map transform that turns each (String, StudentInfo) key-value entry into a StudentInClass and drops the students that do not have profX in their teacher entry.
transforms:
  - operator: filter-map
    run: |
      fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>> {
        if info.teacher != "profX" {
          return Ok(None);
        }
        let ret = StudentInClass {
          name: name.unwrap_or("".to_string()),
          grade: info.grade
        };
        Ok(Some(ret))
      }
In this transform function, the first parameter, name, holds the key, and the second, info, holds the value. The key arrives as an Option<String>, so the function falls back to an empty name when a record has no key.
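To experiment with the filter logic outside a dataflow, the function can be exercised as a plain Rust program. This is a minimal sketch under a few assumptions: the two structs are hand-written stand-ins for the types SDF generates, and the Result alias with a String error is a placeholder for whatever Result type is in scope inside an SDF transform.

```rust
// Stand-in types; in a real dataflow these come from the `types` section.
#[allow(dead_code)] // `age` is carried in the record but not used by the filter
#[derive(Debug, PartialEq)]
struct StudentInfo {
    age: u32,
    teacher: String,
    grade: f32,
}

#[derive(Debug, PartialEq)]
struct StudentInClass {
    name: String,
    grade: f32,
}

// Assumption: placeholder for the Result type available inside an SDF transform.
type Result<T> = std::result::Result<T, String>;

/// Filter-map: returns Ok(None) to drop a record, Ok(Some(..)) to emit a mapped one.
fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>> {
    if info.teacher != "profX" {
        return Ok(None); // filtered out: not one of profX's students
    }
    Ok(Some(StudentInClass {
        // A record without a key gets an empty name.
        name: name.unwrap_or_default(),
        grade: info.grade,
    }))
}

fn main() {
    let kept = student_filter(
        Some("Jerry".to_string()),
        StudentInfo { age: 13, teacher: "profX".to_string(), grade: 90.1 },
    );
    let dropped = student_filter(
        Some("Tom".to_string()),
        StudentInfo { age: 14, teacher: "profY".to_string(), grade: 99.2 },
    );
    println!("kept:    {:?}", kept);
    println!("dropped: {:?}", dropped);
}
```

Returning Ok(None) rather than an error keeps filtering separate from failure: a student in another class is dropped silently, while an Err would signal a record that could not be processed.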
Running the Example
Full Code
Copy and paste the following config and save it as dataflow.yaml.
apiVersion: 0.5.0
meta:
  name: key-value-type
  version: 0.1.0
  namespace: example
config:
  converter: json
  consumer:
    default_starting_offset:
      value: 0
      position: End
types:
  student-info:
    type: object
    properties:
      age:
        type: u32
      teacher:
        type: string
      grade:
        type: f32
  student-in-class:
    type: object
    properties:
      name:
        type: string
      grade:
        type: f32
topics:
  student-list:
    schema:
      key:
        type: string
      value:
        type: student-info
  profx-students:
    schema:
      value:
        type: student-in-class
services:
  filter-profx:
    sources:
      - type: topic
        id: student-list
    transforms:
      - operator: filter-map
        run: |
          fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>> {
            if info.teacher != "profX" {
              return Ok(None);
            }
            let ret = StudentInClass {
              name: name.unwrap_or("".to_string()),
              grade: info.grade
            };
            Ok(Some(ret))
          }
    sinks:
      - type: topic
        id: profx-students
Running SDF
To run the example:
$ sdf run --ephemeral
Produce data
We will produce some data by first writing it into a file named student.txt. Each line holds a key and a JSON value separated by ">".
Jerry>{"age":13,"teacher":"profX","grade":90.1}
Tom>{"age":14,"teacher":"profY","grade":99.2}
Jane>{"age":-1,"teacher":"profZ","grade":99.11}
Terry>{"age":13,"teacher":"profX","grade":50}
We can then produce the data with:
$ fluvio produce student-list --key-separator ">" -f student.txt
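The effect of the key separator on a line of student.txt can be pictured with a few lines of Rust. This is only an illustration, assuming each line is split at the first occurrence of the separator; split_key_value is a hypothetical helper, not part of the Fluvio CLI or its libraries.

```rust
/// Hypothetical helper: splits a line into (key, value) at the first `sep`.
/// Returns None when the line contains no separator.
fn split_key_value<'a>(line: &'a str, sep: &str) -> Option<(&'a str, &'a str)> {
    line.split_once(sep)
}

fn main() {
    let line = r#"Jerry>{"age":13,"teacher":"profX","grade":90.1}"#;
    match split_key_value(line, ">") {
        // Print in a "[key] value" layout.
        Some((key, value)) => println!("[{}] {}", key, value),
        None => println!("no key found in: {}", line),
    }
}
```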
If we consume the source topic with the -k flag, which prints each record's key alongside its value, we should see the following output:
$ fluvio consume student-list -Bdk
[Jerry] {"age":13,"teacher":"profX","grade":90.1}
[Tom] {"age":14,"teacher":"profY","grade":99.2}
[Jane] {"age":-1,"teacher":"profZ","grade":99.11}
[Terry] {"age":13,"teacher":"profX","grade":50}
Consume data
To consume the data from the sink topic:
$ fluvio consume profx-students -Bd
{"grade":90.1,"name":"Jerry"}
{"grade":50.0,"name":"Terry"}
Only students whose teacher is profX are sent to the sink.
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
This how-to focused on using key-values as inputs. The following page contains another example of key-values as inputs.