import confluent_kafka
import time
topic = 'confluent-kafka-topic'
def confluent_kafka_producer_performance():
topic = 'confluent-kafka-topic'
conf = {'bootstrap.servers': '192.168.65.130:9092'}
producer = confluent_kafka.Producer(**conf)
messages_to_retry = 0
msg_payload = 'This is message'
producer_start = time.time()
for i in range(10):
try:
producer.produce(topic, value=msg_payload)
print(msg_payload)
except BufferError as e:
messages_to_retry += 1
# hacky retry messages that over filled the local buffer
for i in range(messages_to_retry):
producer.poll(0)
try:
producer.produce(topic, value=msg_payload)
except BufferError as e:
producer.poll(0)
producer.produce(topic, value=msg_payload)
producer.flush()
return time.time() - producer_start
if __name__ == "__main__":
time_span = confluent_kafka_producer_performance()
print(time_span)
import confluent_kafka
import uuid
import time
def confluent_kafka_consumer_performance():
topic = 'confluent-kafka-topic'
msg_consumed_count = 0
conf = {'bootstrap.servers': '192.168.65.130:9092',
'group.id': uuid.uuid1(),
'session.timeout.ms': 6000,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}
}
consumer = confluent_kafka.Consumer(**conf)
consumer_start = time.time()
# This is the same as pykafka, subscribing to a topic will start a background thread
consumer.subscribe([topic])
while True:
msg = consumer.poll(1)
if msg:
msg_consumed_count += 1
print(msg)
if msg_consumed_count >= 10:
break
consumer_timing = time.time() - consumer_start
consumer.close()
return consumer_timing
if __name__ == "__main__":
time_span = confluent_kafka_consumer_performance()
print(time_span)
import confluent_kafka
conf = {'bootstrap.servers':'192.168.65.130:9092'}
producer = confluent_kafka.Producer(**conf)
producer.produce('con_1',key ='key',value='part_0', partition=0)
producer.poll(1)
import confluent_kafka
import uuid
conf = {'bootstrap.servers':'192.168.65.130:9092', 'group.id':uuid.uuid1()}
consumer = confluent_kafka.Consumer(**conf)
tp1=confluent_kafka.TopicPartition('con_1', 0)
consumer.assign([tp1])
msg = consumer.poll(1)
msg.key()
msg.value()