메뉴

문서정보

목차

Docker 로 Kafka 테스트 환경 만들기

이 문서는 Kafka에 대한 최소한의 경험을 가지고 있는 것으로 가정한다. 아래의 지식이 필요하다. AWS MSK 를 테스트 중이다. MSK의 경우 가용 영역 갯수 만큼의 브로커를 만들어야 한다. 테스트하려는 도쿄리전은 3개의 가용 영역으로 구성돼 있으니 최소 사양으로 만들 경우 (kafka실행을 위한 최소 스펙)kafka.m5.large 인스턴스 3개를 실행해야 하므로 상당한 비용이 들어간다. AWS MSK에서 테스트할 kafka 버전은 2.x인데, 2.x는 사용해본적이 없어서 시간이 꽤나 걸릴 것 같은(비용도 그만큼 올라갈 것 같은) 불길한 예감이 들었다.

그래서 로컬 PC에서 테스트를 충분히 한 다음, AWS에서 테스트를 해보기로 했다. 예전 같으면 VirtualBox로 몇 개의 VM을 만들어서 테스트했겠지만 도커가 있는 요즘에는 그런 노가다를 뛸 필요가 없다. 도커를 이용해서 만들어보기로 했다.

kafka 클러스터를 실행하려면 zookeeper도 함께 실행해야 하므로 docker-compose설정이 있는지를 찾아봤다. kafka stack docker compose를 선택했다.

다양한 조합의 docker compose yml 파일들이 있는데, 나는 full-stack.yml 과 zk-multiple-kafka-multiple.yml 두개의 파일을 이용해서 full stack multiple zk & kafka 시스템으로 구성했다. multiple kafka & zookeeper에 kafka & zookeeper 관리 툴들 까지 함께 올려서 완전한 테스트,디버깅 환경을 만들었다고 보면 되겠다.

Kafka 환경 분석을 위해서 yml 파일의 내용을 출력해봤다.
version: '2.1'

