Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Contents

분산 시스템 성능 측정의 어려움

주로 ab와 wrk 등을 이용해서 애플리케이션들에 대한 테스트를 수행했다. 단일 애플리케이션 성능 측정에는 쓸만하지만 분산 시스템 성능 측정에 써먹기는 애매모호하다. 아래와 같은 분산 시스템이 있다고 가정해 보자.

테스트를 쉽게 하기 위해서 모델을 단순화 했다. 중요한 점은 비동기 구간이 있어서, 단순히 Frontend API Server의 응답속도를 측정하는 걸로는 원하는 정보를 얻을 수 없다는 것이다. 이런 모델에서 가장 중요한 성능지표는 Frontend API server 에서 보낸 메시지가 클라이언트 애플리케이션까지 도착하는 시간이다. 이를 위해서 측정해야 하는 정보들은 아래와 같다.
  1. 단일 Frontend API Server의 초당 처리량은 얼마나 되는가.
  2. 초당 처리량이 정해질 경우 적절한 동시 접속.
  3. 동시 접속이 늘어남에 따라서 메시지 도착에 걸리는 시간.
이 외에도 각 시스템의 CPU, 메모리 자원을 측정하는 것도 중요한

분산 시스템 성능 측정 방안

시스템 구축

아래와 같은 시스템을 구축하기로 했다.

  • 패킷 생성기 : wrk를 이용해서 HTTP 패킷을 만들기로 했다.
  • API Server : 성능과 생산성을 모두를 얻기 위해서 Go 언어를 선택했다. HTTP 요청을 받아서 REDIS Queue에 쌓는다.
  • REDIS : 메시지 큐 역할을 한다.
  • Message Receiver : REDIS 큐로 부터 메시지를 읽는다. 읽은 메시지는 MQTT Push Server로 보낸다. 역시 Go 언어로 개발한다.
  • MQTT Push server : MQTT Push server다. Mosquitto로 구성한다.
  • Logging server : 시간기록을 위한 로깅 서버를 만들었다. 각 요청은 MsgID를 가지고 있다. MsgID를 이용해서 메시지를 추적할 수 있다. API 서버는 MsgID를 키로 현재 시간을 로깅 서버에 저장한다. MQTT Client도 MsgID와 시간을 저장한다.

Logging

로깅 서버(Logging server)에 저장한 시간을 이용하면, 메시지가 MQTT 클라이언트(MQTT Client)에 도착한 시간을 계산 할 수 있다.
MsgID.spentTime = MsgID.endtime - MsgID.starttime
문제는 시간의 동기화다. 성능측정은 마이크로초 단위에서 이루어지기 때문에, NTP등을 이용하는 것으로는 서로 다른 서버(여기에서는 API Server와 MQTT 클라이언트)의 시간을 일치시키는 건 불가능에 가깝다. 이 문제는 로깅을 하는 서버의 시간을 기준으로 삼는 것을 해결 할 수 있다. REDIS는 TIME 명령을 제공하는데, 이 명령을 실행하면 REDIS 서버의 시간을 반환한다.
# redis-cli -h logserver 
logserver:6379> TIME
1) "1425055608"
2) "756492"

MsgID 생성

MsgID를 이용해서 로그를 추적한다. 따라서 각 패킷에 서로 다른 MsgID 값을 줘야 한다. 패킷 생성기는 HTTP Body에 MsgID를 포함한 데이터를 실어 보낸다.
POST / HTTP/1.1   
Host: apiserver 

{
  "MsgID":1234,
  "source":"client-id",
  "target":"target-id"
}
데이터를 받은 API server는 데이터를 unmarshal해서 MsgID를 가져와서 기록 한다.

Logging 샘플링

극한의 상황에서 밴체마크 툴을 돌린다. 이 경우 로깅 행위 자체가 성능에 영향을 줄 수 있다. 성능에 영향을 최소화 하기 위해서 샘플링 하기로 했다. "MsgID % 100 == 0" 식을 이용 1/100 으로 샘플링 하기로 했다.

