Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>
Iterator

Iterator

윤 상배

dreamyun@yahoo.co.kr



1절. 소개

이번글에는 STL(:12) 에 대한 간단한 응용을 포함한다. 대량의 데이타를 취급해야 할때 어떻게 작업을 해야 할런지에 대한 몇가지 문제들을 담고 있다.

이를 위해서 DOS:::ATTACK(:12)을 검출하는 어플리케이션을 예로 들어서 설명을 할것이다. 그러나 이 문서는 어디까지나 방법을 제시하는 문서로써 완벽한 코드를 제시하진 않을것이며, 테스트 가능한 수준의 코드만을 제공할것이며 구현에 대한 고민은 각자의 몫이 될것이다.

DOS 공격에 대한 내용은 네트웍 보안의 기본(1)을 참고하기 바란다.


2절. DOS ATTACK 검출 어플리케이션 제작

간단한 DOS(:12) 공격을 검색하는 어플리케이션을 만든다고 가정을 해보자. DOS 공격의 경우 보통 하나의 IP(:12)에서 특정 포트로 짧은시간에 요청이 들어올것임으로, "5초에 20번의 요청이 있으면 DOS 공격으로 간주한다" 라는 식으로 DOS 공격에 대해서 정의가 가능하다.

그렇다면 어플리케이션은 해당패킷이 들어올경우 IP 정보를 분석해서 카운팅하고, 이 카운팅이 5초 내에 20번 발생 했다면 "DOS WARN 메시지"를 시켜야 할것이다.

이 어플리케이션을 제작하는데 있어서 가장 중요한 포인트는 자료구조(:12)의 유지와 효율성이 될것이다. DOS 어택을 판단하는 근거는 IP와 PORT(:12) 가 됨으로, 모든 입력 IP와 PORT에 대한 테이블을 유지하고 있어야 한다.

그런데 아시다 시피 IP의 범위는 너무 넓다. 좀 널리 알려진 바쁜서버 라면 짧은시간에 수천-수만 혹은 그이상의 서로다른 IP 에서의 접근이 이루어질수 있을거다. 만약 라우터(:12)에 설치할경우 더욱 많은 접근이 일어날것이다. 이걸 단순히 map(:12) 이나 vector(:12) 로 IP정보 테이블을 구성할경우 효율에 문제가 발생할수 있다.


2.1절. 효율적인 자료구조를 생각해보자

그럼 어떻게 해야 효율적인 자료구조를 만들수 있을까.. 단순한 map 으로 할경우 만약 IP가 수만개가 들어온다면, 수만개의 원소로 이루어진 거대한 map 데이타가 만들어질 것이며, 더 커질수도 있을것이다. 거기에 port 정보까지 포함해야 됨으로, 단순 map 으로 구성하기에는 원소의 갯수가 너무 많아진다.

map 은 정렬연관:::컨테이너(:12)로 균형 이진트리 구조를 가진다. 다음은 map 의 복잡도(Complexity guarantees) 이다. size() 는 map 에 포함된 원소의 갯수이며, count(k) 는 삭제하고자 하는 key 의 갯수이다. N 은 원소의 갯수이다.

표 1. 평균 복잡도(Average complexity)

erase keyO(log(size()) + count(k))
erase elementconstant time
범위 삭제O(log(size()) + N)
countO(log(size()) + count(k))
findlogarithmic
예를 들어 13만개 정도의 원소를 가진 map 에서 특정한 원소를 찾을경우 평균 걸리는 시간은 약 17 정도가 될것이다. 1,000,000 은 약 20 이다.

DOS 공격을 검색해내는 어플리케이션이라면 라우터에 설치될것도 생각해봐야 한다. 그래서 평균 1,000,000 의 데이타에서 서취하는걸로 해서 계산해보면 대량 20 정도의 시간이 소모된다. 20 은 이처럼 바쁜어플리케이션 꽤나 큰수치이다. 그럼으로 시간을 좀더 줄여야 할 필요성이 있다.

