메뉴

문서정보

목차

Amazon Managed Streaming for Apache Kafka

나는 항상 우리의 고객이 스트리밍 데이터를 이용하는 것을 보며 놀라곤 한다. 예를들어 기업과 전문가 집단에게 가장 신뢰받는 뉴스기업 중 하나인 Thomson Reuters 의 경우 데이터를 수집하고 분석 및 시각화 하여 사용자 경험의 지속적 개선을 지원하는 솔류션을 구축했다. Hayday, Clash of Clan 및 Boom Beach 같은 게임을 제공하는 소셜 게임 회사인 Supercell은 450억건의 게임 데이터를 실시간으로 처리하는 시스템을 개발했다.

2013 Re:Invent에서 Amazon Kinesis를 출시 한 이래 우리는 고객이 스트리밍 데이터로 작업하는 방식을 개선하기위해서 지속적으로 확장해 왔다. 사용 가능한 솔류션은 아래와 같다. AWS는 re:Invent 2018에서 Amazon Managed Streaming for Apache kafka(MSK)를 소개했다. MSK를 사용하면 Apache kafka를 이용한 스트리밍 데이터 처리 응용 프로그램을 쉽게 작성하고 실행 할 수 있다. AWS는 (2019년 5월 30일) MSK를 일반 사용자가 사용 할 수 있도록 공개했다. MSK는 현재 N.Virginia, Ohio, Oregon, Tokyo, Singapore, Sydney, Frankfurt, Ireland, Paris, London 리전에서 사용 할 수 있다.(왜 Seould 리전은 항상 뒷 전인 거냐)

Amazon MSK Use Case

Apache kafka 기반 서비스인 만큼 동일한 목적으로 사용 할 수 있다.

작동방식

Apache kafka는 고객이 스트림 이벤트, 트랜잭션, IoT 이벤트, 응용 프로그램의 로그와 같은 스트리밍 데이터를 캡춰해서 실시간 분석을 수행하고, 연속적인 변환을 실행 하도록 도와주는 소프트웨어다. 사용자는 데이터를 실시간으로 data lake와 데이터베이스에 저장 할 수 있다. 또한 kafka를 스트리밍 데이터 저장소로 사용하여, 컨슈머와 프로듀셔 애플리케이션을 분리 할 수 있다.

Kafka는 엔터프라이즈 수준에서 사용 할 수 있는 데이터 스트리밍 및 메시징 프레임워크이지만 프러덕션영역에 적용 하고자 할 경우 설정, 확장 및 관리에 어려움을 겪을 수 있다. Amazon MSK를 이용해서 확장성, 고가용성 시스템 구성을 위한 모든 관리 요소를 Amazon에 위임하고 개발자는 핵심 비지니스 프로세스 개발에 집중 할 수 있다.

MSK 클러스터는 해당 리전의 모든 가용 영역에 걸쳐서 브로커(Broker)를 배치한다. 사용자는 MSK를 위한 독립적인 VPC를 만들어야 한다. 테스트를 수행할 도쿄리전은 3개의 가용 영역을 가지고 있으므로 3개의 가용 영역 모두에 서브넷이 배치된 VPC 네트워크를 만든다.

 MSK VPC

사용자는 몇 분 안에 클러스터를 만들 수 있으며, AWS 계정, IAM(Identity and Access Management)를 이용해서 클러스터 작업에 대한 권한을 제어할 수 있다. 그리고 AWS ACM(AWS Certificate Manager)로 클라이언트를 인증 하고, TLS를 이용해서 전송 중 데이터를 암호화 할 수 있다. 데이터는 KMS를 이용해서 안전하게 저장 할 수 있다.

Amazon MSK는 지속적으로 서버상태를 모니터링하고 장애가 발생하면 자동으로 서버를 대체한다. 서비패치역시 자동으로 이루어지며, 고 가용성의 Zookeeper 노드도 추가 비용없이 사용 할 수 있다. Kafka의 성능 메트릭들은 Amazon CloudWatch로 확인 할 수 있다. Amazon MSK는 kafka 1.1.1 및 2.1.0과 호환되므로, 코드의 변경없이 kakfa 관련 툴과 프레임워크들을 사용 할 수 있다.