Logging 저장 과정

각 소프트웨어 컴포넌트들은 요청받은 메시지의 MsgId를 이용해서, REDIS에 리스트(list)자료 구조로 저장한다. Reporting Server는 로그 정보를 취합해서 응답 시간을 계산한다.

wrk를 이용한 트래픽 생성

패킷 생성기는 MsgID를 만들 수 있어야 한다. 즉 데이터를 동적으로 만들 수 있는 툴을 선택해야 했다. 나는 wrk를 선택했는데, wrk가 내장한 lua 스크립팅을 이용해서 이용해서 HTTP 헤더와 바디를 자유롭게 제어 할 수 있었기 때문이다

MsgID는 일련번호로 하기로 했다. 일련번호로 할 경우 약간의 문제가 있긴하다. wrk는 스레드를 만들어서 대량의 패킷을 만드는데, 각 스레드가 독립적으로 작동한다. 따라서 8개의 스레드로 트래픽을 만들경우, 8개의 동일한 MsgID가 만들어진다. 뭐, 크게 상관이 없기는 한데, 기분이 꿀꿀하면 랜덤값을 사용하는 방법이 있기는 하다.

트래픽을 두 대 이상의 서버에서 분산해서 만들겠다면, MsgID에 서버 식별자를 붙이는 등의 작업을 해야 할 거다.

wrk 설치

패키지가 없기 때문에 직접 컴파일 해야 한다. 우분투 리눅스에 설치했다.
# apt-get install build-essential
# apt-get install libssl-dev
# apt-get install git
# git clone https://github.com/wg/wrk.git
# cd wrk
# make
# cp wrk /usr/local/bin

기본 사용

wrk는 단지 4개의 옵션만을 제공한다.
# wrk -t 12 -c 40 -d 30s http://localhost:8080/index.html
  • -t : 쓰레드 갯수
  • -c : 동접 갯수
  • -d : 테스트 시간
  • -s : Lua 스크립트를 실행할 수 있다.
위 예제는 12개의 스레드로 40개의 연결을 유지하면서 30초 동안 localhost:8080/index.html 문서를 요청한다. 출력내용은 아래와 같다.
Running 30s test @ http://127.0.0.1:8080/index.html
  12 threads and 400 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   635.91us    0.89ms  12.92ms   93.69%
    Req/Sec    56.20k     8.07k   62.00k    86.54%
  22464657 requests in 30.00s, 17.76GB read
Requests/sec: 748868.53
Transfer/sec:    606.33MB

Lua 스크립팅

wrk는 아래와 같은 Lua API를 제공한다.
init     = function(args)
request  = function()
response = function()
done     = function(summary, latency, requests)

wrk = {
	scheme   = "http",
	host     = "localhost",  
	port     = nil,
	method   = "GET",
	path     = "/",
	headers  = {},
	body     = nil
}

간단한 예제
wrk.method = "POST"   -- POST 요청을 전송한다.
count      = 1
request    = function()
   path = "/openapi"  -- /openapi 를 요청한다. 
   wrk.headers['x-application-name'] = "joinc" -- 헤더를 정의 한다.
   wrk.body = string.format("{\"msgid\":%d}", count) -- Body에 MsgID를 설정했다.
   count = count+1
   return wrk.format(nil,path)
end

요청을 확인하기 위해서 웹 서버를 하나 띄우고
# while true ; do  echo -e "HTTP/1.1 200 OK\n\n $(date)" | nc -l -p 8080  ; done

테스트를 수행했다.
# wrk -t 1 -c 1 -d 1s http://localhost:8080 -s test.lua

수행 결과
POST /openapi HTTP/1.1
Host: 10.100.1.93:8080
x-application-name: joinc
Content-Length: 11

{"msgid":6172}

개발

소개한 모델대로 인스턴스를 배치하고 소프트웨어를 개발해서 테스트 해보려 한다. 모든 소프트웨어는 Go 언어로 개발한다. Go언어를 선택한 이유는 공부 중인 언어라서...