게다가 이 어플리케이션을 위해서 map 을 구성한다고 했을때 map 의 key 는 IP와 PORT 의 조합이 될것임으로, 백만 이상의 데이타를 감당할수 있다고 봐야할것이다.


2.1.1절. 해쉬를 이용한 효율성 높이기

그래서 생각할수 있는게 Hash 함수의 사용이다. Hash 함수는 데이타 축약을 위한 함수로써, 같은 값을 입력했을때는 언제나 같은 데이타 축약을 얻어낼수 있는 함수이다.

우리가 축약해야할 데이타는 물론 32bit 크기의 IP 주소 이다. 32bit 는 범위가 너무나 넓음으로 이것을 1024(2^10) 정도로 줄이도록 하겠다. HASH 함수는 매우 단순무식한 방법으로 IP 주소 32bit 중 10bit 만을 이용해서 Hash 값을 얻도록하는데, 단지 쉬프트:::연산(:12)을 해주는 것만으로 해결가능하다.

int hash(ulong ip)
{
	return ip >> 22;
}
				
이 값들은 0 부터 1023 까지 분포가 될것임으로 vector 혹은 array 로 자료구조를 구현할수 있을것이다. IP 가 들어오면 0 - 1023 중에 하나의 값으로 hashing 될건데, vector 에 집어넣을때는 이 hashing 된 값자체를 첨자로 해서 입력할수 있음으로 시간복잡도는 O(1) 이 될것이다. 다른 말로 "한키" 에 자신이 저장될 곳을 찾아갈수 있게 된다. 이 vector 는 키를 IP와 PORT로 가지는 map 을 원소로 가지게 될것이다.

가장 이상적인 상황은 IP가 2^20 개가 들어있는데 0에서 1023까지의 vector 에 아주 균등하게 들어있는 상황으로 각 vector 의 원소는 1024 개의 원소를 가지는 map 을 가지게 될것이다. 이상황에서 원하는 데이타 (IP,PORT를 키로 가지는)를 찾는데 걸리는 시간은 "vector 에서 첨자연산하는데 걸리는 시간 O(1)" + "O(log1024)" 가 될것이다. 그럼으로 대략 11 번정도에 원하는 값을 찾을수 있게 된다. 최초 map 만을 사용했을때의 20 에 비하면 거의 절반수준으로 줄어듦을 알수 있다.

물론 위의 상황은 매우 이상적인 상황이며, 실제로는 위의 경우에 비해서 더 많은 시간이 소모될것이다. 왜냐하면 위의 단순한 해쉬함수로는 IP가 균등하게 0-1023 으로 분포되기가 힘들것이기 때문이다. 해쉬함수를 만드는데 있어서 가장 중요한 부분은 바로 입력데이타들에 대해서 최대한 균등하게 분포되는 해쉬값을 얻어내는 것인데, 위의 함수로는 분명히 균등하게 분포되지 않을것이다. 하지만 일단은 위의 해쉬함수를 사용하도록 하겠다. 좀더 성능좋은 해쉬함수는 많은 고민과 테스트를 통해서 만들어 질수 있을것이다.(공개된것들도 많이 있으니 google 등을 통해서 확인해보기 바란다)


2.1.2절. 최종적인 자료구조

다음은 이렇게 해서 만들어진 최종적인 자료구조이다.

그림 1. hash_map 자료구조

hash 값을 위한 1024 크기의 vector 가 놓인다. 이 vector 은 IP,PORT 를 key 로 가지고, TIME, COUNT 를 value로 가지는 map 을 원소로 가진다. TIME 은 패킷이 들어온 시간이며, COUNT 는 몇번 들어왔는지를 계수하기 위해서 사용된다. 이 map 의 자료구조는 대충 다음과 같을 것이다.
typedef struct _key
{
    int ip;
    int port;
} key;

typedef struct _value
{
    int time;
    int count;
} value;

struct ltstr
{
    bool operator() (key a1, key a2)
    {
        return memcmp((void *)&a1, (void *)&a2, 8) < 0;
    }
};
map<key, value, ltstr> mydata;
				
