Flink从入门到放弃之源码解析系列-第9章 异常处理
ztj100 2024-10-28 21:09 13 浏览 0 评论
导语
- Flink组件和逻辑计划
- Flink执行计划生成
- JobManager中的基本组件(1)
- JobManager中的基本组件(2)
- JobManager中的基本组件(3)
- TaskManager
- 算子
- 网络
- 水印WaterMark
- CheckPoint
- 任务调度与负载均衡
- 异常处理
- Alibaba Blink新特性
1前言
flink 的架构在 flink 基本组件一节已经介绍过,其中的 TaskManager 负责监护 task 的执行,对于每个 task,flink 都会启动一个线程去执行,那么当用户的代码抛出异常时,flink 的处理逻辑是什么呢?
2
flink 的 task 的 Runnable 类是 Task.java,我们观察到它的 run() 方法真个被一个大的 try catch 包住,我们重点关注 catch 用户异常之后的部分:
//Task
catch (Throwable t) {
// ----------------------------------------------------------------
// the execution failed. either the invokable code properly failed, or
// an exception was thrown as a side effect of cancelling
// ----------------------------------------------------------------
try {
// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
// loop for multiple retries during concurrent state changes via calls to cancel() or
// to failExternally()
while (true) {
ExecutionState current = this.executionState;
简单总结其逻辑:
- 如果当前的执行状态是 ExecutionState.RUNNING 或者 ExecutionState.DEPLOYING,表明是从正常运行到异常状态的过度,这时候判断是主动 Cancel 执行,如果是,执行 StreamTask 的 cancel 方法, 并通知观察者它的状态已变成:ExecutionState.CANCELED;如果不是主动 Cancel,表明是用户异常触发,这时候同样执行 StreamTask 的 cancel 方法,然后通知观察者它的状态变成:ExecutionState.FAILED,这里的 cancel 方法留给 flink 内部的算子来实现,对于普通 task ,会停止消费上游数据,对于 source task,会停止发送源数据
- 对于用户异常来说,通知观察者的状态应该为 ExecutionState.FAILED,我们下面详细分析
- finally 的部分会释放掉这个 task 占有的所有资源,包括线程池、输入 InputGate 及 写出 ResultPartition 占用的全部 BufferPool、缓存的 jar 包等,最后通知 TaskManager 这个 Job 的 这个 task 已经执行结束:
- notifyFinalState()
- 如果异常逻辑发生了任何其它异常,说明是 TaskManager 相关环境发生问题,这个时候会杀死 TaskManager
通知TaskManager
上面提到,finally 的最后阶段会通知 TaskManager,我们来梳理逻辑:
//TaskManager
// removes the task from the TaskManager and frees all its resources
case TaskInFinalState(executionID) =>
unregisterTaskAndNotifyFinalState(executionID)
//TaskManager
private def unregisterTaskAndNotifyFinalState(executionID: ExecutionAttemptID): Unit = {
val task = runningTasks.remove(executionID)
if (task != null) {
// the task must be in a terminal state
if (!task.getExecutionState.isTerminal) {
try {
task.failExternally(new Exception("Task is being removed from TaskManager"))
} catch {
case e: Exception => log.error("Could not properly fail task", e)
}
}
//TaskManager
self ! decorateMessage(
UpdateTaskExecutionState(
new TaskExecutionState(
task.getJobID,
task.getExecutionId,
task.getExecutionState,
task.getFailureCause,
accumulators)
)
)
//ExecutionGraph
case FAILED:
attempt.markFailed(state.getError(userClassLoader));
return true;
//Execution
void markFinished(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators, Map<String, Accumulator<?, ?>> userAccumulators) {
// this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!)
while (true) {
ExecutionState current = this.state;
if (current == RUNNING || current == DEPLOYING) {
if (transitionState(current, FINISHED)) {
try {
//Execution
try {
vertex.notifyStateTransition(attemptId, targetState, error);
}
catch (Throwable t) {
LOG.error("Error while notifying execution graph of execution state transition.", t);
}
//ExecutionGraph
void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
newExecutionState, Throwable error)
{
//...
// see what this means for us. currently, the first FAILED state means -> FAILED
if (newExecutionState == ExecutionState.FAILED) {
fail(error);
}
//ExecutionGraph
public void fail(Throwable t) {
while (true) {
JobStatus current = state;
// stay in these states
if (current == JobStatus.FAILING ||
current == JobStatus.SUSPENDED ||
current.isGloballyTerminalState()) {
return;
} else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
synchronized (progressLock) {
postRunCleanup();
progressLock.notifyAll();
LOG.info("Job {} failed during restart.", getJobID());
return;
}
} else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = t;
if (!verticesInCreationOrder.isEmpty()) {
// cancel all. what is failed will not cancel but stay failed
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
ejv.cancel();
}
} else {
// set the state of the job to failed
transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
}
return;
}
// no need to treat other states
}
}
总结其逻辑:
- 在一些合法性 check 之后,TaskManager 会给自己发送一条路由消息:UpdateTaskExecutionState,TaskManager 继而将这条消息转发给 JobManager
- JobManager 会标志 Job 状态为 FAILING 并通知 JobCli,并且立即停止所有 task 的执行,这时候 CheckpointCoordinator 在执行 checkpoint 的时候感知到 task 失败状态会立即返回,停止 checkpoint
3异常后的资源释放
主要包括以下资源:
- 网络资源:InputGate 和 ResultPartiton 的内存占用
- 其他内存:通过 MemoryManager 申请的资源
- 缓存资源:lib 包和其他缓存
- 线程池:Task 内部持有
相关推荐
- 从IDEA开始,迈进GO语言之门(idea got)
-
前言笔者在学习GO语言编程的时候,GO语言在国内还没有像JAVA/Php/Python那样普及,绕了不少的弯路,要开始入门学习一门编程语言,最好就先从选择一个好的编程语言的开发环境开始,有了这个开发环...
- 基于SpringBoot+MyBatis的私人影院java网上购票jsp源代码Mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于SpringBoot...
- 基于springboot的个人服装管理系统java网上商城jsp源代码mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...
- 基于springboot的美食网站Java食品销售jsp源代码Mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...
- 贸易管理进销存springboot云管货管账分析java jsp源代码mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目描述贸易管理进销存spring...
- SpringBoot+VUE员工信息管理系统Java人员管理jsp源代码Mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍SpringBoot+V...
- 目前见过最牛的一个SpringBoot商城项目(附源码)还有人没用过吗
-
帮粉丝找了一个基于SpringBoot的天猫商城项目,快速部署运行,所用技术:MySQL,Druid,Log4j2,Maven,Echarts,Bootstrap...免费给大家分享出来前台演示...
- SpringBoot+Mysql实现的手机商城附带源码演示导入视频
-
今天为大家带来的是基于SpringBoot+JPA+Thymeleaf框架的手机商城管理系统,商城系统分为前台和后台、前台用的是Bootstrap框架后台用的是SpringBoot+JPA都是现在主...
- 全网首发!马士兵内部共享—1658页《Java面试突击核心讲》
-
又是一年一度的“金九银十”秋招大热门,为助力广大程序员朋友“面试造火箭”,小编今天给大家分享的便是这份马士兵内部的面试神技——1658页《Java面试突击核心讲》!...
- SpringBoot数据库操作的应用(springboot与数据库交互)
-
1.JDBC+HikariDataSource...
- SpringBoot 整合 Flink 实时同步 MySQL
-
1、需求在Flink发布SpringBoot打包的jar包能够实时同步MySQL表,做到原表进行新增、修改、删除的时候目标表都能对应同步。...
- SpringBoot + Mybatis + Shiro + mysql + redis智能平台源码分享
-
后端技术栈基于SpringBoot+Mybatis+Shiro+mysql+redis构建的智慧云智能教育平台基于数据驱动视图的理念封装element-ui,即使没有vue的使...
- Springboot+Mysql舞蹈课程在线预约系统源码附带视频运行教程
-
今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的Springboot+Mysql舞蹈课程在线预约系统,系统项目源代码在【猿来入此】获取!https://www.yuan...
- SpringBoot+Mysql在线众筹系统源码+讲解视频+开发文档(参考论文
-
今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的在线众筹管理系统,主要实现了普通用户在线参与众筹基本操作流程的全部功能,系统分普通用户、超级管理员等角色,除基础脚手架外...
- Docker一键部署 SpringBoot 应用的方法,贼快贼好用
-
这两天发现个Gradle插件,支持一键打包、推送Docker镜像。今天我们来讲讲这个插件,希望对大家有所帮助!GradleDockerPlugin简介...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- 从IDEA开始,迈进GO语言之门(idea got)
- 基于SpringBoot+MyBatis的私人影院java网上购票jsp源代码Mysql
- 基于springboot的个人服装管理系统java网上商城jsp源代码mysql
- 基于springboot的美食网站Java食品销售jsp源代码Mysql
- 贸易管理进销存springboot云管货管账分析java jsp源代码mysql
- SpringBoot+VUE员工信息管理系统Java人员管理jsp源代码Mysql
- 目前见过最牛的一个SpringBoot商城项目(附源码)还有人没用过吗
- SpringBoot+Mysql实现的手机商城附带源码演示导入视频
- 全网首发!马士兵内部共享—1658页《Java面试突击核心讲》
- SpringBoot数据库操作的应用(springboot与数据库交互)
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- node卸载 (33)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- exceptionininitializererror (33)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)