Ubuntu 14.04に Apache kafka インストール

1. JDK8 インストール

2. Apache kafka をダウンロード

wget http://ftp.jaist.ac.jp/pub/apache/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

3. Apache kafkaを解凍

tar -zxvf kafka_2.11-0.11.0.1.tgz

4. kafkaを展開したフォルダへ移動

cd kafka_2.11-0.11.0.1

5. ZooKeeperを起動

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • バックグラウンド起動しない場合は、-daemon を外すこと

6. Apache kafkaのコンフィグ修正

ローカルホスト以外から待ち受ける場合は、以下のコマンドをkafkadir/config/server.properties の最下部に追加

listeners=PLAINTEXT://<ipaddr>:9092
advertised.listeners=PLAINTEXT://<ipaddr>:9092

7. Apache kafkaを起動

bin/kafka-server-start.sh -daemon config/server.properties
  • バックグラウンド起動しない場合は、-daemon を外すこと

8. Apache Kafka に Topicを追加

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <トピック名>

9. Topicリストを表示

bin/kafka-topics.sh --list --zookeeper localhost:2181
  • No.8 で作ったトピック名が表示されればOK

10. Producer(データ送信部)の起動

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-topic

11. Consumer(データ受信部)の起動

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-topic --from-beginning

12. 動作確認

  • No10のコンソールに文字を入力し、No11のコンソールに文字が表示されればOK。

13. Python用Kafkaパッケージインストール

pip install kafka-python

ex01. Pythonスクリプト(送信部)

  • 以下を実行することで、testと言う文字列が 10回 kafkaへ送信される。
import time
from kafka import KafkaConsumer, KafkaProducer

def main():
	producer = KafkaProducer(bootstrap_servers='<ipaddr>:9092')
	counter = 0
	while True:
		producer.send('<No.8で作成したTopic名>', b"test")
		time.sleep(1)
		counter+=1

		if counter == 10:
			break

if __name__ == "__main__":
	main()

ex02. Pythonスクリプト(受信部)

  • 待受モードで立ち上がり、ex01のスクリプトを実行すると、その結果が表示される。
from kafka import KafkaConsumer, KafkaProducer

def main():
	consumer = KafkaConsumer(bootstrap_servers='<ipaddr>:9092',auto_offset_reset='earliest')
	consumer.subscribe(['<No.8で作成したTopic名>'])

	for message in consumer:
		print message

if __name__ == "__main__":
    main()