2 minutes
Commandes utiles Kafka
Voici une liste de commandes utiles pour utiliser Kafka. Il y a pas mal de jargon dans Kafka, je vous renvoie à l’article suivant qui explique pas mal de choses.
Lister les groupes de consommateurs (Consumer Groups):
docker run wurstmeister/kafka /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
Décrire/ Obtenir des informations sur un Consumer Group:
docker run wurstmeister/kafka /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group id1 --describe
Créer un topic:
docker exec -it wurstmeister/kafka sh -c "JMX_PORT=10001 /opt/kafka/bin/kafka-topics.sh --create --topic topic --replication-factor 1 --partitions 1 --zookeeper zookeeper:2181"
Envoyer des messages:
docker exec -it wurstmeister/kafka sh -c "JMX_PORT=10001 /opt/kafka/bin/kafka-verifiable-producer.sh --topic topic --max-messages 200000 --broker-list localhost:9092"
Consumer en mode console:
docker exec -it wurstmeister/kafka sh -c "JMX_PORT=10001 /opt/kafka/bin/kafka-console-consumer.sh --topic topic --bootstrap-server host:9092"
docker run -it wurstmeister/kafka -c "JMX_PORT=10001 /opt/kafka/bin/kafka-console-consumer.sh --topic topic --bootstrap-server host:9092"
docker run --entrypoint=/opt/kafka/bin/kafka-console-consumer.sh wurstmeister/kafka --topic topic --bootstrap-server host:9092
Consumer en Python:
#!/usr/bin/env python
from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='kafka1:9092',
group_id=None,
auto_offset_reset='earliest')
consumer.subscribe(['logs-app1'])
for msg in consumer:
print(msg)
Consumer Kafkacat:
docker run -it confluentinc/cp-kafkacat kafkacat -b host:9092 -t topic -o beginning -v
Mettre de la rétention sur certains topics:
docker exec -it kafka /opt/kafka/bin/kafka-configs.sh --zookeeper host:2181 --entity-type topics --entity-name topic --describe
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --zookeeper host:2181 --topic topic --describe
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --zookeeper host:2181 --topic topic --alter --config retention.ms=1000
L’article suivant parle des partitions et des offsets:
In short: When you commit an offset, it means that you read all the previous messages. So committing 7 means that next you won’t read 6 and 5 but the new incoming message 8 sent by the producer.
Offset au début:
docker run wurstmeister/kafka /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --topic topic --group id1 --reset-offsets --to-earliest --execute
Offset à la fin:
docker run wurstmeister/kafka /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --topic topic --group id1 --reset-offsets --to-latest --execute
Offset à un moment précis:
docker run wurstmeister/kafka /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --topic topic --group id1 --reset-offsets --to-datetime "2017-12-22T00:00:00.000" --execute
Offset à un moment précis pour les partitions 0, 1 (même datetime pour les 2 partitions)
docker run wurstmeister/kafka /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --topic topic:0,1 --group id1 --reset-offsets --to-datetime "2017-12-22T00:00:00.000" --execute
Offset à un moment précis pour les partitions 0, 1 (datetimes différents):
docker run dddpaul/kafka-rewind --servers=kafka:9092 --group-id=id1 --topic=topic -o 0=2017-12-01 -o 1=2018-01-01