Apache Flink Series 4 — DataStream API
In this post, I am going to explain the DataStream API in Flink.
You may see all my notes about Apache Flink at this link.
When we look at Flink as software, we see that it is built as a layered system, and the DataStream API is one of those layers, sitting on top of the Runtime Layer.
Let's dive into the DataStream API and its transformations in Flink.
DataStream.map() applies a user-defined function to each record and emits the result as a new DataStream.
DataStream<Integer> dataStream = // ... your data source: kafka topic, file etc.
// MapFunction<I, O> accepts an input (Integer in this example)
// and produces a new DataStream with the desired output type (also Integer in this example)
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value; // our data source has Integer values; we double these integers
    }
});
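As a quick end-to-end sanity check, here is a minimal, runnable sketch of the same doubling map, this time written as a lambda; the class name, job name, and sample values are placeholders I chose for illustration:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DoubleValuesJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)   // tiny in-memory source instead of Kafka, just for the sketch
           .map(value -> 2 * value) // the same doubling logic as above
           .print();                // prints 2, 4, 6

        env.execute("double-values-sketch");
    }
}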
DataStream.filter() evaluates a boolean condition for each element and produces a new DataStream of the same type, keeping only the elements for which the condition returns true.
DataStream<LogObject> yourLogs = // ... kafka topic, file etc.
// returns a new stream that contains only log objects without a key
DataStream<LogObject> streamWithoutHavingKey = yourLogs.filter(new FilterFunction<LogObject>() {
    @Override
    public boolean filter(LogObject value) throws Exception {
        return !value.isKey();
    }
});
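If you prefer lambdas, the same filter fits on one line; since FilterFunction returns a plain boolean and the stream type does not change, no extra type hints are needed:

DataStream<LogObject> streamWithoutHavingKey = yourLogs.filter(value -> !value.isKey());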
DataStream.flatMap() can produce zero, one, or more output records for each input record. Results are emitted through a Collector<T>, which collects the records and forwards them.
DataStream<LogObject> yourLogs = // ... kafka topic, file etc.
yourLogs.flatMap(new FlatMapFunction<LogObject, String>() {
    @Override
    public void flatMap(LogObject value, Collector<String> out) throws Exception {
        // split the user name into words and emit each word separately
        for (String word : value.getUserName().split(" ")) {
            out.collect(word);
        }
    }
});
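One caveat if you rewrite this flatMap as a lambda: Java erases the Collector<String> type, so Flink cannot infer the output type on its own and you must declare it with returns(). A sketch of that variant:

import org.apache.flink.api.common.typeinfo.Types;

yourLogs
    .flatMap((LogObject value, Collector<String> out) -> {
        for (String word : value.getUserName().split(" ")) {
            out.collect(word);
        }
    })
    .returns(Types.STRING); // required: the lambda erases the Collector's element type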
KeyBy logically partitions a stream by a key, much like GROUP BY groups rows in SQL. Consider this query, which counts non-null tokens per color:
SELECT tableName.color, count(token) AS countOfValidToken
FROM tableName
WHERE tableName.token IS NOT NULL
GROUP BY tableName.color;
DataStream<LogObject> yourLogs = // ... kafka topic, file etc.
// Note: we assume LogObject has an attribute called color; this is one way of defining a key in Flink, we will touch on it later
KeyedStream<LogObject, Tuple> eachColorStream = yourLogs.keyBy("color");
Once the stream is keyed, we can apply a rolling aggregation such as reduce, which combines each incoming record with the last reduced value for that key. For example, on a keyed stream of Integers:
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) {
        return value1 + value2; // rolling sum per key
    }
});
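To tie reduce back to the SQL analogy above, here is a minimal, runnable sketch that keeps a running count per color; the Tuple2 records, class name, and sample values are invented for illustration:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CountPerColorJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("red", 1), Tuple2.of("blue", 1), Tuple2.of("red", 1))
           .keyBy(0)                                       // group by the color field, as in GROUP BY color
           .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)) // rolling count per color
           .print();                                       // emits e.g. (red,1), (blue,1), (red,2)

        env.execute("count-per-color-sketch");
    }
}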
dataStream.shuffle()   // distributes records randomly to downstream subtasks
dataStream.rebalance() // distributes records round-robin across all downstream subtasks
dataStream.rescale()   // round-robin, but only within a local subset of subtasks
dataStream.broadcast() // sends every record to all downstream subtasks
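These calls sit between two operators and only change how records travel downstream. For example, a small sketch (the Kafka consumer variable is a placeholder, as in the examples above): if the source partitions are skewed, rebalance() evens out the load before an expensive transformation:

DataStream<LogObject> logs = env.addSource(yourApacheFlinkKafkaConsumer);

DataStream<LogObject> processed = logs
    .rebalance() // spread the skewed source evenly across the next operator's subtasks
    .map(...);   // your expensive transformation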
DataStream<Tuple3<Integer, String, Long>> input = // ... a DataStream of tuples with 3 fields
// key the stream by the first (Integer) field of the tuple
KeyedStream<Tuple3<Integer, String, Long>, Tuple> keyed = input.keyBy(0);
public class LogObject {
    private String color;
    // a public getter (plus a setter and a no-arg constructor) lets Flink treat LogObject as a POJO
    public String getColor() { return color; }
}
DataStream<LogObject> input = // ...
// key by field name; with a field-name key, the key type is the generic Tuple
KeyedStream<LogObject, Tuple> keyed = input.keyBy("color");
// alternatively, key by a KeySelector, which computes the key explicitly
KeyedStream<LogObject, String> keyed = input.keyBy(new KeySelector<LogObject, String>() {
    @Override
    public String getKey(LogObject logObject) {
        return logObject.getColor();
    }
});
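Since KeySelector has a single method, the same key extraction can also be written as a lambda, which is the style newer Flink versions recommend over field names and tuple positions:

KeyedStream<LogObject, String> keyed = input.keyBy(logObject -> logObject.getColor());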
As we already know, Flink applications are executed in parallel in a distributed environment.
We can control this parallelism while writing a DataStream program: we can set the parallelism of the whole execution environment or of each individual operator.
And don't forget that, by default, the parallelism of every operator is set to the parallelism of the application's execution environment.
Let’s look at this example:
// getExecutionEnvironment() picks up the parallelism given at submission time
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
int defaultParallelism = env.getParallelism();
// the source will run with the default parallelism
DataStream<LogObject> source = env.addSource(yourApacheFlinkKafkaConsumer);
// the map parallelism is set to double the default parallelism
DataStream<LogObject> mappedLog = source.map(...).setParallelism(defaultParallelism * 2);
// the filter parallelism is fixed to 2
DataStream<LogObject> filterLog = source.filter(...).setParallelism(2);
If we submit this application with parallelism 16, then the source will run with parallelism 16, the map with parallelism 32, and the filter with parallelism 2, regardless of the environment's parallelism.
Last but not least, wait for the next post …