ltstr 을 주목하기 바란다. ltstr 은 key 를 정렬하기 위해서 정의되었는데, memcmp 를 이용해서 2개의 구조체의 크기를 비교한다. 2개의 int 를 가지고 있음으로 8byte 크기로 비교하면 될것이다. 다음은 테스트를 위한 간단한 코드이다.

map_test.cc

#include <vector>
#include <map>
#include <iostream>

using namespace std;

// map 의 KEY 값으로 ip와 port 정보를 가진다.
typedef struct _key
{
    ulong ip;
    int port;
} key;

// map 의 VALUE 값으로 처음 만들어진 시간과 
// 카운트 갯수이다. 
typedef struct _value
{
    int time;
    int count;
} value;

// KEY 인 struct key 를 정렬하기 위해서 사용한다. 
// memcmp 를 이용해서 정렬하도록 한다. 
struct ltstr
{
    bool operator() (key a1, key a2)
    {
        return memcmp((void *)&a1, (void *)&a2, 8) < 0;
    }
};

int main()
{
    key mkey;
    value mvalue;
    map<key, value, ltstr> mydata;

    mkey.ip = 134;
    mkey.port = 4;
    mydata[mkey] = mvalue;

    mkey.ip = 134;
    mkey.port = 3;
    mydata[mkey] = mvalue;

    mkey.ip = 132;
    mkey.port = 1;
    mydata[mkey] = mvalue;

    map<key,value,ltstr>::iterator mi;
    mi = mydata.begin();
    while(mi != mydata.end())
    {
        cout << mi->first.ip << ","<< mi->first.port << endl;
        *mi++;
    }
}
				
정렬(:12)순서는 먼저 ip를 우선으로 한다. 만약 ip 가 동일할경우 port 번호로 정렬이 될것이다. 아직은 만들어진 map 정보를 hash 테이블에 입력하지 않았는데, 2.1.3절에서 설명할것이다.


2.1.3절. skeleton code (골격 코드)

이제 실제적인 골격 코드를 만들어 보도록 하자. 이건 어디까지나 지금까지 우리가 생각했던 자료구조가 실제 구현될수 있는지를 확인해 보는 테스트수준의 코드이다.

이 어플리케이션은 IP와 Port 번호 를 이용해서 Key 를 만들고 만약 동일한 IP, Port 에서 연결이 들어왔다면, count 를 1씩 증가시킨다. 만약 지정된 시간(위으 코드에서는 30초) 동안 100 번이상의 count 가 발생할경우 Dos 공격이라고 판단하고 워닝 메시지를 출력한다.

#include <vector>
#include <map>
#include <time.h>
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>

using namespace std;

typedef struct _key
{
    ulong ip;
    int port;
} key;

typedef struct _value
{
    int time;
    int count;
} value;

struct ltstr
{
    bool operator() (key a1, key a2)
    {
        return memcmp((void *)&a1, (void *)&a2, 8) < 0;
    }
};
int hash(ulong ip)
{
    return ip >> 22;
}

map<key, value, ltstr> mydata;
vector<map<key, value, ltstr> > hash_map(1024);

void InitValue(value *mvalue);
int insert_packet(key akey);

int main()
{
    key mkey;
    value mvalue;

    // 테스트용 자료 입력 ----------------
    mkey.ip = inet_addr("192.168.100.1");
    mkey.port = 21 ;
    insert_packet(mkey);

    mkey.ip = inet_addr("192.168.100.1");
    mkey.port = 2;
    insert_packet(mkey);

    mkey.ip = inet_addr("192.168.100.3");
    mkey.port = 22;
    insert_packet(mkey);

    mkey.ip = inet_addr("192.168.100.200");
    mkey.port = 80;
    insert_packet(mkey);

    mkey.ip = inet_addr("192.168.100.1");
    mkey.port = 2;
    insert_packet(mkey);
    // 테스트용 자료 입력 ----------------


    // 테스트를 위해서 출력한다. 
    // 192.168.100.1 에 대한 hash 키를 만들고 
    // 값을 출력해본다.  
    mkey.ip = inet_addr("192.168.100.1");
    mkey.port = 2;

    map<key, value, ltstr>::iterator mi;

    cout << hash(mkey.ip) << endl;
    mi = hash_map[hash(mkey.ip)].find(mkey); 
    if (mi == hash_map[hash(mkey.ip)].end())
    {
        cout << "not found " << endl;
    }
    else
    {
        cout << mi->second.time << ":" << mi->second.count << endl;
    }
}

