kafka producer consumer 구현하기

kafka producer과 consumer를 java로 구현해보자.
일단 카프카가 설치 되어 있어야 한다. 카프카 설치는 여기에서 보면 된다.
아주 간단한 샘플용이니 참고만 하길 바란다.
kafka version은 0.9.0.1 버전으로 하였다.
바로 소스를 보자.

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>

    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.7</version>
    </dependency>
</dependencies>

일단 디펜더시는 카프카 클라이언트와 필자는 logback을 자주 써서 logback으로 디펜더시를 설정 하였다.
다음 코드는 producer 코드이다. 간단하게 만들었다.

public static void main(String[] args) throws IOException {

  Properties configs = new Properties();
  configs.put("bootstrap.servers", "localhost:9092");
  configs.put("acks", "all");
  configs.put("block.on.buffer.full", "true");
  configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

  producer.send(new ProducerRecord<>("test", "hello world"),
    (metadata, exception) -> {
      if (metadata != null) {
        System.out.println(
          "partition(" + metadata.partition() + "), offset(" + metadata.offset() + ")");
      } else {
        exception.printStackTrace();
      }
    });
  producer.flush();
  producer.close();
}

test라는 topic을 예전에 생성 해놔서 topic을 test로 지정해놨다.
http://kafka.apache.org/documentation.html#producerconfigs 여기 가면 producer config 파라미터들이 정리되어 있다.
소스는 딱히 설명 하지 않아도 보면 알 듯 싶다.

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

중요한건 send API가 실제 보내는 API이다. ProducerRecord 클래스의 첫번째 인자는 topic이고 두번째 인자는 value로 지정 되어 있다. 만약 콘솔로 consumer를 띄었다면 콘솔에 찍혀 있는 걸 확인 할 수 있다.

다음으론 consumer예제 클래스이다.

public static void main(String[] args) {
  Properties configs = new Properties();
  configs.put("bootstrap.servers", "localhost:9092");
  configs.put("session.timeout.ms", "10000");
  configs.put("group.id", "test");
  configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
  consumer.subscribe(Arrays.asList("test"));
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(500);
    for (ConsumerRecord<String, String> record : records) {
      switch (record.topic()) {
        case "test":
          System.out.println(record.value());
          break;
        default:
          throw new IllegalStateException("get message on topic " + record.topic());
      }
    }
  }
}

http://kafka.apache.org/documentation.html#consumerconfigs 마찬가지로 컨슈머의 설정은 해당 URL에 잘 정리 되어 있다. 예전 config와 새로운 config 설정이 있으니 참고하기 바란다.

consumer.subscribe(Arrays.asList("test"))

test라는 topic을 구독하겠다는 것이다 List로 들어가니 여러 topic을 구독 할 수 있다.
그리고 해당 topic(여기서는 test)으로 오면 실제 값을 출력 해준다.

이렇게 아주 간단하게 producer와 consumer를 구현해봤다. 물론 간단한 코드 이므로 실 운영에서는 사용하지 못할 듯하다. 좀더 리펙토링이 필요 할 것이다. 참고 정도만 하면 될듯 싶다.
소스는 github에 올려놔야 겠다.

kafka multi-broker cluster

좀전에 카프카에 대허서 알아 봤다.
간단하게 설치도 하고 했는데 실 운영에서는 그렇게 못쓴다. 물론 주기퍼도 여러대 카프카도 여러대로 써야 하나의 서버가 죽더라도 운영은 계속 되어야 한다. 우리는 카프카의 멀티 broker의 설정에 대해 알아 보자

기존의 server.properties는 냅두고 server1.properties 와 server2.properties를 만들자.

cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties

server.properties 복사하여 server1.properties, server2.properties를 아래와 같이 변경을 하자.

server1.properties

broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

server2.properties

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

위와 같이 설정만 하면 끝이다. 그런 후에 주기퍼를 실행하고 server.properties실행 server1.properties 실행 server2.properties 실행 순서로 실행을 시키자.

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server1.properties
bin/kafka-server-start.sh config/server2.properties

그리고 topic을 생성해야 된다.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

topic 생성도 기존의 생성과는 약간 다르다. 현재 있는 topic의 리스트를 출력해보면 생성 되어있는 것을 확인 할 수 있다.

bin/kafka-topics.sh --list --zookeeper localhost:2181
my-replicated-topic
test

필자가 하다가 꼬였는지 server.properties의 클러스팅이 되지 않아 다시 server3.properties 를 만들어서 했다

broker.id=3
port=9095
log.dir=/tmp/kafka-logs-3

만약 상관 없이 된다면 위와 같이 하지 않아도 된다.
다음으로 topic의 정보들을 확인하여 보자.

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,3,1 Isr: 3,1,2

현재 Leader 3이고 Replicas는 구성된 노드의 목록이다. Isr는 현재 살아있는 노드를 출력하는 것이다. 저 숫자는 broker.id 를 기준으로 출력된다.

bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-replicated-topic

필자는 localhost:9093으로 접속했다. 원래는 9092로 접속하면 된다. 그리고나서 메시지를 입력해보자.