환경

  • AWS 인스턴스 : C4.Large
  • 개발 언어 : Go
  • 메시지 큐 : REDIS
  • MQTT Broker : Mosquitto

Go 개발 환경 설정

bitbucket.org에 bitbucket.org/dream_yun/joinc git 저장소를 만들었다. 이 저장소밑에 애플리케이션과 라이브러리들을 배치한다.

Logging 라이브러리

측정 값을 로깅하는 기능을 가지고 있다. 측정에 참여하는 모든 소프트웨어들이 공통 사용하는 코드이므로 라이브러리 형태로 빼기로 했다. 파일 이름은 .../joinc/logging.go 패키지 이름은 logging으로 했다.
package logging

import (
	"encoding/json"
	"fmt"
	redis "gopkg.in/redis.v2"
	"strings"
)

// 성능측정을 위한 로그 포맷
type LogFormat struct {
	Id   int
	time string
}

type Log struct {
	App  string
	Conn *redis.Client
}

func (l Log) Start() error {
	_, err := l.Conn.Ping().Result()
	return err
}

// msgId, time을 기록한다.
func (l Log) Push(msgId int) error {
	tvalue := l.Conn.Time()
	logdata := fmt.Sprintf("{\"id\":%d, \"time\":\"%s.%06s\"}", msgId, tvalue.Val()[0], tvalue.Val()[1])
	json.Marshal(LogFormat{Id: msgId, time: strings.Join(tvalue.Val(), ".")})
	result := l.Conn.LPush(l.App, logdata)
	return result.Err()
}

테스트를 위한 코드를 만들었다.
package logging

import (
	redis "gopkg.in/redis.v2"
	"testing"
)

func TestLogging(t *testing.T) {
	mylog := Log{App: "myapp",
		Conn: redis.NewTCPClient(&redis.Options{Addr: "localhost:6379"})}
	if mylog.Start() != nil {
		t.Error("Redis server connection error!")
	}
	err := mylog.Push(1)
	if err != nil {
		t.Error("Logging error")
	}
}

테스트 수행
# go test
PASS
ok  	bitbucket.org/dream_yun/joinc/logging	0.002s

테스트 성공했으니 빌드 후 인스톨 한다.
# go build
# go install

API Server 개발

Go HTTP 패키지와 REDIS 패키지로 개발한다. 작동 과정은 아래와 같다.
  1. HTTP 요청을 읽는다.
  2. MsgID를 Key로 성능 정보를 REDIS 서버에 기록한다.
  3. HTTP Body는 REDIS(Message Queue)에 쓴다.
    package main
    
    import (
    	"encoding/json"
    	"flag"
    	"fmt"
    	"io/ioutil"
    	"net/http"
    	"os"
    	"runtime"
    
        // 로깅 라이브러리 import
    	log "bitbucket.org/dream_yun/joinc/logging"
    	redis "gopkg.in/redis.v2"
    )
    
    type Message struct {
    	Id   int
    	Data string
    }
    
    type Handler struct {
    	logAddr string
    	appAddr string
    	perfLog log.Log
    	appConn *redis.Client
    }
    
    // redis와 mqtt 연결을 테스트한다.
    func (h *Handler) Initalize() error {
    	logConn := redis.NewTCPClient(&redis.Options{Addr: h.logAddr})
    	h.perfLog = log.Log{App: "apiserver", Conn: logConn}
    	err := h.perfLog.Start()
    	if err != nil {
    		return err
    	}
    
    	h.appConn = redis.NewTCPClient(&redis.Options{Addr: h.appAddr})
    	_, err = h.appConn.Ping().Result()
    	if err != nil {
    		return err
    	}
    
    	return nil
    }
    
    // HTTP Body를 읽어서 Unmarshal 하고
    func (h *Handler) Run(w http.ResponseWriter, r *http.Request) {
    	body, _ := ioutil.ReadAll(r.Body)
    
    	var m Message
    	err := json.Unmarshal(body, &m)
    	Check(err)
    	if m.Id%100 == 0 {
    		err = h.perfLog.Push(m.Id)
    		Check(err)
    	}
    }
    
    func Check(err error) {
    	if err != nil {
    		fmt.Print(err.Error())
    		os.Exit(1)
    	}
    }
    
    func main() {
    	runtime.GOMAXPROCS(4)
    	logAddr := flag.String("log", "localhost:6379", "Loggin REDIS Server")
    	appAddr := flag.String("app", "localhost:6379", "Application REDIS Server")
    
    	flag.Parse()
    
    	ApiServer := Handler{logAddr: *logAddr, appAddr: *appAddr}
    	err := ApiServer.Initalize()
    	Check(err)
    
    	http.HandleFunc("/", ApiServer.Run)
    	http.ListenAndServe(":8080", nil)
    }

