Recommanded Free YOUTUBE Lecture: Learning and Hacking VPC

Contents

1. Docker 로 Kafka 테스트 환경 만들기

이 문서는 Kafka에 대한 최소한의 경험을 가지고 있는 것으로 가정한다. 아래의 지식이 필요하다.
  • Zookeeper 에 대한 기본 이해
  • Kafka에 대한 기본 이해
    • 컨슈머와 프로듀서
    • 파티션
    • 컨슈머 그룹
AWS MSK 를 테스트 중이다. MSK의 경우 가용 영역 갯수 만큼의 브로커를 만들어야 한다. 테스트하려는 도쿄리전은 3개의 가용 영역으로 구성돼 있으니 최소 사양으로 만들 경우 (kafka실행을 위한 최소 스펙)kafka.m5.large 인스턴스 3개를 실행해야 하므로 상당한 비용이 들어간다. AWS MSK에서 테스트할 kafka 버전은 2.x인데, 2.x는 사용해본적이 없어서 시간이 꽤나 걸릴 것 같은(비용도 그만큼 올라갈 것 같은) 불길한 예감이 들었다.

그래서 로컬 PC에서 테스트를 충분히 한 다음, AWS에서 테스트를 해보기로 했다. 예전 같으면 VirtualBox로 몇 개의 VM을 만들어서 테스트했겠지만 도커가 있는 요즘에는 그런 노가다를 뛸 필요가 없다. 도커를 이용해서 만들어보기로 했다.

kafka 클러스터를 실행하려면 zookeeper도 함께 실행해야 하므로 docker-compose설정이 있는지를 찾아봤다. kafka stack docker compose를 선택했다.

다양한 조합의 docker compose yml 파일들이 있는데, 나는 full-stack.yml 과 zk-multiple-kafka-multiple.yml 두개의 파일을 이용해서 full stack multiple zk & kafka 시스템으로 구성했다. multiple kafka & zookeeper에 kafka & zookeeper 관리 툴들 까지 함께 올려서 완전한 테스트,디버깅 환경을 만들었다고 보면 되겠다.

Kafka 환경 분석을 위해서 yml 파일의 내용을 출력해봤다.
version: '2.1'
#
services:
# 3 .
zoo1:
image: zookeeper:3.4.9
hostname: zoo1
# 2181 bind .
ports:
- "2181:2181"
# 2 .
# 2888 .
# 3888 .
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
volumes:
- ./zk-multiple-kafka-multiple/zoo1/data:/data
- ./zk-multiple-kafka-multiple/zoo1/datalog:/datalog
zoo2:
image: zookeeper:3.4.9
hostname: zoo2
ports:
- "2182:2182"
environment:
ZOO_MY_ID: 2
ZOO_PORT: 2182
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
 
 
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

docker-compose 명령으로 zookeeper & kafka 애플리케이션을 실행하고 정보를 살펴보자.
# docker-compose -f zk-multiple-kafka-multiple.yml up
$ docker-compose ps
/home/yundream/.local/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
RequestsDependencyWarning)
Name Command State Ports
-------------------------------------------------------------------------------------------------------------------------------
kafkafullstack_kafka-connect-ui_1 /run.sh Up 0.0.0.0:8003->8000/tcp
kafkafullstack_kafka-connect_1 /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp
kafkafullstack_kafka-rest-proxy_1 /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp
kafkafullstack_kafka-schema-registry_1 /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
kafkafullstack_kafka-topics-ui_1 /run.sh Up 0.0.0.0:8000->8000/tcp
kafkafullstack_kafka1_1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
kafkafullstack_kafka2_1 /etc/confluent/docker/run Up 9092/tcp, 0.0.0.0:9093->9093/tcp
kafkafullstack_kafka3_1 /etc/confluent/docker/run Up 9092/tcp, 0.0.0.0:9094->9094/tcp
kafkafullstack_ksql-server_1 /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp
kafkafullstack_schema-registry-ui_1 /run.sh Up 0.0.0.0:8001->8000/tcp
kafkafullstack_zoo1_1 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
kafkafullstack_zoo2_1 /docker-entrypoint.sh zkSe ... Up 2181/tcp, 0.0.0.0:2182->2182/tcp, 2888/tcp,
3888/tcp
kafkafullstack_zoo3_1 /docker-entrypoint.sh zkSe ... Up 2181/tcp, 0.0.0.0:2183->2183/tcp, 2888/tcp,
3888/tcp
kafkafullstack_zoonavigator-api_1 ./run.sh Up 9000/tcp
kafkafullstack_zoonavigator-web_1 ./run.sh Up 80/tcp, 0.0.0.0:8004->8000/tcp
 
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