# 주키퍼와 카프카 서비스를 설정한다
services:
  # 3개의 노드로 된 주키퍼 클러스터를 구성한다.
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    # 클라이언트 포트 2181를 bind 한다. 
    ports:
      - "2181:2181"
    # 주키퍼는 서버는 2개의 포트를 가진다. 
    # 2888은 서버 노드끼리 통신을 하기 위해서 사용한다.
    # 3888은 리더 선출을 위해서 사용한다.
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    volumes:
      - ./zk-multiple-kafka-multiple/zoo1/data:/data
      - ./zk-multiple-kafka-multiple/zoo1/datalog:/datalog

  zoo2:
    image: zookeeper:3.4.9
    hostname: zoo2
    ports:
      - "2182:2182"
    environment:
        ZOO_MY_ID: 2
        ZOO_PORT: 2182
        ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    volumes:
      - ./zk-multiple-kafka-multiple/zoo2/data:/data
      - ./zk-multiple-kafka-multiple/zoo2/datalog:/datalog

  zoo3:
    image: zookeeper:3.4.9
    hostname: zoo3
    ports:
      - "2183:2183"
    environment:
        ZOO_MY_ID: 3
        ZOO_PORT: 2183
        ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    volumes:
      - ./zk-multiple-kafka-multiple/zoo3/data:/data
      - ./zk-multiple-kafka-multiple/zoo3/datalog:/datalog


  # 3개의 카프카 브로커를 구성한다.
  # Advertised listeners와 주키퍼 노드 포트를 등록한다.
  # 4개의 파티션을 구성한다.
  kafka1:
    image: confluentinc/cp-kafka:5.2.1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-multiple-kafka-multiple/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    image: confluentinc/cp-kafka:5.2.1
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-multiple-kafka-multiple/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    image: confluentinc/cp-kafka:5.2.1
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 3
      KAFKA_NUM_PARTITIONS: 4
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-multiple-kafka-multiple/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry:5.2.1
    hostname: kafka-schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      - zoo1
      - zoo2
      - zoo3
      - kafka1
      - kafka2
      - kafka3

  schema-registry-ui:
    image: landoop/schema-registry-ui:0.9.4
    hostname: kafka-schema-registry-ui
    ports:
      - "8001:8000"
    environment:
      SCHEMAREGISTRY_URL: http://kafka-schema-registry:8081/
      PROXY: "true"
    depends_on:
      - kafka-schema-registry

  kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:5.2.1
    hostname: kafka-rest-proxy
    ports:
      - "8082:8082"
    environment:
      # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
      KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
      KAFKA_REST_HOST_NAME: kafka-rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
    depends_on:
      - zoo1
      - zoo2
      - zoo3
      - kafka1
      - kafka2
      - kafka3
      - kafka-schema-registry

  kafka-topics-ui:
    image: landoop/kafka-topics-ui:0.9.4
    hostname: kafka-topics-ui
    ports:
      - "8000:8000"
    environment:
      KAFKA_REST_PROXY_URL: "http://kafka-rest-proxy:8082/"
      PROXY: "true"
    depends_on:
      - zoo1
      - zoo2
      - zoo3
      - kafka1
      - kafka2
      - kafka3
      - kafka-schema-registry
      - kafka-rest-proxy

  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.2.1
    hostname: kafka-connect
    ports:
      - kafka-schema-registry
      - kafka-rest-proxy

  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.2.1
    hostname: kafka-connect
    ports:
      - kafka-schema-registry
      - kafka-rest-proxy

  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.2.1
    hostname: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
    volumes:
      - ./connectors:/etc/kafka-connect/jars/
    depends_on:
      - zoo1
      - zoo2
      - zoo3
      - kafka1
      - kafka2
      - kafka3
      - kafka-schema-registry
      - kafka-rest-proxy

  kafka-connect-ui:
    image: landoop/kafka-connect-ui:0.9.4
    hostname: kafka-connect-ui
    ports:
      - "8003:8000"
    environment:
      CONNECT_URL: "http://kafka-connect:8083/"
      PROXY: "true"
    depends_on:
      - kafka-connect

  ksql-server:
    image: confluentinc/cp-ksql-server:5.2.1
    hostname: ksql-server
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      KSQL_LISTENERS: http://0.0.0.0:8088/
      KSQL_KSQL_SERVICE_ID: ksql-server_
    depends_on:
      - zoo1
      - zoo2
      - zoo3
      - kafka1
      - kafka2
      - kafka3

  zoonavigator-web:
    image: elkozmon/zoonavigator-web:0.5.1
    ports:
     - "8004:8000"
    environment:
      API_HOST: "zoonavigator-api"
      API_PORT: 9000
    links:
     - zoonavigator-api
    depends_on:
     - zoonavigator-api

  zoonavigator-api:
    image: elkozmon/zoonavigator-api:0.5.1
    environment:
      SERVER_HTTP_PORT: 9000
    depends_on:
      - zoo1
      - zoo2
      - zoo3

docker-compose 명령으로 zookeeper & kafka 애플리케이션을 실행하고 정보를 살펴보자.
# docker-compose -f zk-multiple-kafka-multiple.yml up
$ docker-compose ps
/home/yundream/.local/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
  RequestsDependencyWarning)
                 Name                               Command               State                       Ports                    
-------------------------------------------------------------------------------------------------------------------------------
kafkafullstack_kafka-connect-ui_1        /run.sh                          Up      0.0.0.0:8003->8000/tcp                       
kafkafullstack_kafka-connect_1           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp, 9092/tcp             
kafkafullstack_kafka-rest-proxy_1        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp                       
kafkafullstack_kafka-schema-registry_1   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp                       
kafkafullstack_kafka-topics-ui_1         /run.sh                          Up      0.0.0.0:8000->8000/tcp                       
kafkafullstack_kafka1_1                  /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp                       
kafkafullstack_kafka2_1                  /etc/confluent/docker/run        Up      9092/tcp, 0.0.0.0:9093->9093/tcp             
kafkafullstack_kafka3_1                  /etc/confluent/docker/run        Up      9092/tcp, 0.0.0.0:9094->9094/tcp             
kafkafullstack_ksql-server_1             /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp                       
kafkafullstack_schema-registry-ui_1      /run.sh                          Up      0.0.0.0:8001->8000/tcp                       
kafkafullstack_zoo1_1                    /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp   
kafkafullstack_zoo2_1                    /docker-entrypoint.sh zkSe ...   Up      2181/tcp, 0.0.0.0:2182->2182/tcp, 2888/tcp,  
                                                                                  3888/tcp                                     