Push Server 개발

Message Push Server다. REDIS로 부터 읽은 메시지를 MQTT Broker로 전송한다.
package main

import (
	"errors"
	"flag"
	"fmt"
	"os"
	"strconv"
	"sync"

	redis "gopkg.in/redis.v2"

	MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)

type Message struct {
	Id   int
	Data string
}

type Handler struct {
	redisconn *redis.Client
	mqttconn  *MQTT.MqttClient
	topic     string
	rediskey  string
}

// REDIS 서버와 MQTT 브로커에 연결한다.
func (h *Handler) Initalize(r *redis.Client, m *MQTT.MqttClient) error {
	if !m.IsConnected() {
		return errors.New("MQTT Client connect error")
	}
	_, err := r.Ping().Result()
	if err != nil {
		return err
	}
	h.redisconn = r
	h.mqttconn = m
	return nil
}

// Redis로부터 데이터를 읽은 다음
// MQTT Broker로 보낸다.
func (h *Handler) Run() {
	for {
		data := h.redisconn.BRPop(0, h.rediskey)
		h.mqttconn.Publish(MQTT.QOS_ZERO, h.topic, data.Val()[1])
	}
}

func (h *Handler) SetTopic(topic string) *Handler {
	h.topic = topic
	return h
}

func (h *Handler) SetRedisKey(key string) *Handler {
	h.rediskey = key
	return h
}

func Check(err error) {
	if err != nil {
		fmt.Print(err.Error())
		os.Exit(1)
	}
}

func main() {
	// 실행인자 처리
	mqtthost := flag.String("mqtt", "tcp://localhost:1883", "mqtt server")
	redishost := flag.String("redis", "localhost:6379", "redis server")
	thread := flag.String("thread", "4", "Thread num")
	threadnum, err := strconv.Atoi(*thread)
	flag.Parse()

	// MQTT 연결
	opts := MQTT.NewClientOptions().AddBroker(*mqtthost).SetCleanSession(true)
	opts.SetClientId("joinc-test")
	mqttclient := MQTT.NewClient(opts)
	_, err = mqttclient.Start()
	Check(err)

	// REDIS 연결
	redisclient := redis.NewTCPClient(&redis.Options{Addr: *redishost})

	// Handler 초기화
	app := Handler{}
	app.SetTopic("/channel").SetRedisKey("myapp")
	err = app.Initalize(redisclient, mqttclient)
	Check(err)

	// Handler 실행
	var wg sync.WaitGroup
	wg.Add(1)
	for i := 0; i < threadnum; i++ {
		go func() {
			app.Run()
		}()
	}
	wg.Wait()
}

MQTT Client

종단에 있는 MQTT 클라이언트다. 브로커로 부터 topic 데이터를 읽는 일을 한다. 성능측정을 위해서 REDIS에 로그를 쌓는다.

package main

import (
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	redis "gopkg.in/redis.v2"
	"os"
	"strconv"
	"sync"

	log "bitbucket.org/dream_yun/joinc/logging"
	MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)

type Message struct {
	Id   int
	Data string
}

type Handler struct {
	redisconn *redis.Client
	mqttconn  *MQTT.MqttClient
	perfLog   log.Log
	topic     string
}

