mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-11 10:22:13 +08:00
67 lines
1.7 KiB
Markdown
67 lines
1.7 KiB
Markdown
# Python客户端 接入 Kafka
|
|
|
|
## 安装
|
|
|
|
```shell
|
|
# python kafka客户端采用kafka-python(1.4.6)及以上
|
|
pip install kafka-python
|
|
|
|
#使用lz4压缩需要
|
|
pip install lz4
|
|
|
|
#使用snappy压缩需要
|
|
pip install snappy
|
|
pip install python-snappy
|
|
```
|
|
|
|
## 发送例子
|
|
|
|
```python
|
|
from kafka import KafkaConsumer,KafkaProducer
|
|
import logging
|
|
import time
|
|
import json
|
|
#logging.basicConfig(level=logging.DEBUG)
|
|
BOOTSTRAP_SERVERS='127.0.0.1:9093'
|
|
TOPIC='test0'
|
|
|
|
producer=KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS,
|
|
compression_type="lz4",
|
|
security_protocol="SASL_PLAINTEXT",
|
|
sasl_mechanism="PLAIN",
|
|
sasl_plain_username='95.appId_000001', //clusterId.appId
|
|
sasl_plain_password='12345'
|
|
)
|
|
for i in range(10):
|
|
producer.send(TOPIC, bytes("Hello World".encode('utf-8')))
|
|
producer.flush()
|
|
```
|
|
|
|
|
|
## 消费例子
|
|
|
|
```python
|
|
from kafka import KafkaConsumer,KafkaProducer
|
|
import logging
|
|
import time
|
|
import json
|
|
#logging.basicConfig(level=logging.DEBUG)
|
|
BOOTSTRAP_SERVERS='127.0.0.1:9093'
|
|
TOPIC='test0'
|
|
consumer = KafkaConsumer(TOPIC,
|
|
group_id = 'test_group'
|
|
bootstrap_servers=BOOTSTRAP_SERVERS,
|
|
auto_offset_reset='earliest',
|
|
security_protocol='SASL_PLAINTEXT',
|
|
sasl_mechanism='PLAIN',
|
|
sasl_plain_username='95.appId_000001', //clusterId.appId
|
|
sasl_plain_password='12345',
|
|
receive_buffer_bytes=1024,
|
|
enable_auto_commit='False')
|
|
|
|
|
|
for msg in consumer:
|
|
print(msg)
|
|
|
|
consumer.commit()
|
|
``` |