// key 를 입력받아서 
// hash 키를 만들어서 해당 hash_map 자료구조에 입력한다. 
int insert_packet(key akey)
{
    value mvalue;
    int hash_value = hash(akey.ip);
    map<key,value,ltstr>::iterator mi;
    mi = hash_map[hash_value].find(akey);

    InitValue(&mvalue);

    // 만약 이미 존재하고 있다면
    // count 를 증가 시킨다. 
    if (mi != hash_map[hash_value].end())
    {
        cout << "Count 증가" << endl;
        mi->second.count++;
    }
    // 그렇지 않다면 새로 생성한다. 
    else
    {
        cout << "Insert " << endl;
        hash_map[hash_value].insert(pair<key, value>(akey, mvalue));
    }
}

void InitValue(value *mvalue)
{
    mvalue->time = time((time_t *)NULL); 
    mvalue->count = 1;
}
				


2.2절. 문제 발생(효율 문제)

비록 골격 코드이긴 하지만 위의 코드가 비교적 효율적으로 작동하리란걸 예상할수 있다. 그러나 이건 어디까지나 데이타 입력의 경우이다.

위의 방식으로 해서 데이타가 계속 쌓이는 건 그렇다 치고, 저런 상태로 몇시간만 돌면 분명히 map 에는 수십/수백만건의 데이타가 쌓이게 될것이다. 기본적으로 카운터가 일정수를 넘어가야만 지워지기 때문에 카운터가 초과했을경우 자료구조에서 지워진다고 하더라도, 그렇지 않은 수많은 데이타는 지워지지 않을것이기 때문이다. 결국 몇시간도 안되어서 모든 시스템 메모리 자원을 몽땅 써버리게 될것이다.

이러한 문제의 해결을 위해서 value 에 time 이라는 변수를 두긴 했음으로, 일정시간 마다 time 를 확인하고, 초과된 메시지는 지워줌으로써 해결이 가능할 것이다.

그렇다면 어떻게 지워줄것인가.. 단순한방법은 vector 의 0번부터 1023번까지의 모든 map 을 순환하면서 일일이 time 비교를 해서 time 이 초과했을경우 지워주는 방법인데, 척 봐도 알겠지만, 너무너무 비효율적이다. 데이타가 십만 혹은 백만 정도 들어있다고 가정해 보자. 제대로 작동하는걸 기대하기 힘들것이다.


2.2.1절. 해결 - 이터레이터 저장하기

이럴경우 생각할수 있는 방법이 garbage 데이타 정리를 위한 별도의 자료구조를 하나 만들어서 유지하는 것이다. 여러가지 자료구조를 생각할수 있는데, 여기에서는 multimap을 사용하도록 했다. key 는 time 이 될것인데, 이 time 이 분명히 겹칠수 있을것이기 때문이다.

multimap<int, map<key.value.ltstr>::iterator> garbage_collect;
				
아이디어는 간단하다. 위에서 우리가 map 데이타를 입력할때 동시에 위의 garbage_collect 자료구조에 map데이타의 iteraotr 을 입력시켜주는 것이다. 그리고 일정시간이 지나면 garbage_collect 자료구조를 순환하면서 시간이 초과된 iterator 에 대해서 삭제를 해주면 될것이다. 포인터를 별도로 저장해놓고 포인터를 이용해서 garbage 데이타를 삭제해주는 것과 비슷한 방식이다.

시간 복잡도를 계산해보면 garbage_collect 에서 원하는 범위의 원소를 삭제하는데 걸리는 시간 "O(log(size()) + N)", garbage_collect 에서 나온 iterator 를 이용해서 삭제시키는데 걸리는 시간 O(1) + N, ip 를 hash 로 바꾸는데 걸리는 시간 O(1) + N 정도가 된다.