func (h *Handler) SetTopic(t string) {
	h.topic = t
}
func (h *Handler) Initalize(r *redis.Client, m *MQTT.MqttClient) error {
	h.perfLog = log.Log{App: "mqttclient", Conn: r}
	err := h.perfLog.Start()
	if err != nil {
		return err
	}
	if !m.IsConnected() {
		return errors.New("MQTT Client connect error")
	}
	_, err = r.Ping().Result()
	if err != nil {
		return err
	}
	h.redisconn = r
	h.mqttconn = m
	return nil
}

// MQTT Broker로 부터 메시지를 Subscribe 한다. 
// 메시지를 읽은 후, REDIS에 로그를 남긴다. 
func (h *Handler) Run() {
	filter, _ := MQTT.NewTopicFilter(h.topic, 1)
	var m Message
	h.mqttconn.StartSubscription(func(client *MQTT.MqttClient, msg MQTT.Message) {
		err := json.Unmarshal(msg.Payload(), &m)
		if err != nil {
			fmt.Println("Unmarshal Error")
		} else {
			fmt.Println(string(msg.Payload()))
			if m.Id%100 == 0 {
				h.perfLog.Push(m.Id)
			}
		}
	}, filter)
}

func Check(err error) {
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
}
func main() {
	mqtthost := flag.String("mqtt", "tcp://localhost:1883", "mqtt server")
	redishost := flag.String("redis", "localhost:6379", "redis erver")
	thread := flag.String("thread", "4", "Thread num")
	topic := flag.String("topic", "/channel", "Mqtt Sub topic")
	flag.Parse()
	threadnum, err := strconv.Atoi(*thread)
	Check(err)

	opts := MQTT.NewClientOptions().AddBroker(*mqtthost).SetCleanSession(true)
	opts.SetClientId("mqtt-client")
	mqttclient := MQTT.NewClient(opts)
	_, err = mqttclient.Start()
	Check(err)

	redisclient := redis.NewTCPClient(&redis.Options{Addr: *redishost})

	app := Handler{}
	Check(app.Initalize(redisclient, mqttclient))
	app.SetTopic(*topic)

	var wg sync.WaitGroup
	wg.Add(1)
	for i := 0; i < threadnum; i++ {
		go func() {
			app.Run()
		}()
	}
	wg.Wait()
}

테스트

아래의 서버들을 배치했다.
  • 패킷 생성 서버 : wrk를 이용해서 패킷을 마구 만든다.
  • API Server : HTTP 패킷을 받아서 처리한다.
  • REDIS Server : EC2 인스턴스에 redis 서버를 설치했다.
  • Push server : pushserver 소프트웨어를 실행
  • MQTT Broker : 모스키토를 설치했다.
  • MQTT Client : mqttclient를 실행

curl을 이용한 단위 테스트

정식 테스트 전, curl을 이용해서 제대로 작동하는지를 간단히 테스트 했다.
# cat test.json
{"data":"news","id":12000}
# curl -X POST http://apiserver:8080/adapter2/poweron/44300 -d @test.json 

제대로 작동했다면 REDIS 로그 서버에 키 이름이 "apiserver", "mqttclient"인 리스트가 만들어져야 한다.
# redis-cli -h redis_server
> KEYS *
1) "mqttclient"
2) "apiserver"

> LRANGE mqttclient 0 -1
1) "{\"id\":12000, \"time\":\"1426087754.105100\"}"
> LRANGE apiserver 0 -1
1) "{\"id\":12000, \"time\":\"1426087754.104515\"}"
잘 만들어졌다. 메시지가 목적지까지 도착한 시간은 "1426087754.105100 - 1426087754.104515 == 0.000585"이다.

정식 테스트

wrk를 이용해서 정식으로 테스트를 수행한다. 1부터 20까지 동접을 늘려가면서 테스트를 수행한다. 자동으로 테스트를 수행하는 파이슨 스크립트를 만들었다. 예쁜 코딩 같은 건 신경쓰지 않았다. 결과는 표준출력 되며, 이걸 파일로 저장해서 gnuplot로 시각화 하기로 했다. (파이선용 gnuplot 패키지가 있긴 하던데..)
import os
import re
import time
import redis
import json

