Apache Flink Series 9 - How Flink & Standalone Cluster Setup Work?
In this post, I am going to explain, how Flink starts itself, and what happens when you submit your job to the Standalone Cluster setup Standalone …
In this post, I am going to read the log files from the application that I created in previous post. Here is the github link and also previous post link
Before reading, I updated the taskmanager.numberOfTaskSlots
config. Because one slot offered by TaskManager was not the matching with the real scenarios.
# ...
taskmanager.numberOfTaskSlots: 8
# ...
To remember how standalone cluster works you may refer to Apache Flink Series 9 - How Flink & Standalone Cluster Setup Work?
Let’s start to read log files.
ClusterEntryPoint
which is the base class for the Flink clusterClusterEntryPoint
class which is StandaloneSessionClusterEntrypoint
2020-05-03 22:48:42,350 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting StandaloneSessionClusterEntrypoint (Version: 1.10.0, Rev:aa4eb8f, Date:07.02.2020 @ 19:18:19 CET)
StandaloneSessionClusterEntrypoint
:public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {
public StandaloneSessionClusterEntrypoint(Configuration configuration) {
super(configuration);
}
public static void main(String[] args) {
// print information about the environment
EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
// This signal handler / signal logger is based on Apache Hadoop's org.apache.hadoop.util.SignalLogger.
SignalHandler.register(LOG);
// install the safe guard shutdown hook. JVM shotdown hooks are a special construct that allows developers to put some code to be executed when the JVM is shutting down.
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// EntrypointClusterConfiguration is class that parsers the command line argument
EntrypointClusterConfiguration entrypointClusterConfiguration = null;
final CommandLineParser < EntrypointClusterConfiguration > commandLineParser = new CommandLineParser < > (new EntrypointClusterConfigurationParserFactory());
try {
entrypointClusterConfiguration = commandLineParser.parse(args);
} catch (FlinkParseException e) {
LOG.error("Could not parse command line arguments {}.", args, e);
commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
System.exit(1);
}
// loading configuration from flink-conf.yaml file
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
// create the StandaloneSessionClusterEntrypoint instance
StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
// run the cluster with new cluster entry point
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
}
2020-05-03 22:48:42,351 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user: mehmetozanguven
2020-05-03 22:48:42,351 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current Hadoop/Kerberos user: <no hadoop dependency found>
2020-05-03 22:48:42,351 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: OpenJDK 64-Bit Server VM - AdoptOpenJDK - 11/11.0.6+10
2020-05-03 22:48:42,351 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size: 1024 MiBytes
2020-05-03 22:48:42,351 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME: /home/mehmetozanguven/JavaJDKS/jdk-11.0.6+10
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - No Hadoop Dependency available
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms1024m
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx1024m
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dlog.file=/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/log/flink-mehmetozanguven-standalonesession-0-mehmetozanguven-ABRA-A5-V5.log
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dlog4j.configuration=file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/conf/log4j.properties
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dlogback.configurationFile=file:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/conf/logback.xml
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Program Arguments:
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --configDir
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - /home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/conf
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --executionMode
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - cluster
2020-05-03 22:48:42,352 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Classpath: /home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/lib/flink-table_2.11-1.10.0.jar:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/lib/flink-table-blink_2.11-1.10.0.jar:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/lib/log4j-1.2.17.jar:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/lib/slf4j-log4j12-1.7.15.jar:/home/mehmetozanguven/Desktop/ApacheTools/flink-1.10.0/lib/flink-dist_2.11-1.10.0.jar:::
2020-05-03 22:48:42,353 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
2020-05-03 22:48:42,354 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Registered UNIX signal handlers for [TERM, HUP, INT]
flink-conf.yaml file
, GlobalConfiguration
class loads that file.2020-05-03 22:48:42,377 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2020-05-03 22:48:42,378 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-05-03 22:48:42,378 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2020-05-03 22:48:42,378 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.process.size, 1568m
2020-05-03 22:48:42,378 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 8
...
public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint{
// ...
public static void main(String[] args) {
// ...
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
}
}
// ...
public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler {
protected static Configuration loadConfiguration(EntrypointClusterConfiguration entrypointClusterConfiguration) {
// ...
final Configuration configuration = GlobalConfiguration.loadConfiguration(entrypointClusterConfiguration.getConfigDir(), dynamicProperties);
// ...
}
}
// ...
public final class GlobalConfiguration {
public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {
// ...
final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
// ...
// code statement where flink-conf.yaml file loaded
Configuration configuration = loadYAMLResource(yamlConfigFile);
}
}
2020-05-03 22:48:42,409 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting StandaloneSessionClusterEntrypoint.
public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint{
// ...
public static void main(String[] args) {
// ...
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
}
// ...
public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler {
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {
LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
// ...
}
// ...
public void startCluster() throws ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
// configure default filesystem
configureFileSystems(configuration);
// install security context
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
// run the cluster
runCluster(configuration);
return null;
});
}
}
}
2020-05-03 22:48:42,409 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Install default filesystem.
2020-05-03 22:48:42,465 INFO org.apache.flink.core.fs.FileSystem - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-05-03 22:48:42,497 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Install security context.
2020-05-03 22:48:42,511 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
# Jaas = Java Authentication and Authorization Service(JAAS) for authentication of users and for authorization of users
2020-05-03 22:48:42,525 INFO org.apache.flink.runtime.security.modules.JaasModule - Jaas file will be created as /tmp/jaas-8543618536163139636.conf.
2020-05-03 22:48:42,534 INFO org.apache.flink.runtime.security.SecurityUtils - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler {
// ...
protected void initializeServices(Configuration configuration) throws Exception {
LOG.info("Initializing cluster services.");
synchronized (lock) {
// get job manager rpc address and port-range
final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
final String portRange = getRPCPortRange(configuration);
// create the rpc server with AkkaRpcServiceUtils
commonRpcService = createRpcService(configuration, bindAddress, portRange);
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("cluster-io"));
haServices = createHaServices(configuration, ioExecutor);
// create blobServer and start
// BlobServer is responsible for listening incoming and to handle them
// BlobServer is also takes care of creating directory to store BLOBs object
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
// create HeartBeatServices
// HeartBeatServices gives access to all services needed for heartbeating
heartbeatServices = createHeartbeatServices(configuration);
// metrictregisty keeps track of all registered metrics
metricRegistry = createMetricRegistry(configuration);
final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, bindAddress);
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
final String hostname = RpcUtils.getHostname(commonRpcService);
processMetricGroup = MetricUtils.instantiateProcessMetricGroup(
metricRegistry,
hostname,
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
}
}
}
# ...
2020-05-03 22:48:43,641 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-57de0a06-e2cc-481b-b0b5-26970fb77e53
2020-05-03 22:48:43,644 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:46005 - max concurrent requests: 50 - max backlog: 1000
# ...
2020-05-03 22:48:46,775 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering TaskManager with ResourceID 2c1556deb41732fd9124989609914a5c (akka.tcp://[email protected]:38023/user/taskmanager_0) at ResourceManager
2020-05-03 22:48:46,833 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering TaskManager with ResourceID 2c1556deb41732fd9124989609914a5c (akka.tcp://[email protected]:38023/user/taskmanager_0) at ResourceManager
2020-05-03 22:48:43,894 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner -
TM_RESOURCES_JVM_PARAMS extraction logs:
- Loading configuration property: jobmanager.rpc.address, localhost
- Loading configuration property: jobmanager.rpc.port, 6123
- Loading configuration property: jobmanager.heap.size, 1024m
- Loading configuration property: taskmanager.memory.process.size, 1568m
- Loading configuration property: taskmanager.numberOfTaskSlots, 8
# ...
2020-05-03 22:48:43,894 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Starting TaskManager (Version: 1.10.0, Rev:aa4eb8f, Date:07.02.2020 @ 19:18:19 CET)
2020-05-03 22:48:43,894 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - OS current user: mehmetozanguven
2020-05-03 22:48:43,895 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Current Hadoop/Kerberos user: <no hadoop dependency found>
2020-05-03 22:48:43,895 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - JVM: OpenJDK 64-Bit Server VM - AdoptOpenJDK - 11/11.0.6+10
2020-05-03 22:48:43,895 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Maximum heap size: 512 MiBytes
# ...
Note: State Example job name was the “Flink Streaming Java API Skeleton”
2020-05-03 22:48:57,513 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 35d029eee2a592940a335c8a4bfc7060 (Flink Streaming Java API Skeleton).
2020-05-03 22:48:57,514 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 35d029eee2a592940a335c8a4bfc7060 (Flink Streaming Java API Skeleton).
2020-05-03 22:48:57,538 INFO org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Flink Streaming Java API Skeleton (35d029eee2a592940a335c8a4bfc7060).
2020-05-03 22:48:57,549 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using restart back off time strategy NoRestartBackoffTimeStrategy for Flink Streaming Java API Skeleton (35d029eee2a592940a335c8a4bfc7060).
2020-05-10 19:48:34,216 INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job Flink Streaming Java API Skeleton (9d9ced6c7ac0a516b1958073f0edd4e4) under job master id 00000000000000000000000000000000.
9d9ced6c7ac0a516b1958073f0edd4e4 = jobId
c2bff0bee2ef084a2bc6b112cce92dc3 = custom source id
2020-05-10 19:48:34,225 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Java API Skeleton (9d9ced6c7ac0a516b1958073f0edd4e4) switched from state CREATED to RUNNING.
2020-05-10 19:48:34,237 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom File Source (1/1) (c2bff0bee2ef084a2bc6b112cce92dc3) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,237 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IBBDataSource -> MapTxtToObject (1/8) (7778c56069046c5a1f4730c468a50440) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,237 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IBBDataSource -> MapTxtToObject (2/8) (b73c8f013fa5d37b1e9fffffcc472768) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,237 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IBBDataSource -> MapTxtToObject (3/8) (34cf48e24141ae5e811fbcf0ac6ebb1f) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IBBDataSource -> MapTxtToObject (4/8) (c75e38d97a8c26b9b1ae6474470dedf0) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IBBDataSource -> MapTxtToObject (5/8) (105f2dcefab3bbc45fcef49e871b298a) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IBBDataSource -> MapTxtToObject (6/8) (0da90e4375dceebdd142df352e07d33e) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IBBDataSource -> MapTxtToObject (7/8) (479fb6c575b6ff64f8b94c34401e9c83) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IBBDataSource -> MapTxtToObject (8/8) (ac8d287bbc3bb87f59342c47b32b6373) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (1/8) (54f54824dab58e882c20808b485112e3) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (2/8) (738207a12cec4c57490d1b55290ea34b) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (3/8) (6eea23540e58d5aff098a2a96db89b36) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (4/8) (92576a02091d413adac412f1521e74ef) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (5/8) (25e0f9a5002736cfe6f5f9ebfed1f069) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (6/8) (6babf57f8c236df78d01ece66d771f63) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (7/8) (2a1909729e4c10ada074ae2a18c68fc7) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,239 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (8/8) (5af64e90fc740665e678b5b53f9d3853) switched from CREATED to SCHEDULED.
2020-05-10 19:48:34,273 INFO org.apache.flink.runtime.jobmaster.JobMaster - Connecting to ResourceManager akka.tcp://flink@localhost:6123/user/resourcemanager(00000000000000000000000000000000)
[email protected]://flink@localhost:6123/user/jobmanager_0 for job 9d9ced6c7ac0a516b1958073f0edd4e4. (job id)
2020-05-10 19:48:34,287 INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
# ...
2020-05-10 19:48:34,390 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot [SlotRequestId{886601088399e7df846d7d1383d736a2}] and profile ResourceProfile{UNKNOWN} from resource manager.
2020-05-10 19:48:34,391 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job 9d9ced6c7ac0a516b1958073f0edd4e4 with allocation id 0c281e95bf020f4088b3c54b47ea3c9d.
# ...
2020-05-10 19:48:34,370 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Successful registration at job manager akka.tcp://flink@localhost:6123/user/jobmanager_0 for job 9d9ced6c7ac0a516b1958073f0edd4e4.
2020-05-10 19:48:34,371 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Establish JobManager connection for job 9d9ced6c7ac0a516b1958073f0edd4e4.
2020-05-10 19:48:34,375 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to the leader of job 9d9ced6c7ac0a516b1958073f0edd4e4.
2020-05-10 19:48:34,401 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request 0c281e95bf020f4088b3c54b47ea3c9d for job 9d9ced6c7ac0a516b1958073f0edd4e4 from resource manager with leader id 00000000000000000000000000000000.
2020-05-10 19:48:34,402 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for 0c281e95bf020f4088b3c54b47ea3c9d.
2020-05-10 19:48:34,402 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to the leader of job 9d9ced6c7ac0a516b1958073f0edd4e4.
2020-05-10 19:48:34,474 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom File Source (1/1) (attempt #0) to 75eef81d20fd250ba7860a5e6a4c2153 @ mehmetozanguven-ABRA-A5-V5 (dataPort=40679)
2020-05-10 19:48:34,483 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - IBBDataSource -> MapTxtToObject (1/8) (7778c56069046c5a1f4730c468a50440) switched from SCHEDULED to DEPLOYING.
2020-05-10 19:48:34,485 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying IBBDataSource -> MapTxtToObject (1/8) (attempt #0) to 75eef81d20fd250ba7860a5e6a4c2153 @ mehmetozanguven-ABRA-A5-V5 (dataPort=40679)
# ...
# ...
2020-05-10 19:48:34,756 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (3/8) (6eea23540e58d5aff098a2a96db89b36) switched from DEPLOYING to RUNNING.
2020-05-10 19:48:34,760 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom File Source (1/1) (c2bff0bee2ef084a2bc6b112cce92dc3) switched from DEPLOYING to RUNNING.
2020-05-10 19:48:34,773 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (5/8) (25e0f9a5002736cfe6f5f9ebfed1f069) switched from DEPLOYING to RUNNING.
2020-05-10 19:48:34,780 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (2/8) (738207a12cec4c57490d1b55290ea34b) switched from DEPLOYING to RUNNING.
# ...
# ...
2020-05-10 19:48:34,741 INFO org.apache.flink.runtime.taskmanager.Task - FindFastestVehicle -> Sink: PrintResult (3/8) (6eea23540e58d5aff098a2a96db89b36) switched from DEPLOYING to RUNNING.
2020-05-10 19:48:34,741 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom File Source (1/1) (c2bff0bee2ef084a2bc6b112cce92dc3) switched from DEPLOYING to RUNNING.
2020-05-10 19:48:34,765 INFO org.apache.flink.runtime.taskmanager.Task - FindFastestVehicle -> Sink: PrintResult (5/8) (25e0f9a5002736cfe6f5f9ebfed1f069) switched from DEPLOYING to RUNNING.
2020-05-10 19:48:34,770 INFO org.apache.flink.runtime.taskmanager.Task - FindFastestVehicle -> Sink: PrintResult (2/8) (738207a12cec4c57490d1b55290ea34b) switched from DEPLOYING to RUNNING.
# ...
.txt
file, State changed to Finished (JobManager’s log file):# ...
2020-05-10 19:48:40,948 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (2/8) (738207a12cec4c57490d1b55290ea34b) switched from RUNNING to FINISHED.
2020-05-10 19:48:40,952 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (3/8) (6eea23540e58d5aff098a2a96db89b36) switched from RUNNING to FINISHED.
2020-05-10 19:48:40,956 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FindFastestVehicle -> Sink: PrintResult (5/8) (25e0f9a5002736cfe6f5f9ebfed1f069) switched from RUNNING to FINISHED.
2020-05-10 19:48:41,141 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Java API Skeleton (9d9ced6c7ac0a516b1958073f0edd4e4) switched from state RUNNING to FINISHED.
# ...
# ...
2020-05-10 19:48:40,942 INFO org.apache.flink.runtime.taskmanager.Task - FindFastestVehicle -> Sink: PrintResult (2/8) (738207a12cec4c57490d1b55290ea34b) switched from RUNNING to FINISHED.
2020-05-10 19:48:40,942 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for FindFastestVehicle -> Sink: PrintResult (2/8) (738207a12cec4c57490d1b55290ea34b).
2020-05-10 19:48:40,942 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task FindFastestVehicle -> Sink: PrintResult (2/8) (738207a12cec4c57490d1b55290ea34b) [FINISHED]
2020-05-10 19:48:40,942 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task FindFastestVehicle -> Sink: PrintResult (2/8) 738207a12cec4c57490d1b55290ea34b.
# ...
2020-05-10 19:48:41,162 INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection d6d73032757e1a231274935bddee5e9f: JobManager is shutting down..
2020-05-10 19:48:41,183 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 9d9ced6c7ac0a516b1958073f0edd4e4.
2020-05-10 19:48:41,187 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 9d9ced6c7ac0a516b1958073f0edd4e4.
Last but not least, wait for the next post…
In this post, I am going to explain, how Flink starts itself, and what happens when you submit your job to the Standalone Cluster setup Standalone …
In this post, I am going to explain “what is state backend”, “which options do we have for state backend” , “how to configure state backend for your …