메뉴

문서정보

목차

메시지 시스템

메시지 애플리케이션은 어떤 애플리케이션에서 다른 애플리케이션으로 데이터를 보내는 시스템이다. 메시지 시스템은 두 애플리케이션 사이에서 전령 역할을 한다. 메시지의 전송과 수신은 메시지 시스템이 안전하게 처리해 줄 것이라는 것을 보증한다. 따라서 메시지 애플리케이션은 메시지의 송/수신 과정에 신경 쓸 필요 없이, 메시지(데이터)에만 신경을 쓰면 된다.

메시징 방식은 크게 P2P(Point to Point)와 PUB/SUB(Publish - Subscribe) 메시지 시스템이 있다.

P2P 메시징 시스템

 P2P 메시징 시스템

한지점에서 다른 지점으로 메시지를 전송한다. 하나 이상의 컨슈머가 메시지를 읽을 수 있지만, 특정 메시지는 오직 하나의 컨슈머만 읽을 수 있다. 컨슈머가 읽은 메시지는 대기열에서 사라지기 때문이다. 이 모델을 사용 할 수 있는 대표적인 시스템이 주문 처리 시스템이다. 커피전문점에서 여러 바리스타가 주문을 처리하는 모습을 생각하면 된다.

PUB/SUB 메시징 시스템

 PUB/SUB 메시징 시스템

하나의 메시지를 여러 컨슈머가 구독할 수 있다. 전형적인 뉴스구독 시스템에 어울리는 방식이다. 메시지를 읽으면서 큐에서 제거해 버리면 다른 컨슈머가 읽을 수 없으므로, 메시지는 큐에 계속 남아 있는다. 메시지가 늘어날 경우 저장공간이 부족하거나 데이터 처리의 효율이 떨어질 수 있으므로 일정 시간이 지난 메시지는 삭제한다.

설치

여기에서는 로컬 시스템에 직접 구성한다. 도커 환경 구성은 Docker로 Kafka 테스트문서를 참고하자. Kafka 설치 환경은 아래와 같다. 카프카 버전은 (2017년 4월 2일)0.10.2다.
$ find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
kafka_2.11-0.10.2.0-sources.jar.asc

시스템 구성

 Kafka 테스트 환경 구성

자바 환경 설치

$ apt-get install default-jdk
$ javac -version
javac 1.8.0_121

주키퍼 설치

$  apt-get install zookeeper zookeeperd -y
자세한 내용은 주키퍼 튜토리얼 문서 참고 한다.

zoo.cfg 설정은 다음과 같다.
# cat /etc/zookeeper/conf/zoo.cfg
......
server.1=kafka-01:2888:3888
server.2=kafka-02:2888:3888
server.3=kafka-03:2888:3888

Kafka 서버 설치

Java와 주키퍼가 설치됐다면 kafka를 설치한다.
$ wget http://apache.mirror.cdnetworks.com/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
/opt/kafka 디렉토리에에 압축을 풀었다.
$ tar -xvzf kafka_2.11-0.10.2.0.tgz /opt/kafka
kafka의 주키퍼 설정을 변경한다.
# cat /opt/kafka/conf/zookeeper.properties
....
clientPort=2180
server.1=kafka-01:2888:3888
server.2=kafka-02:2888:3888
server.3=kafka-03:2888:3888
kafka 서버 설정을 한다. 쥬키퍼와 마찬가지로 kafka는 전체 클러스터 안에서 유일한 브로커 id를 가지고 있어야 한다. kafka-01, kafka-02, kafka-03 각각 0, 1, 2의 아이디를 설정한다.
# cat /opt/kafka/config/server.properties
broker.id=0

kafka를 실행하자.
$ cd /opt/kafka/bin
$ ./kafka-server-start.sh /opt/kafka/config/server.properties 
[2017-03-19 21:22:08,517] INFO Kafka version : 0.10.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2017-03-19 21:22:08,518] INFO Kafka commitId : 576d93a8dc0cf421 (org.apache.kafka.common.utils.AppInfoParser)
[2017-03-19 21:22:08,518] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

kafka 노드 3개를 모두 실행 한다음 zkCli.sh를 이용해서 상태를 확인해 보자.
$ sudo ./zkCli.sh 
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, config]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids
null
cZxid = 0x100000007
ctime = Sun Mar 19 21:17:39 KST 2017
mZxid = 0x100000007
mtime = Sun Mar 19 21:17:39 KST 2017
pZxid = 0x10000005f
cversion = 15
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 3
3개의 kafka 노드들이 붙어 있는 걸 확인할 수 있다.

카프카 서버 테스트

kafka에서 제공하는 테스트 스크립트를 이용해서 testing이라는 이름의 토픽을 만들었다.
# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2  --partitions 1 --topic testing
zkCli.sh로 토픽목록을 확인 할 수 있다.
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[testing]
kafka-topics.sh 스크립트로도 확인 할 수 있다. 이 스크립트도 주키퍼 서버에 연결해서 토픽정보를 얻어오는 일을 한다.
$ ./kafka-topics.sh --list --zookeeper localhost:2181
testing
이제 testing 토픽에 셈플 메시지를 PUB 해보자. kafka-01 서버에서 PUB를 해봤다.
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic testing
Hello World?
kafka-03 서버에서 testing 토픽의 메시지를 SUB했다.
$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic testing --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Hello World?
kafka-01에서 메시지를 입력하면 kafka-03에서 SUB 하는 것을 확인 할 수 있을 것이다. kafaka-02에도 SUB을 붙여보면, 모든 컨슈머에 메시지가 전달되는 걸 확인 할 수 있을 것이다.

참고