kafka使用protobuf序列化


在使用 Kafka 时,producer 可以使用 protobuf 对数据进行序列化,consumer 则使用 protobuf 进行反序列化。

message

如定义如下message:

syntax = "proto3";

// A person record exchanged over Kafka: the producer encodes it with
// SerializeToString() and the consumer decodes it with ParseFromString().
message Person {
  string name = 1;
  int32 age = 2;
  // Zero or more email addresses.
  repeated string email = 3;

  // In proto3 the first enum value must be 0 and is the field's default.
  enum Gender {
    MALE = 0;
    FEMALE = 1;
  }
  Gender gender = 4;

  // Nested address sub-message.
  message Address {
    string country = 1;
    string city = 2;
  }
  Address address = 5;

  // A phone number together with its kind.
  message PhoneNumber {
    // MOBILE (0) is the default value, so it is not written to the wire;
    // the sample output therefore shows no "type" for the mobile number.
    enum PhoneType {
      MOBILE = 0;
      HOME = 1;
      WORK = 2;
    }
    string number = 1;
    PhoneType type = 2;
  }
  repeated PhoneNumber phone_number = 6;
}

将上述定义保存为 person.proto,然后使用 protobuf 编译器 protoc 将其编译为 Python 代码:

protoc --python_out=. person.proto

编译成功后,生成person_pb2.py文件。

producer

以下代码(保存为 producer_person.py)在 aiokafka 的 producer 中使用 protobuf 进行序列化:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
producer
"""

__author__ = 'heqingliang'


import asyncio
import person_pb2
from aiokafka import AIOKafkaProducer


def serializer(value):
    """Encode a protobuf message into bytes for the Kafka record value.

    Used as ``value_serializer`` for ``AIOKafkaProducer``.

    Args:
        value: a protobuf message (anything providing ``SerializeToString()``),
            or ``None`` to produce a record without a value.

    Returns:
        The serialized bytes, or ``None`` when *value* is ``None``.
    """
    if value is None:
        # A value-less record (e.g. a tombstone) has nothing to serialize;
        # calling SerializeToString() on None would raise AttributeError.
        return None
    return value.SerializeToString()


def make_person_message():
    """Build the sample ``Person`` message that the producer publishes.

    Returns:
        person_pb2.Person: a message with a name, age, three email addresses,
        gender, a nested address and three phone numbers (mobile, home, work).
    """
    person_cls = person_pb2.Person
    phone_kinds = person_cls.PhoneNumber
    phones = (
        ('136********', phone_kinds.MOBILE),
        ('020-*******', phone_kinds.HOME),
        ('159********', phone_kinds.WORK),
    )

    person = person_cls(
        name='lisa',
        age=18,
        email=['test@gmail.com', 'test@qq.com', 'test@163.com'],
        gender=person_cls.FEMALE,
        address=person_cls.Address(country='China', city='GuangZhou'),
    )
    # Repeated message entries are appended in the same order as before.
    for digits, kind in phones:
        entry = person.phone_number.add()
        entry.number = digits
        entry.type = kind
    return person


async def send_message(loop):
    """Publish one serialized ``Person`` message to ``person_topic``.

    Args:
        loop: the event loop handed to ``AIOKafkaProducer``.

    Raises:
        Any error from starting the producer or from delivering the message.
        Once started, the producer is always stopped.
    """
    producer = AIOKafkaProducer(loop=loop, bootstrap_servers='192.168.1.134:9092', acks='all',
                                value_serializer=serializer)
    await producer.start()
    try:
        # send() only enqueues the record and returns a delivery future; if
        # that future is never awaited, broker errors go unnoticed.
        # send_and_wait() waits for the acks='all' acknowledgement and raises
        # if delivery fails.
        await producer.send_and_wait("person_topic", key=b'person',
                                     value=make_person_message())
    finally:
        # Flush anything still pending and release the broker connections.
        await producer.stop()


def main():
    """Run ``send_message()`` to completion on a dedicated event loop.

    ``asyncio.get_event_loop()`` with no running loop is deprecated since
    Python 3.10 (and raises in newer releases when no loop is set). So a new
    loop is created explicitly, installed as the current loop, and closed once
    the message has been sent.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(send_message(loop))
    finally:
        loop.close()
        asyncio.set_event_loop(None)


if __name__ == '__main__':
    main()

consumer

以下代码(保存为 consumer_person.py)在 aiokafka 的 consumer 中使用 protobuf 进行反序列化:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
customer
"""

__author__ = 'heqingliang'


import asyncio
import person_pb2
from aiokafka import AIOKafkaConsumer


def deserializer(value):
    """Decode a Kafka record value into a ``person_pb2.Person`` message.

    Used as ``value_deserializer`` for ``AIOKafkaConsumer``.

    Args:
        value: the raw record value as bytes, or ``None`` for a record that
            carries no value (e.g. a tombstone on a compacted topic).

    Returns:
        The parsed ``Person``, or ``None`` when *value* is ``None``.
    """
    if value is None:
        # ParseFromString(None) would fail; pass value-less records through.
        return None
    person = person_pb2.Person()
    person.ParseFromString(value)
    return person


async def consume(loop):
    """Print every ``Person`` record read from ``person_topic``.

    Iterates over the consumer until an error or cancellation ends the loop.
    The consumer is stopped on the way out.

    Args:
        loop: the event loop handed to ``AIOKafkaConsumer``.
    """
    settings = dict(
        loop=loop,
        bootstrap_servers='192.168.1.134:9092',
        group_id='person_topic_id',
        value_deserializer=deserializer,
        auto_offset_reset='earliest',
    )
    consumer = AIOKafkaConsumer("person_topic", **settings)
    await consumer.start()
    try:
        async for record in consumer:
            # record.value has already been decoded by deserializer().
            print(record.value)
    finally:
        await consumer.stop()


def main():
    """Run ``consume()`` on a dedicated event loop until interrupted.

    ``asyncio.get_event_loop()`` with no running loop is deprecated since
    Python 3.10, so a new loop is created and installed explicitly.

    On Ctrl-C the consuming task is cancelled and driven to completion, so
    that ``consume()``'s ``finally`` block can call ``consumer.stop()``.
    The KeyboardInterrupt is then re-raised. The loop is always closed.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    task = loop.create_task(consume(loop))
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        task.cancel()
        try:
            # Let consume() unwind through its finally block.
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass
        raise
    finally:
        loop.close()
        asyncio.set_event_loop(None)


if __name__ == '__main__':
    main()

运行程序

python3 producer_person.py 
python3 consumer_person.py

consumer 输出如下结果(proto3 中枚举字段取默认值 0 时不会被序列化,因此第一个 phone_number 的 type 为 MOBILE = 0,没有被打印出来):

name: "lisa"
age: 18
email: "test@gmail.com"
email: "test@qq.com"
email: "test@163.com"
gender: FEMALE
address {
country: "China"
city: "GuangZhou"
}
phone_number {
number: "136********"
}
phone_number {
number: "020-*******"
type: HOME
}
phone_number {
number: "159********"
type: WORK
}