大数据技术:Flink读取Kafka的json数据并进行分析
Flink 读取 Kafka JSON 数据的原理
1、当 Flink 从 Kafka 读取 JSON 数据时,首先,Flink 通过 Kafka 连接器与 Kafka 集群建立连接。这个连接器负责从 Kafka 主题中拉取数据,并将其转换为 Flink 能够处理的数据流。
2、在数据拉取过程中,Kafka 连接器会根据配置的起始偏移量等配置使得用户可以根据实际需求灵活地控制数据读取的起始点。
3、一旦数据被拉取到 Flink 中,接下来就需要对 JSON 格式的数据进行反序列化。Flink 提供了多种反序列化方式,对于 JSON 数据,通常会使用 JSON 反序列化器将字节数组形式的 JSON 数据转换为 Java 对象。这些 Java 对象随后会被封装成 Flink 的数据流元素,以便后续进行各种转换操作。
不必多说,直接上代码,java语言
1、创建pojo类
这里省略getter和setter等方法
2、核心
package org.kafkaJsonTest.Analysis;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.pojo.PowerLoad;
import org.kafkaJsonTest.conf.readKafkaDataJson;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkKafkaSqlPowerAnalysis {
public static void main(String[] args) {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建 Kafka 数据源
readKafkaDataJson readJson = new readKafkaDataJson();
KafkaSource kafkaSource = readJson.kafkaJson("power", "flink_power", PowerLoad.class);
// 从 Kafka 读取数据
DataStream stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 将 DataStream 转换为 Table
Table inputTable = tableEnv.fromDataStream(stream);
// 注册为临时表
tableEnv.createTemporaryView("powers_load", inputTable);
// 示例 SQL 查询:计算每个区域的平均温度
String sql = "SELECT region_code, avg(loadValue) as avg_loadValue ,avg(threshold) as avg_threshold FROM powers_load GROUP BY region_code";
Table resultTable = tableEnv.sqlQuery(sql);
// 将表结果转换为数据流 更新流
tableEnv.toChangelogStream(resultTable).print();
// 执行任务,使用 try-catch 块处理异常
try {
env.execute("helo");
} catch (Exception e) {
e.printStackTrace();
}
}
}
这里反序列化Json数据是借助 JsonDeserializationSchema
3、发送数据,这里代码省略
// 示例 PowerLoad 数据
List data = new ArrayList<>();
data.add(new PowerLoad("region_A", 120.5, 150.0, System.currentTimeMillis()));
data.add(new PowerLoad("region_B", 130.2, 170.0, System.currentTimeMillis()));
data.add(new PowerLoad("region_C", 110.8, 140.0, System.currentTimeMillis()));
data.add(new PowerLoad("region_A", 125.3, 150.0, System.currentTimeMillis()));
data.add(new PowerLoad("region_B", 135.7, 170.0, System.currentTimeMillis()));
4、结果
[前提得开启hadoop,zookeeper,kafka]
觉得还不错的,点个赞,嘿嘿嘿