3 Flink 运行环境
Flink 集群角色
为提交作业执行任务,Flink 需要几个关键组件:
- 客户端(Client):代码由客户端获取并做转换,之后提交给 JobManager。
- JobManager:Flink 集群的“管事人”,对作业进行重要调度管理,而它获取到执行的作业后,会进一步处理转换,然后分发任务给众多的 TaskManager。
- TaskManager:就是真正“干活的人”,数据的处理操作都是它们来做。
Flink 的运行方式与部署模式
Flink 的运行方式
Flink 的运行方式有 4 种:
- 本地(单机)模式:在这种模式下,Flink可以直接在一台机器上运行,用户只需下载jar包后启动即可。这种模式适合开发和测试阶段使用。
- Flink Standalone 模式:这是一种经典的主从架构模式,Flink自己实现资源调度。启动后,主节点上会启动一个JobManager,从节点上启动 TaskManager。Client将作业提交给 JobManager,JobManager负责资源分配和任务调度,TaskManage r负责执行任务。这种模式不依赖其他系统,适合中小规模的生产环境。
- Flink On YARN模式:在这种模式下,Flink 使用 YARN 作为底层资源调度系统,以分布式的方式在集群中运行。YARN 负责资源管理,Flink 负责计算任务。这种模式适合大规模生产环境,能够充分利用集群资源。
- Flink On Kubernetes模式:在这种模式下,Flink 部署在 Kubernetes 上,利用 Kubernetes 进行资源管理和调度。这种模式结合了 Flink 的高性能计算和 Kubernetes 的容器管理能力,适合需要高可用性和弹性伸缩的生产环境。
Flink 的部署模式
Flink 有 3 种部署模式(Development Mode):
- 会话模式(Session):最符合常规思维,先启动一个 Flink 集群,保持一个会话,这这个会话中通过客户端提交作业。集群启动时,所有资源就已经确定,所以所有提交的作业会竞争集群中的资源。
适用场景:单个规模小,执行时间短的大量作业。
优点:不用多个集群,不用给每个作业定义集群。
缺点:作业竞争资源,资源共享。
- 单作业模式(Per-Job):为了更好的隔离资源,为每个提交的作业启动一个集群,即但作业(Per-Job)模式。曾经是比较推崇的部署模式,现在已经过时。作业完成后,集群会关闭,资源也会释放。特别多的作业,要维护特别多的集群。
存在问题:通过 Client 提交,提交时要作代码转换,如:取数据、打平、聚合、输出等,这些工作都会在客户端进行,客户端作的工作比较多。当有大量作业时,客户端与集群频繁通信,会产生大量的带宽消耗,去下载依赖和把二进制数据发送给 Job Manager;加上很多情况下,提交作业在同一个客户端,会加剧客户端节点资源的消耗。为解决上述问题,应用模式产生。
- 应用模式(Application):在应用模式下,不再需要客户端提交任务,直接把应用提交到 Job Manager 上运行。不需要通过客户端提交任务到针对单作业集群的 Job 。每个应用独立启动一个 Job Manager 也就是一个集群,这个 JobManager 只为执行一个应用而存在。执行结束后,JobManager 也将被关闭。所有的转换工作,都在集群上进行。
应用模式与但作业模式,都是提交作业后才创建集群,单作业模式通过客户端来提交,客户端解析出的每个作业对应一个集群;而应用模式下,直接由 JobManager 执行应用程序,不需要客户端对代码进行解析。目前应用模式是部署的首选。
【注意】Flink 本身无法直接进行单作业模式运行,都需要借助一些资源管理框架来启动集群,如:YARN、Kubernetes 等运行。
Flink 部署
Flink 伪分布式环境搭建
服务器规划:
服务器主机名 | 部署组件 |
wuji1626 | JobManager TaskManager |
对压缩包进行解压:
tar -zvxf flink-1.17.0-bin-scala_2.12.tgz
修改配置文件
1)修改 flink-conf.yaml
进入 conf 目录,修改 Flink 的核心配置文件 flink-conf.yaml 的内容。
# JobManager节点地址.
jobmanager.rpc.address: wuji1626
# JobManager 绑定的地址,不受限制就设置为 0.0.0.0
jobmanager.bind-host: 0.0.0.0
# Flink 的 Rest服务地址
rest.address: wuji1626
# Rest 服务地址,不受限制就设置为 0.0.0.0
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: wuji1626
2)修改 conf/works 文件,指定 works 服务器的主机,由于是伪分布式部署,只写本机主机即可(我的主机名为 wuji1626)。
wuji1626
3)修改 conf/masters 文件,指定 master 服务器的主机,由于是伪分布式部署,需要指定本机主机名和通信端口(使用默认的 8081 即可)。
wuji1626:8081
4)启动集群
bin/start-cluster.sh
通过 jps 查看进程启动情况。其中
StandaloneSessionClusterEntrypoint 相当于 JobManager,而 TaskManagerRunner 就是 TaskManager。
5)验证
通过下述 url 访问 Flink WebUI 控制台,验证集群启动状况。如集群控制台所示,母亲可用的 TaskManager 有 1 个,正在运行的 Job 没有。
http://wuji1626:8081
在 Flink WebUI 中提交任务
要在 Flink WebUI界面提交 Flink任务,需要先将 Flink 任务打成 jar 包。再将 jar 包通过 WebUI 界面提交到 Flink 集群。
1)打 jar 包。
为了便于 Flink 程序运行,前期可以将 Flink 任务程序所有依赖的库都打入一个 jar 文件。可在 maven 的 pom.xml 中配置插件 shade,它可以在导出 jar 文件时,同时导出 2 个 jar 包,一个保含所有依赖,一个不包含依赖只有源码。
具体配置如下:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
使用 maven 插件,先后执行 clean、package,在工程的 target 目录下可以看到两个 jar 包。
2)上传 jar 包
在 Flink Dashboard 中选择【Submit New Job】,并点击【Add New】按钮。
在本地选择包含依赖的 jar 文件上传。
选择入口类的全路径名称“
com.wuji1626.wc.Flink03_Unbound_Stream”,提交。
提交之前,先在服务器命令行中执行,向端口发送信息。
nc -l -v 8888
3)Job 执行,Job 将会将端口输入的内容,计算后输出到控制台输出。
如果事先没有运行 nc 命令,Job 启动会报错,提示连接被拒绝(Connection Refused)。
通过命令行提交任务
1)将打包的
bigdata-sample-1.0-SNAPSHOT.jar 文件上传到服务器对应的路径下,这里上传到了如下目录下。
flink-1.17.0//examples/streaming
2)执行 nc 命令,开启向端口通信的程序。
nc -lk 8888
3)运行下述命令:
bin/flink run -m wuji1626:8081 -c com.wuji1626.wc.Flink03_Unbound_Stream ./examples/streaming/bigdata-sample-1.0-SNAPSHOT.jar
其中:
- -m:确定 JobManager 格式为:[主机名]:[端口],这里是 wuji1626:8081
- -c:入口类绝对类名,同时需要指定类所在 jar 位置。
运行后,控制台会输出 Job信息。
前往 Flink WebUI 控制台查看 Job 信息,信息与 Console 输出的 Job ID 保持一致。
4)在 Task 节点可以查看程序的控制台输出。
Standalone 运行模式
独立运行模式是独立运行,不依赖任何外部的资源管理平台。
独立运行的缺点:
- 资源不足;
- 出现故障,没有自动扩展或重分配资源的保证,必须手动处理
所以独立模式一般只用在开发测试或作业非常少的场景下。
部署模式:
- 会话模式部署:与 Flink 的会话部署模式相同,先启动 JobManager 再提交作业。
- 单作业模式部署:Flink 不支持在 Standalone 运行模式下进行单作业部署,因为需要外部的资源管理平台进行资源管理。
- 应用模式部署:应用模式下不会提前创建集群。需要分别运行 standalone-job.sh 来创建 JobManager。
以 Application 的方式部署
1)将之前运行的
bigdata-sample-1.0-SNAPSHOT.jar 复制到 lib 目录下。
2)执行下述脚本,启动 Standalone 模式的应用集群。
bin/standalone-job.sh start --job-classname com.wuji1626.wc.Flink03_Unbound_Stream
3)执行下述脚本,启动 JobManager。
bin/taskmanager.sh start
4)执行 nc 命令:
nc -lk 8888
5)通过 WebUI 查看应用运行情况。
控制台上确实有运行中的 Application。
通过查看 Task 的控制台输出,Application 确实正在运行中。
6)通过下述脚本,停止 JobManager。
bin/taskmanager.sh stop
日常开发中不会采用 Application 方式进行部署,会话模式就能满足日常的开发工作。
有进步了一点,发个美女轻松一下。