Amazon은 고객의 요청에 따라서 아래의 기능들을 추가했다. 조만간 AWS CloudFormation을 지원할 예정이다.

MSK 전개를 위한 네트워크 구성

MSK 기반 애플리케이션 테스트를 위해서 아래와 같은 네트워크를 구성했다.

 MSK 테스트용 네트워크

MSK 네트워크, 3개의 가용영역에 걸친 3개의 서브넷을 구성한다. Application 개발용으로 별도의 네트워크를 만들고 VPC Peering으로 연결한다.

클러스터의 생성

서울 리전에서 사용 할 수 없어서, 도쿄리전에서 테스트하기로 했다.

 MSK

 MSK

MSK는 프로비저닝된 브로커 갯수에 따라 비용이 과금된다. 가장 낮은 단계의 kafka.m5.large는 시간당 0.271 달러로 시간당 0.124달러인 EC2 m5.large의 두 배 정도의 비용이 들어간다. 비싸보이기는 하지만 직접 구성할 경우 최소 3대 이상의 주키퍼 노드를 구성해야 하는 걸 감안하면 합리적인 가격이라고 생각된다.

Create Brokder 를 클릭해서 MSK를 전개한다.

 MSK 일반설정

 MSK 일반설정

태그를 이용해서 자원을 검색&필터링하고 (태그 기반) IAM 정책을 적용하고 비용을 추적 할 수 있다. MSK를 전개할 VPC를 선택한다. Kafka 버전은 2.1.0을(Kafka 1.x 버전까지만 사용했었는데, 이번기회에 2.x 적응을 해봐야 겠다.) 선택했다.

 가용영역 선택

 가용영역 선택

VPC와 가용영역, 서브넷을 선택한다. MSK는 해당 리전의 모든 가용영역을 사용한다. 현재 도쿄는 3개의 가용영역으로 구성된다.

 MSK 브로커 설정

 MSK 브로커 설정

Brokers per Availability Zone. 각 가용 영역에 배치할 브로커의 갯수를 설정한다. 도쿄리전은 3개의 가용 영역으로 구성됐으니, x 3 만큼의 브로커가 전개된다. 태그는 대략 설정하자. 카프카 브로커가 사용할 Storage 크기를 설정한다. 기본 크기는 1000GB다.

 MSK Security

 MSK Security

클러스터 내부에서 흐르는 데이터는 암호화를 하도록 설정했다. 클라이언트와 브로커는 TLS 기반 암호화와 pinaintext 트래픽을 허용하기로 했다. 클러스터 안에서 저장되는 데이터들에 대해서는 AWS CMK를 이용해서 암호화 해서 저장하기로 했다. 원한다면 TLS 인증서를 이용해서 클라이언트를 인증 할 수도 있다.

 MSK Advanced settings

 MSK Advanced settings

Cluster 설정은 기본 설정으로(커스텀 설정하려면 귀찮을 것 같아서, 나중에 제대로 다루게 될 때가 오면 그때 자세히 살펴볼 생각이다) 가져갔다. Create cluster 를 클릭하면 클러스터가 만들어진다.

MSK 전개 정보 확인하기

몇 분을 기다리면 MSK가 전개되고 콘솔에서 정보를 확인 할 수 있다.

g

g

Kafka의 ARN을 확인하자.

 ARN 확인

 ARN 확인

Kafka 애플리케이션의 개발을 위해서 zookeeper 과 kafka 브로커 정보를 확인해보자. 웹 콘솔에서도 확인 할 수 있지만 aws cli로 확인해보기로 했다.

