参考Kafka 源码解析之 Producer 发送模型(一)
一个简单的生产者
下面的代码是一个简单的生产者向Kafka中发送消息的例子:
1 | import org.apache.kafka.clients.producer.KafkaProducer; |
Kafka提供了生产者API,使用时候需要实例化KafkaProducer,然后调用send方法发送数据。
生产者数据发送流程
下面的流程图展示了消息的发送流程
首先创建一个ProducerRecord
对象,包含了目标主题和要发送的内容。然后数据被发送给序列化器
,将键和值对象序列化成字节数据。然后数据发送给分区器
,如果ProducerRecord里面指定了分区,分区器不做任何操作,否则根据ProducerRecord对象的键来选择一个分区。然后上面的记录被追加到一个记录批次
里面,有一个独立的线程将记录批次发送到相应的broker上。
实例化了生产者对象之后,调用send方法发送数据
1 | // 异步向Topic发送数据 |
doSend的具体实现
1 | /** |
上面doSend
方法中,发送数据总共分5步:
- 确认数据要发送到的 topic 的 metadata 是可用的
- 序列化key和value
- 获取消息的partition值
- 向累加器中追加数据,数据先进行缓存
- 如果batch满了,唤醒sender线程发送数据
序列化
生产者端对数据的key和value进行序列化操作,消费者端再进行相应的反序列化操作,下面是Kafka提供的序列化器和反序列化器
对于自带的序列化器不能满足需求的情况,可以使用例如Avro、Thrift或者Protobuf等序列化框架或者使用自定义的序列化器。
下面使用Avro序列化记录
首先使用Avro命令生成自定义对象,.avsc文件通过JSON来描述数据的schema
1 | { |
生成自定义的数据对象后,将schema保存在注册表中,这样消费者在读取数据的时候就知道用什么schema来反序列化记录。
下面是使用avro的KafkaAvroSerializer来序列化对象,需要制定schema注册表的位置http://127.0.0.1:8081
1 | import io.confluent.kafka.serializers.KafkaAvroSerializer; |
更具体的Avro序列化,查看Avro的相关文档。
获取分区
看一下partition方法的实现:
1 | private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { |
如果record指定了分区则指定的分区会被使用,如果没有则使用partitioner分区器来选择分区
1 | public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { |
向累加器追加数据
生产者向累加器中追加数据,当batch满了,唤醒sender线程发送数据
Producer 通过RecordAccumulator
实例追加数据,每个 TopicPartition 都会对应一个 Deque
1 | public RecordAppendResult append(TopicPartition tp, |
- 获取该 topic-partition 对应的 queue,没有的话会创建一个空的 queue;
- 向 queue 中追加数据,先获取 queue 中最新加入的那个 RecordBatch,如果不存在或者存在但剩余空余不足以添加本条 record 则返回 null,成功写入的话直接返回结果,写入成功;
- 如果不存在现有的RecordBatch,创建一个新的 RecordBatch,初始化内存大小根据
max(batch.size, Records.LOG_OVERHEAD + Record.recordSize(key, value))
来确定(防止单条 record 过大的情况); - 向新建的 RecordBatch 写入 record,并将 RecordBatch 添加到 queue 中,返回结果,写入成功。
发送RecordBatch到Broker
当 record 写入成功后,如果发现 RecordBatch 已满足发送的条件(通常是 queue 中有多个 batch,那么最先添加的那些 batch 肯定是可以发送了),那么就会唤醒 sender 线程,发送 RecordBatch
1 | private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) { |