Kafka入门

Kafka是一个分布式消息发布订阅系统,Kafka可以用于以下场景:

  • 消息系统, 例如ActiveMQ 和 RabbitMQ.
  • 站点的用户活动追踪。 用来记录用户的页面浏览,搜索,点击等。
  • 操作审计。 用户/管理员的网站操作的监控。
  • 日志聚合。收集数据,集中处理。
  • 流处理等

流处理平台

基本概念

  • Producer:消息生产者,就是向kafka broker发消息的客户端。
  • Consumer:消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
  • Topic:主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
  • Partition:消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。

    note: 一个topic的一个partition只能被一个consumer group中的一个consumer消费,多个consumer消费同一个partition中的数据是不允许的,但是一个consumer可以消费多个partition中的数据

partition

  • Broker:一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Consumer Group:消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

快速实践Kafka

下载kafka和zookeeper的安装包,解压,配置环境变量,启动zookeeper服务,启动kafka守护进程

新建一个topic

使用kafka自带的命令行工具kafka-topic.sh创建一个新的topic

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

发送一个消息到topic

1
2
3
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

读取刚创建的消息

1
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

可以看到刚才创建的两条消息被输出来了。

使用python的Kafka客户端实现生产者消费者

实现上面命令行的生产者消费者的例子:

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
from kafka import KafkaProducer
import time

producer = KafkaProducer(bootstrap_servers='localhost:9092')

i = 0
while True:
ts = int(time.time() * 1000)
producer.send(topic="myfirsttopic", value=bytes(str(i).encode('utf-8')), timestamp_ms=ts)
producer.flush()
print(i)
i += 1
time.sleep(1)

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from kafka import KafkaConsumer

KAFKA_HOST = 'localhost'
KAFKA_PORT = 9092
KAFKA_TOPIC = 'myfirsttopic'

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers='localhost:9092'
)

try:
for message in consumer:
print(message)
except KeyboardInterrupt:
print("Catch keyboard interrupt")

可以看到,生产者产生的消息被消费者消费了

producer_consumer_python1

追踪偏移量offset

kafka允许consumer将当前消费的消息的offset提交到kafka中,这样如果consumer因异常退出后,下次启动仍然可以从上次记录的offset开始向后继续消费消息。

修改一下刚才的consumer代码,把enable_auto_commit设为false,让应用程序决定何时提交偏移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

KAFKA_HOST = 'localhost'
KAFKA_PORT = 9092
kafka_topic = TopicPartition("myfirsttopic", 0)

consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
group_id="testgroup",
auto_offset_reset="earliest",
enable_auto_commit=False
)

consumer.assign([tp])
print("Start offset is: " + str(consumer.position(tp)))

try:
for message in consumer:
print(message)
consumer.seek(tp, message.offset+1)
consumer.commit()
except KeyboardInterrupt:
print("Catch keyboard interrupt")

在offset为249时候终止消费者,然后再次启动消费者,可以看出是从上次停止的地方(250)继续消费:

offset

默认情况下auto.commit.enable等于true,这也就意味着consumer会定期的commit offset

Kafka消息发送分区选择

Kafka发送消息的流程图如下:

sendMsg

那么,发送消息的时候,是如何选择分区的呢?KafkaProducer对象通过send方法,将记录发给kafka

1
producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));

在KafkaProducer中,是通过内部的私有方法doSend来发送消息

1
int partition = this.partition(record, serializedKey, serializedValue, cluster);

看一下partition方法的实现:

1
2
3
4
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ? partition.intValue() : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

如果record指定了分区则指定的分区会被使用,如果没有则使用partitioner分区器来选择分区

partitioner的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}

return counter.getAndIncrement();
}

如果key为null,则先根据topic名获取上次计算分区时使用的一个整数并加一。然后判断topic的可用分区数是否大于0,如果大于0则使用获取的nextValue的值和可用分区数进行取模操作。 如果topic的可用分区数小于等于0,则用获取的nextValue的值和总分区数进行取模操作。

如果消息的key不为null,就根据hash算法murmur2就算出key的hash值,然后和分区数进行取模运算。

Kafka Connect

Kafka Connect 是Kafka 的一部分,它为在Kafka 和外部数据存储系统之间移动数据提供了一种可靠且可伸缩的方式。

KafkaConnect

Kafka Connnect有两个核心概念:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。

Kafka Connect简单实例

下面这个例子使用Kafka Connect将数据从文件导入到Kafka,然后再将Topic中数据导入到文件中

创建文件,写入测试数据

1
2
cd /Users/lvshuo/bigdata/kafka
echo "Hello World" > test.txt

topic的偏移量存储在/tmp/connect.offsets这个文件中,在config/connect-standalone.properties配置,每次connect启动的时候会根据connector的name获得topic偏移量,然后在继续读取或者写入数据

下面配置source和sink:一个用于将文件数据导入Kafka,一个用于将Topic数据导出到文件,下面是配置文件connect-console-source.properties的内容:

1
2
3
4
name=local-console-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
topic=connect-test

connect-console-sink.properties文件内容:

1
2
3
4
5
name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=connect-test
file=test.sink.txt

启动两个单点的connector

1
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

可以看到生成了test.sink.txt的文件

查看一下connect-test主题中的数据

更多的Connector可以去confluent平台去看。

参考

坚持原创技术分享,您的支持将鼓励我继续创作!