Kafka Producer和Consumer客户端


Kafka为不同的编程语言提供了Producer和Consumer编程的API,下面以JAVA语言为例。

Producer客户端

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.test.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerApp {
public Properties initProperties() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.129:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

return props;
}

public static void main(String[] args) {
ProducerApp app = new ProducerApp();

Producer<String, String> producer = new KafkaProducer<>(
app.initProperties());
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("my-topic",
Integer.toString(i), "message" + i));
}

producer.close();
}
}

上面的代码,首先创建了一个KafkaProducer需要用到的配置类。

bootstrap.servers:表示连接Kafka的broker,它将自动发现集群中的其余的borker,也可以指定多个broker,防止有服务器故障。

ack=all:表示Leader将等待所有副本同步后应答消息,此配置保障消息不会丢失,all将会阻塞消息,这种设置性能最低,但是是最可靠的。

retries:如果请求失败,生产者会自动重试,这里指定是0次,如果启用重试,则会有重复消息的可能性。

batch.size:producer会缓存每个分区未发送消息。缓存的大小是通过batch.size配置指定的。

linger.ms:指示生产者发送请求之前等待一段时间,这样可以减少网络请求,一次性可以发送更多的消息,这类似于TCP的Nagle算法。默认缓冲可立即发送,即便缓冲空间还没有满。

buffer.memory: 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException

key.serializervalue.serializer: 表示将用户提供的key和value对象ProducerRecord转换成字节。

send():异步发送一条消息到Topic,一旦消息被保存在等待发送的消息缓存中,此方法就立即返回。这样可以并行发送多条消息而不阻塞去等待每一条消息的响应。

由于send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。

如果要模拟一个简单的阻塞调用,可以调用get()方法:

1
2
producer.send(new ProducerRecord<String, String>("my-topic",
Integer.toString(i), "message" + i)).get();

在无阻塞的情况下,可以利用回调参数提供的请求完成时将调用的回调通知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
   for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"my-topic", Integer.toString(i), "message" + i);

producer.send(record, new Callback() {

@Override
public void onCompletion(RecordMetadata metaData, Exception e) {
if (e != null) {
e.printStackTrace();
}

System.out.println("The offset of the record just sent is "
+ metaData.offset());
}
});
}

callback一般在生产者的I/O线程中执行,执行的速度要快,不适宜执行阻塞和耗时的计算操作,否则将延迟其他的线程的消息发送。

发送到同一个分区的消息回调保证按一定的顺序执行,如上面的输出,第一个callback保证在第二个之前执行:

The offset of the record just sent is 90
The offset of the record just sent is 91
The offset of the record just sent is 92
The offset of the record just sent is 93
The offset of the record just sent is 94
The offset of the record just sent is 95
The offset of the record just sent is 96
The offset of the record just sent is 97
The offset of the record just sent is 98
The offset of the record just sent is 99

Consumer客户端

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.test.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerApp {
private Properties initProperties() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.129:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

return props;
}

public static void main(String[] args) {
ConsumerApp app = new ConsumerApp();

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
app.initProperties());
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
}
}

group.id:表示消费者组,相同group.id的消费者将视为同一个消费者组。

Kafka将已订阅topic的消息发送到每个消费者组中。并通过平衡分区在消费者分组中所有成员之间来达到平均。因此每个分区恰好地分配1个消费者(一个消费者组中)。如果一个topic有4个分区,并且一个消费者分组只有2个消费者。那么每个消费者将消费2个分区。

如果一个消费者故障。分配给它的分区将重新分配给同一个分组中其他的消费者。同样的,如果一个新的消费者加入到分组,将从现有消费者中移一个给它。这被称为重新平衡分组。

broker通过心跳自动机制检测test组中失败的进程,消费者会自动ping集群,告诉进群它还活着。只要消费者能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过session.timeout.ms,那么就会认为是故障的,它的分区将被分配到别的进程。

消费可能遇到“活锁”的情况,它持续的发送心跳,但是没有处理。为了预防消费者在这种情况下一直持有分区,可以使用max.poll.interval.ms活跃检测机制。 在此基础上,如果调用的poll的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。

消费者提供两个配置设置来控制poll循环:

max.poll.interval.ms:增大poll的间隔,可以为消费者提供更多的时间去处理返回的消息。缺点是此值越大将会延迟组重新平衡。

max.poll.records:此设置限制每次调用poll返回的消息数。通过调整此值,可以减少poll间隔,减少重新平衡分组的

enable.auto.commit表示自动提交偏移量,偏移量由auto.commit.interval.ms控制自动提交的频率。

上面的程序输出如下:

offset = 90, key = 90, value = message90
offset = 91, key = 91, value = message91
offset = 92, key = 92, value = message92
offset = 93, key = 93, value = message93
offset = 94, key = 94, value = message94
offset = 95, key = 95, value = message95
offset = 96, key = 96, value = message96
offset = 97, key = 97, value = message97
offset = 98, key = 98, value = message98
offset = 99, key = 99, value = message99

自动提交也偏移量要求必须下次调用poll之前或关闭消费者之前,处理完所有返回的数据。如果操作失败,这将会导致已提交的offset超过消费的位置,从而导致丢失消息。偏移量可以自动定时提交,还可以手动提交。如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.test.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerApp {
private Properties initProperties() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.130:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

return props;
}

private static void insertIntoDb(
List<ConsumerRecord<String, String>> records) {
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"insertIntoDb: offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}

public static void main(String[] args) {
ConsumerApp app = new ConsumerApp();

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
app.initProperties());
consumer.subscribe(Arrays.asList("my-topic"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);

for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}

if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
}
}

在这个例子中,消费者将一批消息存储在内存中。当积累足够多的消息后,再进行处理,如批量的插入到数据库中。如果设置offset自动提交,消费将被认为是已消费的。这样会出现问题,进程可能在批处理记录之后,但在它们被插入到数据库之前失败了。

但是在插入数据库之后,在提交偏移量之前。在这种情况下,进程将获取到已提交的偏移量,并会重复插入的最后一批数据。这种方式就是所谓的“至少一次”保证,在故障情况下,可以重复。

运行上面的程序,输出如下:

insertIntoDb: offset = 490, key = 90, value = message90
insertIntoDb: offset = 491, key = 91, value = message91
insertIntoDb: offset = 492, key = 92, value = message92
insertIntoDb: offset = 493, key = 93, value = message93
insertIntoDb: offset = 494, key = 94, value = message94
insertIntoDb: offset = 495, key = 95, value = message95
insertIntoDb: offset = 496, key = 96, value = message96
insertIntoDb: offset = 497, key = 97, value = message97
insertIntoDb: offset = 498, key = 98, value = message98
insertIntoDb: offset = 499, key = 99, value = message99

上面的程序的commitSync表示所有收到的消息为”已提交”,在某些情况下,如果希望更精细的控制,通过指定一个明确消息的偏移量为“已提交”。下面的程序,在处理完每个分区中的消息后,提交偏移量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);

for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records
.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(
partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(lastOffset + 1)));
}
}

已提交的offset应始终是的程序将读取的下一条消息的offset。因此,调用commitSync时,应该加1个到最后处理的消息的offset。