kafka的内存优化:
(1)修改启动脚本
[root@elk101.oldboyedu.com ~]# vim kafka-server-start.sh
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xmx256m -Xms256m -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
重启kafka即可:
kafka-server-stop.sh
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
使用 jps查看相关的进程
进程如下: QuorumPeerMain 为zookeeper的进程
使用jmap查看jvm的使用情况
1.MinHeapFreeRatio = 0
- 作用:这是 JVM 堆内存的最小空闲比例。当堆内存使用量低于 MinHeapFreeRatio 设置时,JVM 会尽量减少堆内存的大小。默认为 40,表示当堆空闲内存占比小于 40% 时,JVM 会尝试缩小堆的大小。
- 调优建议:将其设置为 0 意味着最小空闲比例为 0%,即尽量不减少堆内存。这在处理内存使用较大的应用时可能有助于避免频繁的堆内存缩减。对于内存使用非常稳定的应用可以选择更低的值,避免过度回收。
2.MaxHeapFreeRatio = 100
- 作用:这是堆内存最大空闲比例。当堆的空闲内存比例超过 MaxHeapFreeRatio 设置时,JVM 会尝试扩展堆内存。默认值为 70,表示如果堆空闲内存超过 70%,JVM 会增加堆内存。
- 调优建议:将其设置为 100 意味着堆不会主动扩展,除非必须。如果系统内存非常充裕,且垃圾回收的瓶颈不在堆大小上,可以考虑将其调整为更高的值来避免无谓的扩展。
3.MaxHeapSize = 1048576000 (1000.0MB)
- 作用:定义最大堆内存大小,单位为字节。此处为 1000MB,即最大堆内存为 1GB。
- 调优建议:根据应用的内存需求,合理设置最大堆大小。过大的堆可能导致垃圾回收时间变长,过小则可能频繁发生 GC。根据应用负载和实际需求进行调整。
4.NewSize = 88080384 (84.0MB)
- 作用:定义新生代(Young Generation)的初始大小。新生代用于存放新创建的对象。
- 调优建议:如果应用创建大量短生命周期对象,增加新生代的大小有助于减少频繁的 GC。通常,新生代占堆内存的 1/3 到 1/4。对于需要大量短期对象的应用,适当增大此值。
5.MaxNewSize = 349175808 (333.0MB)
- 作用:新生代的最大大小。若新生代的对象超过此值,将触发一次 GC。
- 调优建议:如果新生代经常达到最大值并触发 GC,考虑增加 MaxNewSize 的大小,减少 GC 频率。若频繁发生 GC 可适当降低此值。
6.OldSize = 176160768 (168.0MB)
- 作用:定义老年代(Old Generation)的初始大小。老年代用于存放生命周期较长的对象。
- 调优建议:老年代的大小需要根据应用的老对象的数量调整。增大老年代大小有助于减少 Full GC 的频率,但过大的老年代可能导致 Full GC 的时间较长。
7.NewRatio = 2
- 作用:定义新生代和老年代的大小比例。默认为 2,表示新生代的大小是老年代的 1/3。
- 调优建议:根据应用中年轻对象和老对象的比例调整此参数。若新生代对象较多,可以适当增大 NewSize 和 MaxNewSize,以避免频繁的 GC。
8.SurvivorRatio = 8
- 作用:定义新生代中 Survivor 区域 的比例。默认值为 8,表示将新生代划分为 8 个 Survivor 区域。
- 调优建议:若应用产生大量长生命周期的对象,考虑调整此值,减少 Survivor 区的比例,增大新生代的 Eden 区。
9.MetaspaceSize = 21807104 (20.8MB)
- 作用:定义 Metaspace 的初始大小。Metaspace 用于存储类的元数据(例如类定义和方法信息)。
- 调优建议:如果频繁发生 ClassLoader 加载类,导致 Metaspace 大小变化,可以适当增大 MetaspaceSize。对于需要加载大量类的应用,适当增加该值可以减少 Full GC。
10.CompressedClassSpaceSize = 1073741824 (1024.0MB)
- 作用:定义类压缩空间的大小,专门用于存储类元数据。默认情况下,JVM 会将类数据压缩存储。
- 调优建议:通常不需要调优此值,除非你的应用需要加载大量类,且类数据非常大。此项大小根据应用实际需求进行调整。
11.MaxMetaspaceSize = 17592186044415MB
- 作用:设置 Metaspace 的最大大小。Metaspace 存储类元数据,JVM 会自动扩展它,直到达到最大值。
- 调优建议:如果应用加载了大量的类,可以根据需要调整此参数,避免 Metaspace 的自动扩展导致 Full GC。增加 MaxMetaspaceSize 可以避免 Metaspace 满时发生的 Full GC。
12.G1HeapRegionSize = 0
- 作用:定义 G1 垃圾回收器的堆区域大小。G1 垃圾回收器将堆划分为多个区域来管理内存。
- 调优建议:通常不需要手动设置 G1HeapRegionSize,JVM 会根据堆大小自动计算合理的区域大小。若启用了 G1 GC,可以通过调整该值来优化回收时间和吞吐量,但默认设置通常足够。
工作原理图机制:
读和写基于topic 来写的,底层的数据是存储在分区中(类似es中的分片),当设置一个分区时 数据只能存储在单个节点上,当设置多个分区时可以存储在不同的节点。zookeeper存取kafka的元数据信息
分区数partition: 假设有三个分区
当某个节点的数据丢失了会从别的副本备份(kafka的副本是不可读取的)都叫副本
副本不能进行读 ,只能通过master进行读写操作,
五类api
a 生产者负责数据的写入
b 消费者负责数据的读取
c.zookeeper存取kafka的元数据信息
d.admin api管理kafka集群的
e.stream api是流试处理的api主要是用于大数据开发团队使用的api(spark flume等搭配使用)(kafka集群8000台)
kafka的机制 ISR OSR(每间隔30秒进行一次判断,(leo log offset )对比以下每个日志的offset的偏移量是否和leader相同,若相同是ISR,否则是OSR,对osr isr 列表更新,当下一次检测时,发现offset相同进行同步即可)(最低同步的offset 是高水位线)(kafka有数据丢失的风险) 在从ISR中选择leader时会出现丢失的现象
ISR是和主分片副本同步的集合,OSR是和主分片副本不同步的集合
数据同步步骤:
后续0.8版本后所有的offset都存储在 ——custermer——offset这个topic中 custormer——group
查看topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list
topic的创建以及使用:
一个topic是生产者与消费者 进行逻辑通信的基本单元,底层存储数据是对应一个或者多个分区副本。
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic wade_test --partitions 3 --replication-factor 1 创建分区数为3 副本数为1的一个topic
查看
root@devops-test:/data/wade-test/kafka_2.13-3.2.1/bin# ./kafka-topics.sh --bootstrap-server localhost:9092 --list
wade_test
修改
root@devops-test:/data/wade-test/kafka_2.13-3.2.1/bin# ./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic wade_test --partitions 5
查看详情
root@devops-test:/data/wade-test/kafka_2.13-3.2.1/bin# ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic wade_test
Topic: wade_test TopicId: XSZ9v-xSQvS-oxYh2lp_SQ PartitionCount: 5 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: wade_test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: wade_test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: wade_test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: wade_test Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: wade_test Partition: 4 Leader: 0 Replicas: 0 Isr: 0
分区数只能 调大不能调小,调小的化
创建生产者
kafka-console-producer.sh --bootstrap-server localhost --topicwade-test
root@devops-test:/data/wade-test/kafka_2.13-3.2.1/bin# ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic wade-test
>
>sdfd
>fdsfds
>fdff
>yu
多个分区的数据是无序读取的,如过是有序的则设置为一个分区即可
root@devops-test:/data/wade-test/kafka_2.13-3.2.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wade-test --from-beginning
[2025-02-21 14:45:39,373] WARN [Consumer clientId=console-consumer, groupId=console-consumer-1557] Error while fetching metadata with correlation id 2 : {wade-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
sdfd
fdsfds
fdff
yuo hai
(3)启动消费者加入同一个消费者组
[root@elk101.oldboyedu.com ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wade-test --consumer-property group.id=linux85 --from-beginning(从头开始读)
[root@elk103.oldboyedu.com ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wade-test --consumer.config ./config/consumer.properties --from-beginning
查看group的custormer
root@devops-test:/data/wade-test/kafka_2.13-3.2.1/bin# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
Consumer group 'console-consumer-1557' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-98514 wade-test 0 - 7 - console-consumer-f72c2ec5-522d-4013-b6da-1acd313a2194 /10.41.1.20 console-consumer
2.将多个consumer 集成到一个group组中
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wade-test --consumer-property group.id=wade-test1 --from-beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wade-test --consumer.config ..//config/consumer.properties --from-beginning
(5)观察消费者组的详细信息
[root@elk101.oldboyedu.com ~]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group wade-test
查看topic的consumer的offset的数据
kafka-console-consumer.sh --bootstrap-server localhost1:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning | grep wade_test