Apache Flink Series 7 — Create Sample Apache Flink Cluster on Local Machine — Part 2
In this post, I will create simple stream job and submit the job to the flink cluster. You can find the project in my github repo. This is the second …
In this post, I am going to explain what a state backend is, which state backend options we have, and how to configure the state backend for your Flink job. I will also implement a project to show each state backend option.
You can find the example in my GitHub repo.
Before starting, it is good to remember what state is in Flink:
At a high level, we can consider state as memory in Flink operators that remembers information about past input and can be used to influence the processing of future input.
A state backend is a pluggable component that determines how state is stored, accessed and maintained. Because it is pluggable, two Flink applications can use different state backends.
State backend is responsible for two things:
For local state management:
For state checkpointing:
In general, through the state backend configuration we can choose the directory where a job’s state is saved, the type of state backend, and more.
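To make the idea of a pluggable state backend more concrete, here is a minimal plain-Java sketch. It is not Flink's API: the SimpleStateBackend interface, its two implementations and CountingOperator are hypothetical names invented for illustration. The only point is that the operator logic stays the same while the way state is stored changes.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a state backend; NOT a Flink interface. */
interface SimpleStateBackend {
    String get(String key);
    void put(String key, String value);
}

/** Keeps state as plain objects on the heap. */
final class HeapBackend implements SimpleStateBackend {
    private final Map<String, String> store = new HashMap<>();
    public String get(String key) { return store.get(key); }
    public void put(String key, String value) { store.put(key, value); }
}

/** Keeps state as serialized bytes, as a backend storing data outside the object heap would. */
final class SerializedBackend implements SimpleStateBackend {
    private final Map<String, byte[]> store = new HashMap<>();
    public String get(String key) {
        byte[] bytes = store.get(key);
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
    public void put(String key, String value) {
        store.put(key, value.getBytes(StandardCharsets.UTF_8));
    }
}

/** Operator logic that only talks to the interface, so the backend can be swapped. */
final class CountingOperator {
    private final SimpleStateBackend backend;
    CountingOperator(SimpleStateBackend backend) { this.backend = backend; }

    /** Counts how many times each key has been seen and returns the new count. */
    int process(String key) {
        String previous = backend.get(key);
        int count = (previous == null ? 0 : Integer.parseInt(previous)) + 1;
        backend.put(key, Integer.toString(count));
        return count;
    }

    public static void main(String[] args) {
        // Same operator code, two different storage strategies
        CountingOperator onHeap = new CountingOperator(new HeapBackend());
        CountingOperator serialized = new CountingOperator(new SerializedBackend());
        System.out.println(onHeap.process("a") + " " + serialized.process("a"));
    }
}
```

In this sketch the operator never knows which backend it runs on, which is roughly what "pluggable" means in the paragraph above.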
In Flink, we have 4 options for state backend:
Let’s talk about each of them one by one.
OutOfMemoryError. Secondly, because state objects on the heap are long-lived, they can cause garbage collection issues. Lastly, because checkpoints are stored in the JobManager’s heap, all checkpointed state is lost if the JobManager fails.
OutOfMemoryError, because the state is held on the heap.
Note: RocksDB is an embeddable persistent key-value store for fast storage. More on this address
ListCheckpointed interface.
ValueState<T>: keeps a single value, which can be updated via update(T).
ListState<T>: keeps a list of elements for a specific key.
ReducingState<T>: keeps a single value that represents the aggregation of all values added to the state.
AggregatingState<IN, OUT>: the same as ReducingState, except that the input type can differ from the output type.
MapState<UK, UV>: keeps a list of mappings.
ValueState, ListState, etc.
Let’s create a Flink example to see the different state backend options in a more realistic way.
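Before moving on to the example, the difference between ReducingState and AggregatingState listed above can be sketched in plain Java. These two classes are illustrations only, with made-up names; they are not Flink's state classes. The first keeps a value of the same type as its input, the second accepts Integer input but returns a Double.

```java
import java.util.function.BinaryOperator;

/** Illustration only: input and output share one type, as with a reducing state. */
final class ReducingSketch {
    private Integer current;
    private final BinaryOperator<Integer> reduce;

    ReducingSketch(BinaryOperator<Integer> reduce) { this.reduce = reduce; }

    /** Folds the new value into the single stored value. */
    void add(Integer value) {
        current = (current == null) ? value : reduce.apply(current, value);
    }

    Integer get() { return current; }
}

/** Illustration only: Integer goes in, Double comes out (a running average). */
final class AveragingSketch {
    private long sum;
    private long count;

    void add(Integer value) {
        sum += value;
        count++;
    }

    /** Returns null while nothing has been added yet. */
    Double get() { return count == 0 ? null : (double) sum / count; }

    public static void main(String[] args) {
        ReducingSketch max = new ReducingSketch(Math::max);
        AveragingSketch average = new AveragingSketch();
        for (int speed : new int[] {30, 80, 40}) {
            max.add(speed);
            average.add(speed);
        }
        System.out.println(max.get() + " " + average.get());
    }
}
```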
In this example, I am going to use a big CSV file provided by https://data.ibb.gov.tr, specifically the data set called *INSTANT LOCATION AND SPEED INFORMATION OF IMM ISTAC VEHICLES* (*The dataset contains the location and speed information of trucks, vans, road sweepers, and road washing vehicles of ISTAC Inc., a subsidiary of the Istanbul Metropolitan Municipality.*).
This data set is divided into ten-day parts for each month. I downloaded one part of the data set. However, even one part was too big to read on my local machine, so I decided to split the data with pandas and convert it to a txt file that the Flink application can read.
Note: Flink may also provide classes to transform a CSV file into a DataStream, but the pandas way was easier for me!
Here is the Python code for taking the first rows of the CSV file (you may change the row count) and converting them to txt:
import pandas as pd

# Read the CSV file and write its first rows to a .txt file
path = "path/To/CSV"
csv = pd.read_csv(path)
first_rows = csv[:500000]  # take only the first 500000 rows

with open("path/To/Txt", "w") as txt_file:
    for each in first_rows.itertuples():
        # each[0] is the row index; each[1]..each[10] are the ten CSV columns
        str_repr = ",".join(str(each[i]) for i in range(1, 11))
        txt_file.write(str_repr + "\n")
In this example, our Flink job will find the “fastest vehicle” for each vehicle type in real time. To find the fastest vehicle, we are going to use ValueState (which is managed keyed state) together with MemoryStateBackend, FsStateBackend and RocksDBStateBackend, respectively.
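Stripped of Flink, the logic of the job can be sketched as follows. This is only an illustration under simplifying assumptions: the class name FastestPerTypeSketch is made up, speeds are already parsed to int, and a HashMap plays the role that keyed ValueState plays in the real job (one slot per vehicle type).

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of "fastest vehicle per type"; not the Flink implementation. */
final class FastestPerTypeSketch {
    // One slot per key (vehicle type), similar to a ValueState scoped to the current key
    private final Map<String, Integer> fastestSpeedByType = new HashMap<>();

    /** Processes one record and returns the fastest speed seen so far for its type. */
    int process(String vehicleType, int speed) {
        Integer previous = fastestSpeedByType.get(vehicleType);
        if (previous == null || previous < speed) {
            // Nothing stored yet, or the new record is faster: update the slot
            fastestSpeedByType.put(vehicleType, speed);
            return speed;
        }
        return previous;
    }

    public static void main(String[] args) {
        FastestPerTypeSketch sketch = new FastestPerTypeSketch();
        // The vehicle types and speeds below are invented sample values
        System.out.println(sketch.process("truck", 40));
        System.out.println(sketch.process("truck", 30));
        System.out.println(sketch.process("van", 10));
    }
}
```

The difference in the real job is that Flink manages the per-key slot for us, and the state backend decides where that slot physically lives.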
Basically our flink application:
First, let’s summarize how we define managed keyed state for our Flink job:
State is accessed through the RuntimeContext, therefore your function must extend RichFunction.
You need a StateDescriptor to get a state handle. For example, if you decide to use ListState, you would define a ListStateDescriptor.
The state handle is obtained in the void open(Configuration config) function.
The state is then read through the handle with .get(), .value(), etc.
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.10.0
VehicleInstantData, which contains all the CSV’s fields as attributes (all of type String).

import java.io.Serializable;

public class VehicleInstantData implements Serializable
{
    private String _id;
    private String day_hour;
    private String geohash;
    private String latitude;
    private String longitude;
    private String vehicle_type;
    private String speed;
    private String day_year;
    private String day_mounth;
    private String day_day;

    // Builds an object from one split line; expects exactly ten fields in CSV column order
    public static VehicleInstantData createFromSplittedArray(String[] splittedArray)
    {
        return new VehicleInstantData(
                splittedArray[0], splittedArray[1], splittedArray[2],
                splittedArray[3], splittedArray[4], splittedArray[5],
                splittedArray[6], splittedArray[7], splittedArray[8],
                splittedArray[9]);
    }

    private VehicleInstantData(String _id, String day_hour, String geohash,
                               String latitude, String longitude,
                               String vehicle_type, String speed,
                               String day_year, String day_mounth, String day_day) {
        this._id = _id;
        this.day_hour = day_hour;
        this.geohash = geohash;
        this.latitude = latitude;
        this.longitude = longitude;
        this.vehicle_type = vehicle_type;
        this.speed = speed;
        this.day_year = day_year;
        this.day_mounth = day_mounth;
        this.day_day = day_day;
    }
    // empty constructor
    // getter and setter
}
import org.apache.flink.api.common.functions.MapFunction;

public class MapToObjectTransformation implements MapFunction<String, VehicleInstantData>
{
    @Override
    public VehicleInstantData map(String value) throws Exception
    {
        // each line of the txt file holds the ten CSV fields separated by commas
        String[] splittedByComma = value.split(",");
        return VehicleInstantData.createFromSplittedArray(splittedByComma);
    }
}
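Note that the mapping above assumes every line has all ten fields; a shorter line would make createFromSplittedArray fail with an ArrayIndexOutOfBoundsException. A defensive variant of the splitting step could look like the sketch below. CsvLineSketch and its parse method are hypothetical helpers that are not part of the original project.

```java
import java.util.Optional;

/** Hypothetical helper, not part of the original project. */
final class CsvLineSketch {
    static final int EXPECTED_FIELDS = 10;

    /** Returns the fields of a line, or empty if the line does not have exactly ten fields. */
    static Optional<String[]> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // A limit of -1 keeps trailing empty fields, which split(",") would silently drop
        String[] fields = line.split(",", -1);
        return fields.length == EXPECTED_FIELDS ? Optional.of(fields) : Optional.empty();
    }

    public static void main(String[] args) {
        // Invented sample lines: one complete, one truncated
        System.out.println(parse("1,2,3,4,5,6,7,8,9,10").isPresent());
        System.out.println(parse("1,2,3").isPresent());
    }
}
```

In the job, a helper like this could be called from a flatMap so that malformed lines are dropped instead of failing the job; that wiring is left out here.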
// ...
public class StreamCreator
{
    public DataStream<String> getDataSourceStream(StreamExecutionEnvironment env, String filePath)
    {
        return env.readTextFile(filePath);
    }

    public DataStream<VehicleInstantData> mapDataSourceStreamToObject(DataStream<String> dataSourceStream)
    {
        return dataSourceStream.map(new MapToObjectTransformation());
    }
}
public class StreamCreator
{
    // ...
    public KeyedStream<VehicleInstantData, Tuple> keyByVehicleType(DataStream<VehicleInstantData> vehicleStream)
    {
        return vehicleStream
                .keyBy("vehicle_type");
    }
    // ...
}
// ...
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // ...
        KeyedStream<VehicleInstantData, Tuple> keyByVehicleType = streamCreator.keyByVehicleType(vehicleInstantDataDataStream);
        // ...
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
public class StreamCreator
{
    // ...
    public SingleOutputStreamOperator<VehicleInstantData> findFastestVehicleForEachType(KeyedStream<VehicleInstantData, Tuple> vehicleKeyedStreamByVehicleType)
    {
        return vehicleKeyedStreamByVehicleType.map(new FastestVehicleMapper());
    }
}
// ...
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;

import java.io.IOException;

public class FastestVehicleMapper extends RichMapFunction<VehicleInstantData, VehicleInstantData>
{
    // ValueState only holds one object for each key
    private transient ValueState<VehicleInstantData> fastestVehicleState;

    @Override
    public void open(Configuration parameters) throws Exception
    {
        // describe the state (name and type), then get the handle from the runtime context
        ValueStateDescriptor<VehicleInstantData> valueStateDescriptor = new ValueStateDescriptor<>(
                "fastest-vehicle",
                TypeInformation.of(VehicleInstantData.class)
        );
        fastestVehicleState = getRuntimeContext().getState(valueStateDescriptor);
    }

    @Override
    public VehicleInstantData map(VehicleInstantData newVehicleData) throws Exception
    {
        VehicleInstantData previousVehicleData = this.fastestVehicleState.value();
        if (previousVehicleData == null) // means there is nothing in the state
        {
            this.fastestVehicleState.update(newVehicleData);
            return newVehicleData;
        }
        else
        {
            return updateStateAndReturnValue(newVehicleData, previousVehicleData);
        }
    }

    private VehicleInstantData updateStateAndReturnValue(VehicleInstantData newVehicleData, VehicleInstantData previousVehicleData) throws IOException
    {
        if (Integer.parseInt(previousVehicleData.getSpeed()) < Integer.parseInt(newVehicleData.getSpeed()))
        {
            // update the state with new one
            this.fastestVehicleState.update(newVehicleData);
            return newVehicleData;
        }
        else
        {
            // the stored vehicle is still the fastest; state stays unchanged
            return previousVehicleData;
        }
    }
}
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        String txtFilePath = "/home/mehmetozanguven/Desktop/flink_examples/datas/ibb_datas/01.2019_1-10.txt";
        // set up the streaming execution environment;
        // getExecutionEnvironment() uses the cluster's environment when the jar is submitted
        // and falls back to a local environment when the job is run from the IDE
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamCreator streamCreator = new StreamCreator();
        DataStream<String> dataSource = streamCreator.getDataSourceStream(env, txtFilePath);
        DataStream<VehicleInstantData> vehicleInstantDataDataStream = streamCreator.mapDataSourceStreamToObject(dataSource);
        KeyedStream<VehicleInstantData, Tuple> keyByVehicleType = streamCreator.keyByVehicleType(vehicleInstantDataDataStream);
        SingleOutputStreamOperator<VehicleInstantData> fastestVehicles = streamCreator.findFastestVehicleForEachType(keyByVehicleType);
        fastestVehicles.print();
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
Now our Flink application is ready. Build a jar of your application via mvn clean install.
Let’s configure the state backend.
Even if you are using MemoryStateBackend as the state backend, you should configure the savepoints and checkpoints directories in the flink-conf.yaml file. These directories come into play when you want to save all of your state at a given point in time (savepoint) or periodically (checkpoint).
I intend to write a post on the savepoint & checkpoint mechanism after this state backend post, using the same example.
Here is the config for savepoints and checkpoints:
# ...
state.checkpoints.dir: file:///home/path/to/checkpoints_dir
state.savepoints.dir: file:///home/path/to/savepoints_dir
# ...
Because MemoryStateBackend is the default option, we don’t need to set up anything. I just deployed the job.
Here is the log file after I deployed the jar and submitted the application:
2020-05-03 18:29:25,293 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2020-05-03 18:29:25,294 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.checkpoints.dir, file:///home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/checkpoints_dir
2020-05-03 18:29:25,294 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/savepoints_dir
2020-05-03 18:29:25,294 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property:
2020-05-03 18:29:37,807 INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/checkpoints_dir', savepoints: 'file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/savepoints_dir', asynchronous: TRUE, maxStateSize: 5242880)
2020-05-03 18:29:38,168 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/checkpoints_dir', savepoints: 'file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/savepoints_dir', asynchronous: TRUE, maxStateSize: 5242880)
2020-05-03 18:29:38,168 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/checkpoints_dir', savepoints: 'file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/savepoints_dir', asynchronous: TRUE, maxStateSize: 5242880)
2020-05-03 18:29:38,178 INFO org.apache.flink.runtime.taskmanager.Task - Split Reader: Custom File Source -> MapTxtToObject (1/1) (7956c3e739a4fd7b67cd932c27822a60) switched from DEPLOYING to RUNNING.
2020-05-03 18:29:38,179 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/checkpoints_dir', savepoints: 'file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/savepoints_dir', asynchronous: TRUE, maxStateSize: 5242880)
For FsStateBackend, update the flink-conf.yaml file like this:
# Flink will know that we are going to use FsStateBackend
state.backend: filesystem
state.checkpoints.dir: file:///home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/checkpoints_dir
state.savepoints.dir: file:///home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/savepoints_dir
For RocksDBStateBackend, update the flink-conf.yaml file like this:
# Flink will know that we are going to use RocksDB
state.backend: rocksdb
state.checkpoints.dir: file:///home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/checkpoints_dir
state.savepoints.dir: file:///home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/state_backend/memory_state_backend/savepoints_dir
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>{flink_version}</version>
    <scope>provided</scope>
</dependency>
In the next post, we are going to read the log files for this example.
After that, I am going to explain savepoints & checkpoints for Flink applications.
Later on, I hope to write a detailed post on RocksDB configuration with an example.
Last but not least, wait for the next post …