kafkafullstack_zoo3_1                    /docker-entrypoint.sh zkSe ...   Up      2181/tcp, 0.0.0.0:2183->2183/tcp, 2888/tcp,  
                                                                                  3888/tcp                                     
kafkafullstack_zoonavigator-api_1        ./run.sh                         Up      9000/tcp                                     
kafkafullstack_zoonavigator-web_1        ./run.sh                         Up      80/tcp, 0.0.0.0:8004->8000/tcp       

현재 구성은 아래와 같다.

 Kafka 테스트 구성

테스트

테스트 환경은 아래와 같다.

kafkacat

kafka 설치할 때 같이 배포되는 kafka cli는 사용하기 귀찮아서, kafkacat 이라는 관리툴을 설치했다.
# apt-get install kafkacat
사용 방법은 아래와 같다.
kafkacat -b localhost:9092 -t new_topic -G [group_name] -p [partition_num] [-P|-C] 
앞서 만든 kafka의 정보를 확인해 보자. -L 을 이용하면 메타데이터 정보를 확인 할 수 있다.
$ kafkacat -L -b localhost:9092 
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 3 brokers:
  broker 2 at 127.0.0.1:9093
  broker 3 at 127.0.0.1:9094
  broker 1 at 127.0.0.1:9092
 9 topics:
....
3개의 브로커와 9개의 토픽이 있는 걸 확인 할 수 있다.

new_topic 라는 새로운 토픽을 만들어서 컨슈머와 프로듀서를 테스트해보기로 했다. 먼저 컨슈머 모드로 실행을 했다. 파티션을 명시하지 않으면 모든 파티션으로 부터 메시지를 읽는다.
# kafkacat -b localhost:9092 -t new_topic -C 
message 0001
% Reached end of topic new_topic [0] at offset 4
message 0002
% Reached end of topic new_topic [2] at offset 7
message 0003
% Reached end of topic new_topic [0] at offset 5
message 0004
% Reached end of topic new_topic [1] at offset 6
message 0005
% Reached end of topic new_topic [1] at offset 7
몇 번 파티션으로 부터 메시지를 읽었는지와 각 파티션에서의 offset 을 확인 할 수 있다. 토픽 new_topic에 대한 메타정보를 확인해 보자.
$ kafkacat -b localhost:9092 -L -t new_topic
Metadata for new_topic (from broker -1: localhost:9092/bootstrap):
 3 brokers:
  broker 2 at 127.0.0.1:9093
  broker 3 at 127.0.0.1:9094
  broker 1 at 127.0.0.1:9092
 1 topics:
  topic "new_topic" with 4 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 2, leader 3, replicas: 3, isrs: 3
    partition 3, leader 1, replicas: 1, isrs: 1
    partition 1, leader 2, replicas: 2, isrs: 2

kafka topic ui

docker compose로 카프카 topic 정보를 확인 할 수 있는 kafaka topic ui를 설치했다. 웹 브라우저로 접근해보자.

 kafka topic ui

 kafka topic ui

new_topic의 상세한 정보를 확인 할 수 있다. kafka 상태를 빠르게 모니터링하는 용도로 괜찮은 것 같다.

ZooNavigator

주키퍼 정보를 웹으로 네비게이션하고 편집 할 수 있다.

 ZooNavigator

 ZooNavigator