현재 구성은 아래와 같다.

 Kafka 테스트 구성

2. 테스트

테스트 환경은 아래와 같다.
  • 워크 스테이션 : 개인 PC(도커 컴포즈를 이용해서 kafka&zookeeper 를 실행한 호스트 컴퓨터)
  • 운영체제 : 우분투 리눅스 18.04
  • Docker 버전 18.09.6
  • Kafka 버전 2.2.0
  • zookeeper 버전 3.4.0

2.1. kafkacat

kafka 설치할 때 같이 배포되는 kafka cli는 사용하기 귀찮아서, kafkacat 이라는 관리툴을 설치했다.
# apt-get install kafkacat
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
사용 방법은 아래와 같다.
kafkacat -b localhost:9092 -t new_topic -G [group_name] -p [partition_num] [-P|-C]
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
  • -b : 카프카 브로커 주소 목록
  • -t : 토픽
  • -p : 파티션
  • -P : 프로듀서 모드로 실행. 기본 파티션은 0이다.
  • -C : 컨슈머 모드로 실행. -P, -C가 생략될 경우 기본 컨슈머 모드로 실행한다.
  • -G : 컨슈머 그룹
앞서 만든 kafka의 정보를 확인해 보자. -L 을 이용하면 메타데이터 정보를 확인 할 수 있다.
$ kafkacat -L -b localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
3 brokers:
broker 2 at 127.0.0.1:9093
broker 3 at 127.0.0.1:9094
broker 1 at 127.0.0.1:9092
9 topics:
....
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
3개의 브로커와 9개의 토픽이 있는 걸 확인 할 수 있다.

new_topic 라는 새로운 토픽을 만들어서 컨슈머와 프로듀서를 테스트해보기로 했다. 먼저 컨슈머 모드로 실행을 했다. 파티션을 명시하지 않으면 모든 파티션으로 부터 메시지를 읽는다.
# kafkacat -b localhost:9092 -t new_topic -C
message 0001
% Reached end of topic new_topic [0] at offset 4
message 0002
% Reached end of topic new_topic [2] at offset 7
message 0003
% Reached end of topic new_topic [0] at offset 5
message 0004
% Reached end of topic new_topic [1] at offset 6
message 0005
% Reached end of topic new_topic [1] at offset 7
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
몇 번 파티션으로 부터 메시지를 읽었는지와 각 파티션에서의 offset 을 확인 할 수 있다. 토픽 new_topic에 대한 메타정보를 확인해 보자.
$ kafkacat -b localhost:9092 -L -t new_topic
Metadata for new_topic (from broker -1: localhost:9092/bootstrap):
3 brokers:
broker 2 at 127.0.0.1:9093
broker 3 at 127.0.0.1:9094
broker 1 at 127.0.0.1:9092
1 topics:
topic "new_topic" with 4 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 2, leader 3, replicas: 3, isrs: 3
partition 3, leader 1, replicas: 1, isrs: 1
partition 1, leader 2, replicas: 2, isrs: 2
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

2.2. kafka topic ui

docker compose로 카프카 topic 정보를 확인 할 수 있는 kafaka topic ui를 설치했다. 웹 브라우저로 접근해보자.

new_topic의 상세한 정보를 확인 할 수 있다. kafka 상태를 빠르게 모니터링하는 용도로 괜찮은 것 같다.

2.3. ZooNavigator

주키퍼 정보를 웹으로 네비게이션하고 편집 할 수 있다.

1번 브로커의 정보를 요청했다. 브로커의 네트워크 정보와 호스트이름 등을 확인 할 수 있다.

3. kafka 애플리케이션 테스트

대충 이정도 하고 kafka 애플리케이션을 만들어보기로 했다. go 언어와 python 언어로 만들어볼 생각이다.
  • 프로듀서 : message 토픽으로 표준출력(키보드 입력)을 전송한다.
  • 컨슈머 : message 토픽에서 읽은 데이터를 출력한다.
간단한 push 서비스 정도로 볼 수 있겠다.

3.1. Go

프로듀서 프로그램이다.
package main
import (
"bufio"
"fmt"
"github.com/Shopify/sarama"
"os"
)
type Producer struct {
ChatProducer sarama.SyncProducer
}
// .
// .
func NewProducer() *Producer {
config := sarama.NewConfig()
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
c, err := sarama.NewSyncProducer([]string{
"localhost:9092",
"localhost:9093",
"localhost:9094"}, config)
if err != nil {
panic(err)
}
return &Producer{ChatProducer: c}
}
func (p *Producer) Close() error {
 
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

컨슈머 프로그램이다. 컨슈머는 testgroup그룹으로 묶었다.
package main
import (
"fmt"
"log"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
brokers := []string{"localhost:9092", "localhost:9093", "localhost:9094"}
topics := []string{"message"}
consumer, err := cluster.NewConsumer(brokers, "testgroup", topics, config)
if err != nil {
panic(err)
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume messages, watch errors and notifications
for {
 
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
프로듀셔를 실행 한 다음, 컨슈머를 실행해보자.
# go run consumer_cluster.go
2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]}
 
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
4개의 파티션 바라보고 있는 걸 확인 할 수 있다. 컨슈머를 하나 더 실행하면 파티션에 대한 리밸런싱이 일어나는 걸 확인 할 수 있다.
# go run consumer_cluster.go
2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[]] Released:map[message:[0 1]] Current:map[message:[2 3]]}
# go run consumer_cluster.go
2019/06/07 01:13:14 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1]] Released:map[] Current:map[message:[0 1]]}
 
הההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההההה
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
첫 번째 컨슈머는 2,3 두 번째 컨슈머는 0,1 파티션을 분산해서 처리하는 걸 확인 할 수 있다. 이렇게 파티션을 구성하는 것으로 메시지를 분산처리 할 수 있다. 아래 테스트 동영상이다.

컨슈머가 추가될 때마다 파티션에 대한 리밸런싱이 이루어지는 걸 확인 할 수 있다. 현재 애플리케이션 실행 구조는 아래와 같이 묘사 할 수 있다.

 애플리케이션 실행 구조

4. 정리

  • Docker : VirtualBox를 이용했던 시절에 비해서 테스트가 너무 쉬워졌다. Kafka를 멀티노드로 테스트하려면 최소한 5개 정도의 인스턴스는 필요했는데, 지금은 docker-compose up 명령 하나로 kafka와 zookeeper 풀 셋을 전개 할 수 있다.
  • Zookeeper : 분산코디네이터. 브로커를 모니터링하고, 토픽, 파티션을 관리한다.
  • kafka : 스트림 프로세싱 영역에서는 산업표준이라고 할 수 있다. 프로젝트에 도입하고 싶은데, 아직은 마땅한 프로젝트가 없는게 아쉽다.
  • ksql : Kafka를 위한 streaming SQL 이라고 한다. 현재 ksql 서버도 설치되 있으니 ksql-cli 만 설치하면 테스트해 볼 수 있을 것 같다.