# 표준출력된 wrk 테스트 결과를 파싱해서 원하는 형식으로 만든다.
# 동접\tLatency\t초당처리건수를 출력한다.
def report(c, lines):
    rtv = []
    for str in lines:
        str=re.sub("^[ \t]+",'', str)
        if str.find("Latency") == 0 or str.find("Requests/sec") == 0:
            token = re.split("[ \t]+", str)
            value = re.sub("[^0-9\.]", '', token[1])
            if re.match("[0-9\.]+us", token[1]):
                rtv.append(float(value)/1000)
            elif re.match("[0-9\.]+s", token[1]):
                rtv.append(float(value)*1000)
            else:
                rtv.append(float(value))
    return "%d\t%f\t%f" % (c, rtv[0], rtv[1])

def run(cmd):
    stdin, stdout, stderr = os.popen3(cmd)
    return stdout.readlines()

offset = 5
maxConcurrency = offset * 61
duration = 10

# redis 서버에 연결
r = redis.StrictRedis(host='redis-server', port=6379, db=0)

# 1부터 10까지 동접을 늘려가면서 테스트 한다.
for num in range(1, 11):
    cmd = ("wrk -t %d -c %d -d 10s http://apiserver:8080 -s post.lua" % (num, num))
    result =run(cmd)

    report_str =report(num, result)
    time.sleep(15)

    # apiserver의 로그와 mqttclient의 로그를 이용해서, 도착까지 걸린 시간을 구한다.
    # 제대로 하려면 기록된 모든 msgid에 대한 처리 시간을 구한다음, 평균을 내야 할 것이다.
    # 귀찮아서 제일 마지막에 기록된 msgid로만 연산을 하기로 했다. (별차이 없다.)
    start_data = json.loads(r.lrange('apiserver', 0, 0)[0])
    end_data = json.loads(r.lrange('mqttclient', 0, 0)[0])
    print report_str,"\t",float(end_data['LogTime']) - float(start_data['LogTime'])
    r.flushall()
    time.sleep(5)

아래와 같은 테스트 결과를 얻었다.
1	0.256310	3773.640000	0.000211000442505
2	0.273040	6976.360000	0.000239133834839
3	0.282040	10224.680000	0.000272035598755
4	0.294580	13108.570000	0.000375986099243
5	0.306490	15752.810000	0.000341892242432
6	0.318830	18254.500000	0.000343084335327
7	0.335950	20140.750000	0.000355005264282
8	0.361870	21735.800000	0.000284910202026
9	0.397630	22588.990000	0.000802040100098
10	0.436230	23428.370000	0.0718810558319
Gnuplot으로 시각화 했다.

테스트 결과를 분석해 보자.
  • HTTP 서버가 하는 일은 거의 없다. 따라서 응답지연은 완만하게 증가한다.
  • 최대 초당처리 건수는 대략 25,000건 정도다. 이후 동접을 늘이면, 응답지연이 늘어나게 될 것이다.
  • 초당처리 22,000 정도까지는 매우 빠르게 도착된다. 하지만 24,000 이상되면, 지연이 늘어난다. REDIS와 MQTT는 HTTP 보다 훨씬 빠르게 작동한다. Push server나 MQTT Client 어딘가에 문제가 있을 거라고 예상할 수 있다.
  • Push server도 로그를 남긴다면, 문제가 되는 부분을 더 정확히 찾을 수 있을 거다.
  • runtime.GOMAXPROCS(4) 설정이 영향을 줄 수 있다. 코드상에서 설정하지 않으면, 기본 값이 설정되는데, 테스트를 진행한 C4.large의 PROCS는 2다.
  • Push server의 경우 REDIS 핸들러와 MQTT 핸들러가 동기적으로 작동한다. 채널을 사이에 두고 분리할 수 있는데, 성능에 어떤 영향을 줄지 궁금하다.