1번 브로커의 정보를 요청했다. 브로커의 네트워크 정보와 호스트이름 등을 확인 할 수 있다.

kafka 애플리케이션 테스트

대충 이정도 하고 kafka 애플리케이션을 만들어보기로 했다. go 언어와 python 언어로 만들어볼 생각이다. 간단한 push 서비스 정도로 볼 수 있겠다.

Go

프로듀서 프로그램이다.
package main

import (
	"bufio"
	"fmt"
	"github.com/Shopify/sarama"
	"os"
)

type Producer struct {
	ChatProducer sarama.SyncProducer
}

// 프로듀서를 만든다.
// 앞서 만든 브로커 정보들을 설정했다.
func NewProducer() *Producer {
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true
	c, err := sarama.NewSyncProducer([]string{
		"localhost:9092",
		"localhost:9093",
		"localhost:9094"}, config)
	if err != nil {
		panic(err)
	}
	return &Producer{ChatProducer: c}
}

func (p *Producer) Close() error {
	err := p.ChatProducer.Close()
	if err != nil {
		return err
	}
	return nil
}

// message 토픽으로 메시지를 전송한다.
func (p *Producer) SendStringData(message string) error {
	partition, offset, err := p.ChatProducer.SendMessage(&sarama.ProducerMessage{
		Topic: "message",
		Value: sarama.StringEncoder(message),
	})
	if err != nil {
		return err
	}
	fmt.Printf("%d/%d\n", partition, offset)
	return nil
}

func main() {
	p := NewProducer()
	reader := bufio.NewReader(os.Stdin)
	for {
		fmt.Print("> ")
		message, _ := reader.ReadString('\n')
		if message == "quit\n" {
			break
		}
		p.SendStringData(message)
	}
}

컨슈머 프로그램이다. 컨슈머는 testgroup그룹으로 묶었다.
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	brokers := []string{"localhost:9092", "localhost:9093", "localhost:9094"}
	topics := []string{"message"}
	consumer, err := cluster.NewConsumer(brokers, "testgroup", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume messages, watch errors and notifications
	for {
		select {
		case msg, more := <-consumer.Messages():
			if more {
				fmt.Fprintf(os.Stdout, "msg [%d] [%d]> %s", msg.Partition, msg.Offset, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
			}
		case err, more := <-consumer.Errors():
			if more {
				log.Printf("Error: %s\n", err.Error())
			}
		case ntf, more := <-consumer.Notifications():
			if more {
				log.Printf("Rebalanced: %+v\n", ntf)
			}
		case <-signals:
			return
		}
	}
}
프로듀셔를 실행 한 다음, 컨슈머를 실행해보자.
# go run consumer_cluster.go 
2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]}
4개의 파티션 바라보고 있는 걸 확인 할 수 있다. 컨슈머를 하나 더 실행하면 파티션에 대한 리밸런싱이 일어나는 걸 확인 할 수 있다.
# go run consumer_cluster.go 
2019/06/07 00:53:12 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 00:53:15 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1 2 3]] Released:map[] Current:map[message:[0 1 2 3]]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[]] Released:map[message:[0 1]] Current:map[message:[2 3]]}

# go run consumer_cluster.go 
2019/06/07 01:13:14 Rebalanced: &{Type:rebalance start Claimed:map[] Released:map[] Current:map[]}
2019/06/07 01:13:16 Rebalanced: &{Type:rebalance OK Claimed:map[message:[0 1]] Released:map[] Current:map[message:[0 1]]}
첫 번째 컨슈머는 2,3 두 번째 컨슈머는 0,1 파티션을 분산해서 처리하는 걸 확인 할 수 있다. 이렇게 파티션을 구성하는 것으로 메시지를 분산처리 할 수 있다. 아래 테스트 동영상이다.

컨슈머가 추가될 때마다 파티션에 대한 리밸런싱이 이루어지는 걸 확인 할 수 있다. 현재 애플리케이션 실행 구조는 아래와 같이 묘사 할 수 있다.

 애플리케이션 실행 구조

정리