3 Flink 运行环境

3 Flink 运行环境

精选文章moguli202025-05-21 0:33:253A+A-

Flink 集群角色

为提交作业执行任务,Flink 需要几个关键组件:

  • 客户端(Client):代码由客户端获取并做转换,之后提交给 JobManager。
  • JobManager:Flink 集群的“管事人”,对作业进行重要调度管理,而它获取到执行的作业后,会进一步处理转换,然后分发任务给众多的 TaskManager。
  • TaskManager:就是真正“干活的人”,数据的处理操作都是它们来做。

Flink 的运行方式与部署模式

Flink 的运行方式

Flink 的运行方式有 4 种:

  1. 本地(单机)模式:在这种模式下,Flink可以直接在一台机器上运行,用户只需下载jar包后启动即可。这种模式适合开发和测试阶段使用。
  2. Flink Standalone 模式:这是一种经典的主从架构模式,Flink自己实现资源调度。启动后,主节点上会启动一个JobManager,从节点上启动 TaskManager。Client将作业提交给 JobManager,JobManager负责资源分配和任务调度,TaskManage r负责执行任务。这种模式不依赖其他系统,适合中小规模的生产环境。
  3. Flink On YARN模式:在这种模式下,Flink 使用 YARN 作为底层资源调度系统,以分布式的方式在集群中运行。YARN 负责资源管理,Flink 负责计算任务。这种模式适合大规模生产环境,能够充分利用集群资源。
  4. Flink On Kubernetes模式:在这种模式下,Flink 部署在 Kubernetes 上,利用 Kubernetes 进行资源管理和调度。这种模式结合了 Flink 的高性能计算和 Kubernetes 的容器管理能力,适合需要高可用性和弹性伸缩的生产环境。

Flink 的部署模式

Flink 有 3 种部署模式(Development Mode):

  1. 会话模式(Session):最符合常规思维,先启动一个 Flink 集群,保持一个会话,这这个会话中通过客户端提交作业。集群启动时,所有资源就已经确定,所以所有提交的作业会竞争集群中的资源。

适用场景:单个规模小,执行时间短的大量作业。

优点:不用多个集群,不用给每个作业定义集群。

缺点:作业竞争资源,资源共享。

  1. 单作业模式(Per-Job):为了更好的隔离资源,为每个提交的作业启动一个集群,即但作业(Per-Job)模式。曾经是比较推崇的部署模式,现在已经过时。作业完成后,集群会关闭,资源也会释放。特别多的作业,要维护特别多的集群。

存在问题:通过 Client 提交,提交时要作代码转换,如:取数据、打平、聚合、输出等,这些工作都会在客户端进行,客户端作的工作比较多。当有大量作业时,客户端与集群频繁通信,会产生大量的带宽消耗,去下载依赖和把二进制数据发送给 Job Manager;加上很多情况下,提交作业在同一个客户端,会加剧客户端节点资源的消耗。为解决上述问题,应用模式产生。

  1. 应用模式(Application):在应用模式下,不再需要客户端提交任务,直接把应用提交到 Job Manager 上运行。不需要通过客户端提交任务到针对单作业集群的 Job 。每个应用独立启动一个 Job Manager 也就是一个集群,这个 JobManager 只为执行一个应用而存在。执行结束后,JobManager 也将被关闭。所有的转换工作,都在集群上进行。

应用模式与但作业模式,都是提交作业后才创建集群,单作业模式通过客户端来提交,客户端解析出的每个作业对应一个集群;而应用模式下,直接由 JobManager 执行应用程序,不需要客户端对代码进行解析。目前应用模式是部署的首选。

【注意】Flink 本身无法直接进行单作业模式运行,都需要借助一些资源管理框架来启动集群,如:YARN、Kubernetes 等运行。

Flink 部署

Flink 伪分布式环境搭建

服务器规划:

服务器主机名

部署组件

wuji1626

JobManager

TaskManager

对压缩包进行解压:

tar -zvxf flink-1.17.0-bin-scala_2.12.tgz

修改配置文件

1)修改 flink-conf.yaml

进入 conf 目录,修改 Flink 的核心配置文件 flink-conf.yaml 的内容。

# JobManager节点地址.
jobmanager.rpc.address: wuji1626
# JobManager 绑定的地址,不受限制就设置为 0.0.0.0
jobmanager.bind-host: 0.0.0.0
# Flink 的 Rest服务地址
rest.address: wuji1626
# Rest 服务地址,不受限制就设置为 0.0.0.0
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: wuji1626

