温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Flink区分运行环境的方法是什么

发布时间:2021-12-31 14:31:40 来源:亿速云 阅读:308 作者:iii 栏目:大数据

这篇文章主要介绍“Flink区分运行环境的方法是什么”,在日常操作中,相信很多人在Flink区分运行环境的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Flink区分运行环境的方法是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

Flink判断运行环境(本地、集群)的逻辑如下:

(1)在任务的main方法中,通过 StreamExecutionEnvironment 获取运行环境

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

(2)生成运行环境的工厂类放在ThreadLocal中;threadLocalContextEnvironmentFactory 是StreamExecutionEnvironment类的静态属性 

	/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
	private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();

①当是本地IDE直接运行任务main方法时,ThreadLocal中获取到的StreamExecutionEnvironmentFactory为空,此时生成本地运行环境LocalStreamEnvironment

	public static StreamExecutionEnvironment getExecutionEnvironment() {
		return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
			.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
			.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
	}

当ThreadLocal中有StreamExecutionEnvironmentFactory时,则用其createExecutionEnvironment()方法来生成运行环境

②当集群环境时,是如何将StreamExecutionEnvironmentFactory放入到ThreadLocal中?

通过 bin/flink run ....   命令提交jar包到集群运行命令时,该脚本会调用 org.apache.flink.client.cli.CliFrontend  来运行用户程序,如下:

.......
.......
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

在CliFrontend中依次执行以下方法 main() ->  parseParameters() -> run() -> executeProgram() 

	protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
		ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
	}

在org.apache.flink.client.ClientUtils的executeProgram()中调用 StreamContextEnvironment.setAsContext(...),StreamContextEnvironment继承自StreamExecutionEnvironment。setAsContext()代码如下

	public static void setAsContext(
			final PipelineExecutorServiceLoader executorServiceLoader,
			final Configuration configuration,
			final ClassLoader userCodeClassLoader,
			final boolean enforceSingleJobExecution,
			final boolean suppressSysout) {
		StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
			executorServiceLoader,
			configuration,
			userCodeClassLoader,
			enforceSingleJobExecution,
			suppressSysout);
		initializeContextEnvironment(factory);
	}

创建生成运行环境的工厂类实例,在initializeContextEnvironment()方法中把实例放到StreamExecutionEnvironment类的静态属性threadLocalContextEnvironmentFactory 中 ,代码如下

	protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
		contextEnvironmentFactory = ctx;
		threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
	}

这样在用户程序 StreamExecutionEnvironment.getExecutionEnvironment() 时,获取到的运行环境就是StreamContextEnvironment类的setAsContext()方法中生成的

	public static void setAsContext(
			final PipelineExecutorServiceLoader executorServiceLoader,
			final Configuration configuration,
			final ClassLoader userCodeClassLoader,
			final boolean enforceSingleJobExecution,
			final boolean suppressSysout) {
		StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
			executorServiceLoader,
			configuration,
			userCodeClassLoader,
			enforceSingleJobExecution,
			suppressSysout);
		......
	}

本地运行环境LocalStreamEnvironment 和 独立集群、flink on yarn等运行环境StreamContextEnvironment 的主要区别在于,他们的成员属性 configuration 不同。LocalStreamEnvironment 中是创建的空键值对(new Configuration()),而StreamContextEnvironment 是通过 CliFrontend 生成的 Configuration 对象。

到此,关于“Flink区分运行环境的方法是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI