实时流计算系统设计与实现之数据传输-ApacheKafka

实时流计算系统设计与实现之数据传输-ApacheKafka

精选文章moguli202025-02-06 11:31:0013A+A-

Apache Kafka

终于到了专门讲解Apache Kafka的章节了,笔者竟然还有一点点儿小激动,这是因为Kafka确实是笔者喜欢的软件之一(其他还包括Git、Linux、Java、Python、Redis、Hadoop、Flink、Ignite等)。

Kafka是由LinkedIn开源的一个用于管理和处理流式数据的发布/订阅消息系统。它具备超高性能、分布式、错误容忍等优良特性,非常适合用于实时传输流式大数据。可以说,Kafka是我们构建流计算系统的必备利器。

相比其他消息中间件,Kafka最初的设计目标不仅仅是一个普通的消息中间件,它更被设计为一种全新的数据管理方式。Kafka可以直接以流的方式来存储、查询和管理数据。想想在Kafka横空出世之前,当需要存储流式业务数据时,需要用诸如Flume这样的日志收集工具先将消息从消息中间件中拉取出来,然后将消息写入数据库或文件系统。

而现在,业务数据可以直接以数据流的方式原封不动地保存在Kafka中,如无必要,无须再将其转储到其他数据存储系统。Kafka这种新颖的流数据管理方式极大地简化了管理流数据的复杂度,并且具有许多优良的功能。例如,Kafka的实现天然就是分布式架构,支持消息持久化和多副本存储,并且可以灵活地横向扩展等。

Kafka架构

Kafka由ZooKeeper集群和若干broker(代理)节点组成。典型的Kafka应用还包括若干消息生产者和消息消费者。

ZooKeeper是一个分布式系统协调器。协调器这个名字可能不太好理解,但是可以借助进程内的锁来理解。例如,在JVM内部,我们可以使用可重入锁(ReentrantLock)来协调多个线程对相同资源的安全访问。而在分布式环境下,进程内的锁不再可用,于是需要使用类似于ZooKeeper这样的分布式系统协调器来同步多个节点上进程对相同资源的安全访问。Kafka正是使用ZooKeeper来协调集群中的各种角色实例及存储集群中的各种元数据信息的。

broker代表组成Kafka集群的每台Kafka服务器。由于Kafka是发布/订阅模式的消息中间件,因此生产者在发送消息前,必须创建消息的主题(topic)。Kafka的主题由一个或多个分区(partition)组成,这些分区分布在各个broker服务器上。由于高可用设计的原因,每个分区还可以设置为一个或多个副本(replica),并且同一个broker服务器上最多部署该分区的一个副本。正因为如此,当Kafka中的某个broker服务器发生故障或停机时,只要一个主题设置的分区副本数量比宕机的broker服务器数量多,理论上该主题(topic)的消息就不会丢失。

图8-4展示了Kafka的工作原理。当消息生产者往某个主题发送消息时,消息被发送到该主题的某个分区,并最终写入该分区的每一个副本。Kafka一个设计非常巧妙的地方是,broker并不会“认真”记录一条特定的消息是否被消费了,而是用偏移量来“笼统”记录分区中消息的生产情况和消费情况,这为消息的重放或跳跃带来极大方便。任何时候只需要修改消费者记录的偏移量,就可以让消费者重新消费已经消费过的消息,或者跳过一些积压太多从而放弃处理的消息。

对于存储在broker上的消息,都会设置一个超期策略,可以是按时间超期或按数量超期,超期的消息会被淘汰。这是一个非常好的功能,可以避免磁盘无休止地写下去,最终将磁盘写满。事实上,笔者认为,不仅仅是流数据,任何类型的线上数据都不会是永远有效的。这有两点含义:一方面提醒我们在设计数据存储系统(如写日志、存数据库等)时,务必设置一个超期淘汰机制;另一方面,提醒我们在为业务设计实体关系模型时,应该认真考虑数据在业务意义下的真实有效时间。可以说,缺乏数据实效考虑的系统,确定、一定以及肯定会运行得不长久!

Kafka生产者

通过Kafka提供的生产者相关API,可以将消息发送给Kafka。整体而言,Kafka生产者API使用起来还是比较简单的。首先创建一个KafkaProducer对象,然后根据需要发送消息的主题、内容及一个可选的分区键值,创建一个ProducerRecord对象,之后就可以通过KafkaProducer对象的send方法,以同步或异步的方式将消息发送出去了。下面是按照这个过程发送消息的示例代码。

首先,创建KafkaProducer对象:

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("acks", "all");

props.put("batch.size", 16384);

props.put("linger.ms", 1);

props.put("buffer.memory", 33554432);

props.put("compression.type", "gzip");

props.put("retries", 1);

props.put("max.in.flight.requests.per.connection", 2);

props.put("key.serializer", "org.apache.kafka.common.serialization.

StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.

StringSerializer");

Producer producer = new KafkaProducer(props);

KafkaProducer创建好后就可以用于发送消息了:

String productId = String.format("product_%d", RandomUtils.nextInt(0,

productNumber));

String event = JSONObject.toJSONString(new Event(productId,

System.currentTimeMillis()));

Future future = producer.send(new ProducerRecord<>(topic,

productId, event),

new Callback() {

@Override

public void onCompletion(RecordMetadata metadata, Exception exception) {

if (exception == null) {

logger.info(String.format("succeed to send event[%s]", event));

} else {

logger.error(String.format("failed to send event[%s]", event)); }

}

});

// future.get() // 同步发送方式

在上面的代码中,我们首先创建了一个代表将被发送消息的ProducerRecord对象。然后,将其用KafkaProducer的send方法发送出去。KafkaProducer的send方法是一个异步方法,它将消息添加到消息发送缓冲区后就立刻返回。这种异步设计允许KafkaProducer在系统内部批次发送消息,从而提高消息发送的效率。如果需要在客户端确认消息发送是否成功,则可以采用future的get方法,等到get方法返回或抛出异常,就可以知道消息是发送成功还是发送失败了。

虽然Kafka性能卓越,提供的API也简单、易用,但是使用时还是要根据具体的使用场景来调整KafkaProducer的配置参数,否则Kafka的性能优势就容易发挥不出来,甚至不符合业务对数据的要求。例如,在金融交易系统中,消息丢失或消息重复是不允许的,可以接受的延迟最大为500毫秒,而且系统对吞吐量要求较高,希望每秒钟可以处理一百万个消息。而在分析广告点击的场景中,允许丢失少量的消息或出现少量的消息重复,延迟可以大一些,只要不影响用户体验即可。不同的使用场景对生产者API的使用和配置会有直接的影响。如果生产者API使用不当,则程序性能还会极大地受到影响,导致性能不尽如人意。

在配置KafkaProducer时,我们需要考虑以下4个方面。

1.消息的可靠性

ACK是消息在被认为“已提交”之前,生产者需要leader确认的请求应答数,目前ACK有3个取值。当acks设置为0时,KafkaProducer发送请求后不需要等待broker的确认信号就立马返回,此时KafkaProducer发送消息的速度最快、吞吐率最高,但是由于根本不管Kafka服务器是否正确接收了消息,所以它不能保证消息全都发送成功。

当acks设置为-1时,KafkaProducer发送请求后必须等待所有副本的确认信号才能返回,此时KafkaProducer发送消息的可靠性最高,但速度最慢、吞吐率最低。当acks设置为1时,KafkaProducer发送请求后必须等待leader副本的确认信号才能返回。很显然,这是一种在消息的可靠性和发送速度之间的平衡方案。

2.同步或异步

笔者曾一度认为ACK是控制消息同步还是异步发送的参数,后来在一次生产性能事故中,才发现自己对这个参数的理解有误。在那次事故中,本来KafkaProducer的acks设置为1,笔者认为这种配置下KafkaProducer的发送方式为异步的,后来添加producer.type为async的设置后,程序发送消息的性能大幅提升。经过认真思考后,笔者才明白,异步发送的目的是收集消息后批次发送,从而提升消息的发送效率,但这并不代表发送线程完全不理会消息是否发送成功。在KafkaProducer的发送线程中,当消息发送失败时,依旧需要重试并尽可能让消息发送成功。如果最终消息真的发送失败,那么在KafkaProducer.send()函数返回的Future对象中,要么抛出异常,要么由回调函数进行失败处理。

总之,它不会对消息是否可靠发送到Kafka不管不顾,这绝不是“异步”的副作用,只是说acks的设置会影响发送线程对消息是否发送成功的判断而已。例如,当acks为0时,发送线程发送消息时总是会显示成功。而当acks为-1时,只要有一个副本没有写入成功,那么发送线程发送消息就会失败,这个时候,发送线程要么会重发消息,要么会进行失败处理。总体来说,异步发送方式会极大地提高消息发送的性能,会提高消息发送的时延,但是不会影响消息发送的可靠性。

3.批次发送

将消息收集到一起后,由固定的几个发送线程专门按批次发送消息,一方面可以减少过多I/O线程的切换及出入操作系统内核态的次数,另一方面会减小均摊在每条消息上的非有效数据开销,从而整体提高消息发送的吞吐能力。KafkaProducer的batch.size参数可以控制批次发送的消息数量,而lingger.ms参数则可以控制收集消息的时间。当收集消息达到一定数量或者时间达到设定值时,这批消息就会被一次性发送给Kafka。整体而言,batch.size越大,吞吐能力越强,但是发送时延会增加,可能会导致消费者在一段时间内没有消息可以处理,而lingger.ms则控制了消息发送的最大时延。所以,需要根据实际使用场景和生产流量情况做好batch.size和lingger.ms之间的平衡。

4.压缩

KafkaProducer能够对发送的消息进行压缩,然后由消费者接收并对其进行解压。压缩的过程会带来一定的CPU开销,但是压缩有两个好处,一是减少消息发送时的网络流量,二是减少消息占用的磁盘空间。对于规模比较大的消息,可以对消息进行压缩。不过Kafka支持压缩并不表示Kafka适用于传输大文件,大文件的传输通过诸如HDFS这样的分布式文件系统来实现比较好,毕竟Kafka本质上还是一个消息中间件而已。

Kafka消费者

在开始演示Kafka消费者从Kafka读取消息前,我们需要首先理解Kafka消费者相关的几个概念,以帮助我们理解Kafka是如何实现发布/订阅模式的。

1.消费者和消费者组

在前面的章节中,我们已经知道,消息中间件点对点模式和发布/订阅模式最大的区别是后者在消费消息时,同一消息能够被多个消费者消费。而为了提高消费者处理消息的能力,还可以允许多个消费者共同处理同一主题的消息。为了同时实现这两个目标,Kafka创造性地引入“消费者组”这一概念。同一主题的消息能够被多个消费者组消费,各个消费者组相互独立,互不影响。但在同一个消费组里的消费者,则齐心协力共同处理同一主题下的消息,当一个消息被一个消费者认领后,同一个消费者组里的其他消费者就不再认领该消息,这样就保证了能够横向扩展并行处理的消费者数量。

2.消费者和分区

Kafka主题中的数据在具体存储时,又分成了若干个分区。在同一个消费者组内,任何一个分区在同一时刻都只允许有一个消费者负责读取该分区中的消息。所以,如果一个主题有3个分区,而消费组内有6个消费者,则只有3个消费者能够读取消息。当这3个消费者其中之一退出时,就会从另外3个消费者中选择一个接替退出的那个消费者继续读取分区的消息。图8-5说明了消费者和分区之间的各种关系。


下面我们来演示Kafka消费者从Kafka中读取消息的过程。

首先创建KafkaConsumer对象。

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("group.id", " KafkaConsumerExample");

props.put("auto.offset.reset", "latest");

props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserial

izer");

props.put("value.deserializer",

"org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer<>(props);

然后让KafkaConsumer订阅某些主题。consumer.subscribe(Arrays.asList(topic));

最后,KafkaConsumer从主题中消费消息。

while (true) {

ConsumerRecords records = consumer.poll(100);

for (ConsumerRecord record : records) {

logger.info(String.format("receive key[%s], event[%s]", record.key(),

record.value()));

}

}

与KafkaProducer类似,KafkaConsumer也只有在合理配置之后才能发挥出其最佳的性能。主要的考虑因素有以下3点。

·消费者组内消费者的数量。影响KafkaConsumer性能最重要的因素是消费者组内消费者的数量。由于一个分区只能被同一消费者组内的一个消费者读取,而一个消费者可以读多个分区的数据,所以配置超过分区数的消费者数量并不会提升主题中消息处理的速度。

·fetch.min.bytes 和fetch.max.wait.ms,这两个属性的作用与KafkaProducer的batch.size和lingger.ms的作用类似。它们分别决定了消费者一次读取消息的条数及最多等待Kafka broker将数据收集全的时间。当消息不足fetch.min.bytes 定义的字节数,而时间达到fetch.max.wait.ms 时,broker 会将已经收集到的消息一次性返回给消费者。很明显,这种设计也是为了减少I/O 次数,提高每次消息有效载荷,从而提高消费者读取消息的性能。

·checkpoint时间间隔。消费者侧有保证消息可靠性读取的机制。这就是
replica.high.watermark.checkpoint.interval.ms参数的功能。当从分区读取出消息后,可以将本次读取消息的偏移量提交到ZooKeeper保存下来。当后续因为处理失败等原因,需要重新处理消息时,直接跳回标记点重新读取消息即可。如果每次都设置一个checkpoint,那么我们将永远不会丢失消息,但是这样做会明显地影响消费者性能。如果我们隔一段时间或对一定数量的消息数设置checkpoint,就可以在性能和可靠性之间获得一个合适的平衡点。

将Kafka用于数据总线

前面讲了很多Kafka技术细节的内容,那在我们实际构建实时流计算系统时应该怎样定位Kafka呢?笔者认为可以将Kafka定位为数据总线。之所以这样定位,主要考虑到Kafka的超高吞吐率、高可水平扩展性及能够直接高可靠地存储流式数据这三大特点。另外,Kafka的主题按分区存储及消费者组概念的引入,使编写高并发和高性能的系统非常便捷;而使用偏移量来任意重放消息或跳过积压消息的功能,对运维和开发人员实在是太贴心了。可以说,Kafka不愧是专门为大数据时代设计的数据总线。

在前面章节提到的风控系统中,我们就是使用Kafka在各个子系统中进行流数据的传递的。但更一般的情况是,Kafka作为整个数据系统的数据总线,为所有需要对接数据系统的子系统提供同一的数据接口。

图8-6描述了Kafka作为数据总线的场景,Kafka将日志、监控、交易、互联网、物联网等各种来源的数据整合起来,然后将这些数据交由各种离线、在线、实时的数据处理工具进行处理和分析,最后将原始数据或处理结果存储到各种数据存储系统。



点击这里复制本文地址 以上内容由莫古技术网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

莫古技术网 © All Rights Reserved.  滇ICP备2024046894号-2