2)修改 conf/works 文件,指定 works 服务器的主机,由于是伪分布式部署,只写本机主机即可(我的主机名为 wuji1626)。

wuji1626

3)修改 conf/masters 文件,指定 master 服务器的主机,由于是伪分布式部署,需要指定本机主机名和通信端口(使用默认的 8081 即可)。

wuji1626:8081

4)启动集群

bin/start-cluster.sh

通过 jps 查看进程启动情况。其中
StandaloneSessionClusterEntrypoint 相当于 JobManager,而 TaskManagerRunner 就是 TaskManager。

5)验证

通过下述 url 访问 Flink WebUI 控制台,验证集群启动状况。如集群控制台所示,母亲可用的 TaskManager 有 1 个,正在运行的 Job 没有。

http://wuji1626:8081

在 Flink WebUI 中提交任务

要在 Flink WebUI界面提交 Flink任务,需要先将 Flink 任务打成 jar 包。再将 jar 包通过 WebUI 界面提交到 Flink 集群。

1)打 jar 包。

为了便于 Flink 程序运行,前期可以将 Flink 任务程序所有依赖的库都打入一个 jar 文件。可在 maven 的 pom.xml 中配置插件 shade,它可以在导出 jar 文件时,同时导出 2 个 jar 包,一个保含所有依赖,一个不包含依赖只有源码。

具体配置如下:

    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

使用 maven 插件,先后执行 clean、package,在工程的 target 目录下可以看到两个 jar 包。

2)上传 jar 包

在 Flink Dashboard 中选择【Submit New Job】,并点击【Add New】按钮。

在本地选择包含依赖的 jar 文件上传。

选择入口类的全路径名称“
com.wuji1626.wc.Flink03_Unbound_Stream”,提交。

提交之前,先在服务器命令行中执行,向端口发送信息。

nc -l -v 8888

3)Job 执行,Job 将会将端口输入的内容,计算后输出到控制台输出。

如果事先没有运行 nc 命令,Job 启动会报错,提示连接被拒绝(Connection Refused)。

通过命令行提交任务

1)将打包的
bigdata-sample-1.0-SNAPSHOT.jar 文件上传到服务器对应的路径下,这里上传到了如下目录下。

flink-1.17.0//examples/streaming

2)执行 nc 命令,开启向端口通信的程序。

nc -lk 8888

3)运行下述命令:

bin/flink run -m wuji1626:8081 -c com.wuji1626.wc.Flink03_Unbound_Stream ./examples/streaming/bigdata-sample-1.0-SNAPSHOT.jar

其中:

  • -m:确定 JobManager 格式为:[主机名]:[端口],这里是 wuji1626:8081
  • -c:入口类绝对类名,同时需要指定类所在 jar 位置。

运行后,控制台会输出 Job信息。

前往 Flink WebUI 控制台查看 Job 信息,信息与 Console 输出的 Job ID 保持一致。

4)在 Task 节点可以查看程序的控制台输出。

Standalone 运行模式

独立运行模式是独立运行,不依赖任何外部的资源管理平台。

独立运行的缺点:

  1. 资源不足;
  2. 出现故障,没有自动扩展或重分配资源的保证,必须手动处理

所以独立模式一般只用在开发测试或作业非常少的场景下。

部署模式:

  1. 会话模式部署:与 Flink 的会话部署模式相同,先启动 JobManager 再提交作业。
  2. 单作业模式部署:Flink 不支持在 Standalone 运行模式下进行单作业部署,因为需要外部的资源管理平台进行资源管理。
  3. 应用模式部署:应用模式下不会提前创建集群。需要分别运行 standalone-job.sh 来创建 JobManager。

以 Application 的方式部署

1)将之前运行的
bigdata-sample-1.0-SNAPSHOT.jar 复制到 lib 目录下。

2)执行下述脚本,启动 Standalone 模式的应用集群。

bin/standalone-job.sh start --job-classname com.wuji1626.wc.Flink03_Unbound_Stream

3)执行下述脚本,启动 JobManager。

bin/taskmanager.sh start

4)执行 nc 命令:

nc -lk 8888

5)通过 WebUI 查看应用运行情况。

控制台上确实有运行中的 Application。

通过查看 Task 的控制台输出,Application 确实正在运行中。

6)通过下述脚本,停止 JobManager。

bin/taskmanager.sh stop

日常开发中不会采用 Application 方式进行部署,会话模式就能满足日常的开发工作。

有进步了一点,发个美女轻松一下。


点击这里复制本文地址 以上内容由莫古技术网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

莫古技术网 © All Rights Reserved.  滇ICP备2024046894号-2