hi wonwoo
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
hi wonwoo

그후에 consumer를 실행시켜 해당 메시지가 왔는지 확인해보자. 그럼 정상적으로 메세지가 출력되었다.
그리고 나서 클러스팅을 위해 리더를 죽여보자. 현재 리더가 필자와 다를 수 있으니 리더의 id를 확인하여 죽이자. 현재 필자의 리더 id는 2이기 때문에 server-2를 찾아서 죽였다.

ps -ef | grep server-2
kill -9 {pid}

kill을 한뒤에 다시 한번 정보를 살펴보자.

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 3   Replicas: 2,3,1 Isr: 3,1

현재 필자의 리더의 id는 3이며 살아있는 노드들은 3번과 1번임을 알수 있다. 그후에 다시 consumer에 접속을 해보자.

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
hi wonwoo

리더를 죽여도 잘 실행 된다.
만약 consumer를 실행 시키는 중에 kill을 하면 에러로그가 일정시간동안 올라온다. 리더를 승격 시키기 전까지의 통신을 할 수 없으므로 나오는 에러인 듯 싶다. 에러로그가 일정 시간 출력 되고 다시 메시지를 발행하면 정상적으로 consumer가 받을 수 있다.
하지만 그 시간까지는 메시지가 유실 되는 듯 하다. 셋팅의 문제이지 않나 싶기도 한데.. 지금은 잘 모르겠다.
아무튼 이렇게 카프카에 대해서 살짝 맛만 봤다. kafka와 spark streaming도 한번 해봐야 겠다.

kafka 설치

kafka를 설치 해보자
카프카란 LinkedIn에서 개발된 메시지큐 시스템이다. 기존 메시지 시스템과 우수한 성능을 보이고 있다고 한다. 여기 에서 다운로드 받을 수 있다.

producer_consumer

위의 그림은 카프카의 구성요소와 동작 방식으로 발행과 구독 모델기반으로 되어져 있다.
카픈카는 topic 기준으로 메시지를 관리하며 Producer는 topic의 메시지를 생성 후 메시지를 broker에 전달한다. 전달받은 메시지를 분류별로 큐에 쌓으면 구독하는 consumer들이 메시지를 가져가는 구조로 되어있다.

다운 받은 카프카 파일을 압출을 푼후에 실행 시켜보자.

bin/zookeeper-server-start.sh config/zookeeper.properties

...

[2016-05-22 13:30:16,738] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:os.name=Mac OS X (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:os.arch=x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:os.version=10.11.4 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:user.name=wonwoo (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:user.home=/Users/wonwoo (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,738] INFO Server environment:user.dir=/Users/wonwoo/Documents/kafka_2.11-0.9.0.1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,747] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,747] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,747] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-05-22 13:30:16,775] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

카프카를 실행 시키기 위해 주기퍼가 필요 하므로 주키퍼를 먼저 실행 해야 된다. 그럼 위와 같은 로그가 출력 될 것이다.
그런다음에 메시지 저장소인 카프카를 실행시켜보자.

bin/kafka-server-start.sh config/server.properties

...

[2016-05-22 13:33:02,693] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-05-22 13:33:02,696] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-05-22 13:33:02,730] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-05-22 13:33:02,778] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-05-22 13:33:02,786] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-05-22 13:33:02,788] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(192.168.0.6,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-05-22 13:33:02,803] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-05-22 13:33:02,822] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-22 13:33:02,822] INFO Kafka commitId : 23c69d62a0cabf06 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-22 13:33:02,823] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

위와 같이 로그가 출력 되었다면 성공적으로 실행되었다.
카프카는 topic 기준으로 메시지를 처리 하기 때문에 topic을 생성해 주어야 한다.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

위는 topic을 생성해주는 커멘드 명령어이다. 우리는 공식문서에 나와있듯이 topic의 이름을 test라 지정하였다.
그리고 현재 만들어진 topic을 볼 수 있는 명령어이다.

bin/kafka-topics.sh --list --zookeeper localhost:2181
test

그런다음에 producer를 실행 시켜보자

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

방금 만든 test라는 topic으로 발행하는 커멘드 명령어이다.
명령어를 실행 시킨다음에 메시지를 입력해보자.

This is Message
This is another message

발행을 했으면 구독하는 consumer가 있어야 한다.

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is Message
This is another message

위와 같이 test라는 topic을 구독하여 보자.
그럼 이전에 test라는 topic으로 발행 했던 메시지들이 출력이 된다.

그럼 다른 메시지를 발행해보자.

My name is wonwoo lee

메시지를 입력후에 consumer에 있는 커멘드창을 보면 위에 입력했던 메시지가 출력이 될 것이다. 만약 출력이 안되었다면 topic이 제대로 되어있는지 다시 확인해보자.

일단 기본적으로 kafka에 대해서 알아봤고 설치 후 간단한 명령어를 통해 발행 및 구독을 해보았다.
카프카의 공식 홈페이지에 보면 아주 쉽게 설명 되어 있으니 공식 홈페이지를 참고에서 해보는 것이 나을 듯하다.