# aws kafka describe-cluster --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:522xxxxxxxx:cluster/joinc-msk-test/3d38e
{
    "ClusterInfo": {
        "BrokerNodeGroupInfo": {
            "BrokerAZDistribution": "DEFAULT",
            "ClientSubnets": [
                "subnet-43a03634",
                "subnet-ec8029b5",
                "subnet-0fa9bc492c433393c"
            ],
            "InstanceType": "kafka.m5.large",
            "SecurityGroups": [
                "sg-24005641"
            ],
            "StorageInfo": {
                "EbsStorageInfo": {
                    "VolumeSize": 5
                }
            }
        },
        "ClusterArn": "arn:aws:kafka:ap-northeast-1:522xxxxxxx:cluster/joinc-msk-test/3d38ee6a-9e6e-4a8f-b9fa-b4472b6a49f8-3",
        "ClusterName": "joinc-msk-test",
        "CreationTime": "2019-06-16T10:49:38.052Z",
        "CurrentBrokerSoftwareInfo": {
            "KafkaVersion": "2.1.0"
        },
        "CurrentVersion": "K13V1IB3VIYZZH",
        "EncryptionInfo": {
            "EncryptionAtRest": {
                "DataVolumeKMSKeyId": "arn:aws:kms:ap-northeast-1:522xxxxxx:key/eaccb247-9118-4885-8d9c-649d10cadedf"
            }
        },
        "EnhancedMonitoring": "PER_BROKER",
        "NumberOfBrokerNodes": 3,
        "State": "ACTIVE",
        "ZookeeperConnectString": "10.1.2.7:2181,10.1.3.94:2181,10.1.1.114:2181"
    }
}
Zookeeper 클러스터 정보를 확인 할 수 있다.

개발 및 테스트 환경 만들기

Application 네트워크(10.2.0.0/16) VPC에 EC2 인스턴스를 전개한다. MSK는 별도의 VPC MSK 네트워크(10.1.0.0/16) 네트워크에 있기 때문에, VPC Peering으로 두 개의 VPC를 연결해야 한다. AWS VPC Peering 문서를 참고해서 두 개의 VPC를 연결하자.

Topic 만들기

여기서 부터 작업은 Application 네트워크에 전개한 EC2 인스턴스를 통해서 이루어진다. 앞서 Zookeeper 클러스터 정보 "ZookeeperConnectString"으로 zookeeper에 topic 생성 요청을 해야 한다.

작업을 위해서 kafka 클라이언트 애플리케이션을 설치해야 한다. 먼저 자바를 설치한다.
# sudo yum install java-1.8.0

Apache kafka를 다운로드하고 압축을 푼다.
# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
# tar -xvzf kafka_2.12-2.1.0.tgz 

kafka 명령을 이용해서 message 토픽을 생성한다. --zookeeper에 ZookeeperConnectString 값을 설정하면 된다.
$ bin/kafka-topics.sh --create --zookeeper 10.1.2.7:2181,10.1.3.94:2181,10.1.1.114:2181 --replication-factor 3 --partitions 1 --topic message
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic "message".
토픽이 만들어졌다.

이제 프로시져&컨슈머를 테스트해보자. 아래 명령을 이용해서 프로시져와 컨슈머가 연결할 브로커 정보를 가져올 수 있다.
# kafka get-bootstrap-brokers --cluster-arn "arn:aws:kafka:ap-northeast-1:......"
{
    "BootstrapBrokerString": "b-3.joinc-kafka.cqiofx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-1.joinc-kafka.cqiofx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.joinc-kafka.cqiofx.c3.kafka.ap-northeast-1.amazonaws.com:9092"
}

컨슈머를 topic에 연결한다.
in/kafka-console-producer.sh --broker-list b-3.joinc-kafka.cqiofx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-1.joinc-kafka.cqiofx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.joinc-kafka.cqiofx.c3.kafka.ap-northeast-1.amazonaws.com:9092 --topic message
>hello world

프로시져를 topic에 연결한다.
$ bin/kafka-console-consumer.sh --bootstrap-server  b-3.joinc-kafka.cqiofx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-1.joinc-kafka.cqiofx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.joinc-kafka.cqiofx.c3.kafka.ap-northeast-1.amazonaws.com:9092 --topic message
hello world

hello world 메시지를 전송했는데, 잘 도착한다.

정리

앞으로 할 일

참고