大数据技术:Flink读取Kafka的json数据并进行分析

大数据技术:Flink读取Kafka的json数据并进行分析

精选文章moguli202025-04-03 23:11:4932A+A-

Flink 读取 Kafka JSON 数据的原理
1、当 Flink 从 Kafka 读取 JSON 数据时,首先,Flink 通过 Kafka 连接器与 Kafka 集群建立连接。这个连接器负责从 Kafka 主题中拉取数据,并将其转换为 Flink 能够处理的数据流。
2、在数据拉取过程中,Kafka 连接器会根据配置的起始偏移量等配置使得用户可以根据实际需求灵活地控制数据读取的起始点。
3、一旦数据被拉取到 Flink 中,接下来就需要对 JSON 格式的数据进行反序列化。Flink 提供了多种反序列化方式,对于 JSON 数据,通常会使用 JSON 反序列化器将字节数组形式的 JSON 数据转换为 Java 对象。这些 Java 对象随后会被封装成 Flink 的数据流元素,以便后续进行各种转换操作。

不必多说,直接上代码,java语言
1、创建pojo类

这里省略getter和setter等方法

2、核心

package org.kafkaJsonTest.Analysis;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.pojo.PowerLoad;
import org.kafkaJsonTest.conf.readKafkaDataJson;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkKafkaSqlPowerAnalysis {
    public static void main(String[] args) {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建 Kafka 数据源
        readKafkaDataJson readJson = new readKafkaDataJson();
        KafkaSource kafkaSource = readJson.kafkaJson("power", "flink_power", PowerLoad.class);

        // 从 Kafka 读取数据
        DataStream stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 将 DataStream 转换为 Table
        Table inputTable = tableEnv.fromDataStream(stream);

        // 注册为临时表
        tableEnv.createTemporaryView("powers_load", inputTable);

        // 示例 SQL 查询:计算每个区域的平均温度
        String sql = "SELECT region_code, avg(loadValue) as avg_loadValue ,avg(threshold) as avg_threshold FROM powers_load GROUP BY region_code";
        Table resultTable = tableEnv.sqlQuery(sql);

        // 将表结果转换为数据流 更新流
        tableEnv.toChangelogStream(resultTable).print();

        // 执行任务,使用 try-catch 块处理异常
        try {
            env.execute("helo");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    }

这里反序列化Json数据是借助 JsonDeserializationSchema 能够方便地把 Kafka 中的 JSON 消息反序列化为 pojo对象,更具可读性和可维护性,其次使用flinksql进行数据分析。

3、发送数据,这里代码省略

        // 示例 PowerLoad 数据
        List data = new ArrayList<>();
        data.add(new PowerLoad("region_A", 120.5, 150.0, System.currentTimeMillis()));
        data.add(new PowerLoad("region_B", 130.2, 170.0, System.currentTimeMillis()));
        data.add(new PowerLoad("region_C", 110.8, 140.0, System.currentTimeMillis()));
        data.add(new PowerLoad("region_A", 125.3, 150.0, System.currentTimeMillis()));
        data.add(new PowerLoad("region_B", 135.7, 170.0, System.currentTimeMillis()));

4、结果

[前提得开启hadoop,zookeeper,kafka]


觉得还不错的,点个赞,嘿嘿嘿

点击这里复制本文地址 以上内容由莫古技术网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

莫古技术网 © All Rights Reserved.  滇ICP备2024046894号-2