AWS MSK 를 테스트 중이다. MSK의 경우 가용 영역 갯수 만큼의 브로커를 만들어야 한다. 테스트하려는 도쿄리전은 3개의 가용 영역으로 구성돼 있으니 최소 사양으로 만들 경우 (kafka실행을 위한 최소 스펙)kafka.m5.large 인스턴스 3개를 실행해야 하므로 상당한 비용이 들어간다. AWS MSK에서 테스트할 kafka 버전은 2.x인데, 2.x는 사용해본적이 없어서 시간이 꽤나 걸릴 것 같은(비용도 그만큼 올라갈 것 같은) 불길한 예감이 들었다.
그래서 로컬 PC에서 테스트를 충분히 한 다음, AWS에서 테스트를 해보기로 했다. 예전 같으면 VirtualBox로 몇 개의 VM을 만들어서 테스트했겠지만 도커가 있는 요즘에는 그런 노가다를 뛸 필요가 없다. 도커를 이용해서 만들어보기로 했다.
kafka 클러스터를 실행하려면 zookeeper도 함께 실행해야 하므로 docker-compose설정이 있는지를 찾아봤다. kafka stack docker compose를 선택했다.
다양한 조합의 docker compose yml 파일들이 있는데, 나는 full-stack.yml 과 zk-multiple-kafka-multiple.yml 두개의 파일을 이용해서 full stack multiple zk & kafka 시스템으로 구성했다. multiple kafka & zookeeper에 kafka & zookeeper 관리 툴들 까지 함께 올려서 완전한 테스트,디버깅 환경을 만들었다고 보면 되겠다.
Kafka 환경 분석을 위해서 yml 파일의 내용을 출력해봤다.
docker-compose 명령으로 zookeeper & kafka 애플리케이션을 실행하고 정보를 살펴보자.
# docker-compose -f zk-multiple-kafka-multiple.yml up
$ docker-compose ps
/home/yundream/.local/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
RequestsDependencyWarning)
Name Command State Ports
-------------------------------------------------------------------------------------------------------------------------------
kafkafullstack_kafka-connect-ui_1 /run.sh Up 0.0.0.0:8003->8000/tcp
kafkafullstack_kafka-connect_1 /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp
kafkafullstack_kafka-rest-proxy_1 /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp
kafkafullstack_kafka-schema-registry_1 /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
kafkafullstack_kafka-topics-ui_1 /run.sh Up 0.0.0.0:8000->8000/tcp
kafkafullstack_kafka1_1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
kafkafullstack_kafka2_1 /etc/confluent/docker/run Up 9092/tcp, 0.0.0.0:9093->9093/tcp
kafkafullstack_kafka3_1 /etc/confluent/docker/run Up 9092/tcp, 0.0.0.0:9094->9094/tcp
kafkafullstack_ksql-server_1 /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp
kafkafullstack_schema-registry-ui_1 /run.sh Up 0.0.0.0:8001->8000/tcp
kafkafullstack_zoo1_1 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
kafkafullstack_zoo2_1 /docker-entrypoint.sh zkSe ... Up 2181/tcp, 0.0.0.0:2182->2182/tcp, 2888/tcp,
3888/tcp
kafkafullstack_zoo3_1 /docker-entrypoint.sh zkSe ... Up 2181/tcp, 0.0.0.0:2183->2183/tcp, 2888/tcp,
3888/tcp
kafkafullstack_zoonavigator-api_1 ./run.sh Up 9000/tcp
kafkafullstack_zoonavigator-web_1 ./run.sh Up 80/tcp, 0.0.0.0:8004->8000/tcp
현재 구성은 아래와 같다.
테스트
테스트 환경은 아래와 같다.
워크 스테이션 : 개인 PC(도커 컴포즈를 이용해서 kafka&zookeeper 를 실행한 호스트 컴퓨터)
운영체제 : 우분투 리눅스 18.04
Docker 버전 18.09.6
Kafka 버전 2.2.0
zookeeper 버전 3.4.0
kafkacat
kafka 설치할 때 같이 배포되는 kafka cli는 사용하기 귀찮아서, kafkacat 이라는 관리툴을 설치했다.
앞서 만든 kafka의 정보를 확인해 보자. -L 을 이용하면 메타데이터 정보를 확인 할 수 있다.
$ kafkacat -L -b localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
3 brokers:
broker 2 at 127.0.0.1:9093
broker 3 at 127.0.0.1:9094
broker 1 at 127.0.0.1:9092
9 topics:
....
3개의 브로커와 9개의 토픽이 있는 걸 확인 할 수 있다.
new_topic 라는 새로운 토픽을 만들어서 컨슈머와 프로듀서를 테스트해보기로 했다. 먼저 컨슈머 모드로 실행을 했다. 파티션을 명시하지 않으면 모든 파티션으로 부터 메시지를 읽는다.
# kafkacat -b localhost:9092 -t new_topic -C
message 0001
% Reached end of topic new_topic [0] at offset 4
message 0002
% Reached end of topic new_topic [2] at offset 7
message 0003
% Reached end of topic new_topic [0] at offset 5
message 0004
% Reached end of topic new_topic [1] at offset 6
message 0005
% Reached end of topic new_topic [1] at offset 7
몇 번 파티션으로 부터 메시지를 읽었는지와 각 파티션에서의 offset 을 확인 할 수 있다. 토픽 new_topic에 대한 메타정보를 확인해 보자.
package main
import (
"fmt"
"log"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
brokers := []string{"localhost:9092", "localhost:9093", "localhost:9094"}
topics := []string{"message"}
consumer, err := cluster.NewConsumer(brokers, "testgroup", topics, config)
if err != nil {
panic(err)
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume messages, watch errors and notifications
for {
select {
case msg, more := <-consumer.Messages():
if more {
fmt.Fprintf(os.Stdout, "msg [%d] [%d]> %s", msg.Partition, msg.Offset, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
case err, more := <-consumer.Errors():
if more {
log.Printf("Error: %s\n", err.Error())
}
case ntf, more := <-consumer.Notifications():
if more {
log.Printf("Rebalanced: %+v\n", ntf)
}
case <-signals:
return
}
}
}
프로듀셔를 실행 한 다음, 컨슈머를 실행해보자.
# go run consumer_cluster.go
2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]}
4개의 파티션 바라보고 있는 걸 확인 할 수 있다. 컨슈머를 하나 더 실행하면 파티션에 대한 리밸런싱이 일어나는 걸 확인 할 수 있다.
# go run consumer_cluster.go
2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[]] Released:map[message:[0 1]] Current:map[message:[2 3]]}
# go run consumer_cluster.go
2019/06/07 01:13:14 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1]] Released:map[] Current:map[message:[0 1]]}
첫 번째 컨슈머는 2,3 두 번째 컨슈머는 0,1 파티션을 분산해서 처리하는 걸 확인 할 수 있다. 이렇게 파티션을 구성하는 것으로 메시지를 분산처리 할 수 있다. 아래 테스트 동영상이다.
컨슈머가 추가될 때마다 파티션에 대한 리밸런싱이 이루어지는 걸 확인 할 수 있다. 현재 애플리케이션 실행 구조는 아래와 같이 묘사 할 수 있다.
정리
Docker : VirtualBox를 이용했던 시절에 비해서 테스트가 너무 쉬워졌다. Kafka를 멀티노드로 테스트하려면 최소한 5개 정도의 인스턴스는 필요했는데, 지금은 docker-compose up 명령 하나로 kafka와 zookeeper 풀 셋을 전개 할 수 있다.
Zookeeper : 분산코디네이터. 브로커를 모니터링하고, 토픽, 파티션을 관리한다.
kafka : 스트림 프로세싱 영역에서는 산업표준이라고 할 수 있다. 프로젝트에 도입하고 싶은데, 아직은 마땅한 프로젝트가 없는게 아쉽다.
ksql : Kafka를 위한 streaming SQL 이라고 한다. 현재 ksql 서버도 설치되 있으니 ksql-cli 만 설치하면 테스트해 볼 수 있을 것 같다.
Contents
Docker 로 Kafka 테스트 환경 만들기
테스트
kafkacat
kafka topic ui
ZooNavigator
kafka 애플리케이션 테스트
Go
정리
Recent Posts
Archive Posts
Tags