kafka를 설치 해보자
카프카란 LinkedIn에서 개발된 메시지큐 시스템이다. 기존 메시지 시스템과 우수한 성능을 보이고 있다고 한다. 여기 에서 다운로드 받을 수 있다.

producer_consumer

위의 그림은 카프카의 구성요소와 동작 방식으로 발행과 구독 모델기반으로 되어져 있다.
카픈카는 topic 기준으로 메시지를 관리하며 Producer는 topic의 메시지를 생성 후 메시지를 broker에 전달한다. 전달받은 메시지를 분류별로 큐에 쌓으면 구독하는 consumer들이 메시지를 가져가는 구조로 되어있다.

다운 받은 카프카 파일을 압출을 푼후에 실행 시켜보자.

bin/zookeeper-server-start.sh config/zookeeper.properties

...

[2016-05-22 13:30:16,738] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:os.name=Mac OS X (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:os.arch=x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:os.version=10.11.4 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:user.name=wonwoo (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:user.home=/Users/wonwoo (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:user.dir=/Users/wonwoo/Documents/kafka_2.11-0.9.0.1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,747] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,747] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,747] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,775] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

카프카를 실행 시키기 위해 주기퍼가 필요 하므로 주키퍼를 먼저 실행 해야 된다. 그럼 위와 같은 로그가 출력 될 것이다.
그런다음에 메시지 저장소인 카프카를 실행시켜보자.

bin/kafka-server-start.sh config/server.properties

...

[2016-05-22 13:33:02,693] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-05-22 13:33:02,696] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-05-22 13:33:02,730] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-05-22 13:33:02,778] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-05-22 13:33:02,786] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-05-22 13:33:02,788] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(192.168.0.6,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-05-22 13:33:02,803] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-05-22 13:33:02,822] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-22 13:33:02,822] INFO Kafka commitId : 23c69d62a0cabf06 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-22 13:33:02,823] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

위와 같이 로그가 출력 되었다면 성공적으로 실행되었다.
카프카는 topic 기준으로 메시지를 처리 하기 때문에 topic을 생성해 주어야 한다.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

위는 topic을 생성해주는 커멘드 명령어이다. 우리는 공식문서에 나와있듯이 topic의 이름을 test라 지정하였다.
그리고 현재 만들어진 topic을 볼 수 있는 명령어이다.

bin/kafka-topics.sh --list --zookeeper localhost:2181
test

그런다음에 producer를 실행 시켜보자

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

방금 만든 test라는 topic으로 발행하는 커멘드 명령어이다.
명령어를 실행 시킨다음에 메시지를 입력해보자.

This is Message
This is another message

발행을 했으면 구독하는 consumer가 있어야 한다.

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is Message
This is another message

위와 같이 test라는 topic을 구독하여 보자.
그럼 이전에 test라는 topic으로 발행 했던 메시지들이 출력이 된다.

그럼 다른 메시지를 발행해보자.

My name is wonwoo lee

메시지를 입력후에 consumer에 있는 커멘드창을 보면 위에 입력했던 메시지가 출력이 될 것이다. 만약 출력이 안되었다면 topic이 제대로 되어있는지 다시 확인해보자.

일단 기본적으로 kafka에 대해서 알아봤고 설치 후 간단한 명령어를 통해 발행 및 구독을 해보았다.
카프카의 공식 홈페이지에 보면 아주 쉽게 설명 되어 있으니 공식 홈페이지를 참고에서 해보는 것이 나을 듯하다.