wget http://ftp.jaist.ac.jp/pub/apache/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
tar -zxvf kafka_2.11-0.11.0.1.tgz
cd kafka_2.11-0.11.0.1
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
ローカルホスト以外から待ち受ける場合は、以下のコマンドをkafkadir/config/server.properties の最下部に追加
listeners=PLAINTEXT://<ipaddr>:9092
advertised.listeners=PLAINTEXT://<ipaddr>:9092
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <トピック名>
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-topic --from-beginning
pip install kafka-python
import time
from kafka import KafkaConsumer, KafkaProducer
def main():
producer = KafkaProducer(bootstrap_servers='<ipaddr>:9092')
counter = 0
while True:
producer.send('<No.8で作成したTopic名>', b"test")
time.sleep(1)
counter+=1
if counter == 10:
break
if __name__ == "__main__":
main()
from kafka import KafkaConsumer, KafkaProducer
def main():
consumer = KafkaConsumer(bootstrap_servers='<ipaddr>:9092',auto_offset_reset='earliest')
consumer.subscribe(['<No.8で作成したTopic名>'])
for message in consumer:
print message
if __name__ == "__main__":
main()