O(log(size()) + N) + O(2N) 
				
N 은 삭제해야할 원소의 갯수이다. 예를들어 1000000 개의 데이타에서 100 개의 데이타를 삭제해야 한다고 가정했을때, 6+100+200 = 306 정도가 될것이다.

hashvector 의 처음부터 끝까지 순환하는 무식한 방법을 사용했을경우 순환에 걸리는 시간 N 에 비교하는데 걸리는 시간, 삭제하는데 걸리는 시간등을 예상하면 최소 2000000 시간이 소모될것이다. (순환에 걸리는 시간 1000000 + 비교하는데 걸리는시간 1000000, 거기에 삭제할 원소 N개)

그럼 간단하게 테스트 코드를 하나 만들어 보도록 하겠다. 비록 테스트 코드이긴 하지만 "이터레이터 저장" 아이디어가 현실적인지는 확인해 볼수 있을 것이다.

iterator_map.cc

#include <map>
#include <string>
#include <iostream>
using namespace std;

typedef struct _mydata
{
    int age;
    int num;
} mydata;

int main()
{
    mydata md;

    // map 자료구조및 iterator 선언
    map<int, mydata> data[1024];
    map<int, mydata>::iterator mi, my;

    // iterator 를 저장하기 위한 map 과 
    // 이것의 iterator 선언
    map<int, map<int,mydata>::iterator> garbage_map;
    map<int, map<int,mydata>::iterator>::iterator mmi;

    // 테스트용 데이타 입력
    md.age = 12;
    md.num = 1;
    data[0][1] = md;

    md.age = 19;
    md.num = 3;
    data[0][3] = md;

    md.age = 28;
    md.num = 2;
    data[0][2] = md;

    md.age = 40;
    md.num = 22;
    data[0][4] = md;

    md.age = 42;
    md.num = 26;
    data[0][5] = md;

    md.age = 38;
    md.num = 12;
    data[0][6] = md;
    // 테스트용 데이타 입력 종료

    int i = 1;
    mi = data[0].begin();
    // 테스트용 데이타를 iterator 로 순환하면서 
    // 해당 iterator 를 iterator 를 저장하기 위한 map 에 
    // 입력 
    while(mi != data[0].end())
    {
        // 입력 디버깅용 출력
        cout << mi->first << ": " << mi->second.num <<  ","
            << mi->second.age<< endl;
        // iterator 를 자료구조에 저장함
        garbage_map[i] = mi;
        *mi++;
        i++;
    }
    cout << "==============================" << endl;

    // key 의 값이 4보다 더큰 이터레이터를 모두 삭제한다.   
    mmi = garbage_map.find(4);
    while(mmi != garbage_map.end())
    {
        cout << "erase " << endl;
        data[0].erase(mmi->second);
        *mmi++;
    }

    // garbage map 에서 필요없는 데이타를 삭제한다. 
    mmi = garbage_map.find(4);
    garbage_map.erase(mmi, garbage_map.end());
    cout << "iterator num "  << garbage_map.size() << endl;

    cout << "==============================" << endl;

    // map 데이타를 출력한다. 디버깅용  
    mi = data[0].begin();
    while(mi != data[0].end())
    {
        cout << mi->first << ": " << mi->second.num <<  ","
            << mi->second.age<< endl;
        *mi++;
    }
    cout << "==============================" << endl;
}
				
간단하게 테스트 가능할 거다.


3절. 결론

이상 간단하게 STL에 대한 몇가지 이슈에 대해서 알아보았다. 개인적으로는 DOS ATTACK 검출을 위한 좀더 완벽한 예제를 들어보고 싶었으나 그렇게 하기 위해선 패킷캡쳐와 이를 효율적으로 분석해서 검출해 내는 방법에 대해서 좀더 고민해야 하기 때문에(한마디로 귀찮아서) 완벽한 예제를 만들지는 못했다.

언젠가 이 문서의 후속편이 만들어지면 좀더 완벽한 예제를 포함시키도록 하겠다.