Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Thread Pooling

Thread Pooling

윤 상배

dreamyun@yahoo.co.kr

교정 과정
교정 0.72003년 1월 24일 14시
Thread Pool 구성도 이미지 추가,문서 히스토리 추가


1절. Thread Pooling

1.1절. Thread Pooling 이란

pool 의 사전적인 뜻을 찾아보면 연못, 저수지, 수영장 풀 등 "무엇을 담아놓는" 의 뜻을 가진다. 이대로 해석하자면 Thread Pooling 이란 쓰레드를 담아 놓는 용기(메모리가 될것이다) 를 뜻하며, 프로그래밍 측면에서 해석하자면, "미리 쓰레드를 할당시켜 놓는기법" 을 뜻한다.

그렇다면 쓰레드를 미리 할당시켜 놓는 이유에 대해서 생각해보자, 지금까지 이 사이트에서 다루었던 쓰레드프로그래밍 기법은 기본적으로 fork(2) 방식과 매우 비슷하며, 쓰레드를 생성시켜야 될 필요가 있을때 pthread_create(3)등의 함수를 이용하여 새로운 작업쓰레드를 생성시키는 방식을 사용했다. 보통 쓰레드프로그래밍은 네트웍 프로그래밍시 주로 사용됨으로 accept(2) 로 연결을 기다리다가 연결이 만들어지면 accept 에서 넘어온 소켓 지시자를 인자로 하는 쓰레드를 생성했다.

이러한 방식 - 요청이 있을때 쓰레드를 생성시키는 - 의 쓰레드 프로그래밍기법은 대부분의 작업을 처리하기에 충분히 효율적이며, 빠르긴하지만 클라이언트로 부터의 연결과 종료가 매우 바쁘게 일어나는 서버의 경우, 계속적으로 쓰레드를 생성하고 종료해야 하는 비용을 무시할수 없게 된다. 쓰레드가 비록 fork()에 비해서 생성과 소멸시에 훨씬 적은 비용을 소모한다고는 하지만, 이건 어디까지나 상대적인 것으로 실상은 꽤 많은 시간과 비용을 소비하는 작업이다. 특히 Linux 에서의 Pthread 의 경우 clone(2)를 이용한 구현임으로 더욱더 많은 비용을 소비하게 된다.

Thread Pooling 은 이러한 반복적인 쓰레드의 생성/소멸에 의한 비효율적인 측면을 없애고자 하는 목적으로 만들어진 프로그래밍 기법이다.


1.1.1절. Thread Pool의 구현방식

개념적으로 보자면 Thread Pool 을 구성하는건 매우 간단하다. 생성하고자 하는 크기만큼 ptread_create() 함수를 돌리면 되기 때문이다.

하지만 이건 어디까지나 개념적인 것으로 대부분의 경우 각각의 쓰레드를 스케쥴링 해주어야 함으로, 때에 따라서는 구현을 위해서 매우 복잡한 프로그래밍 기법을 동원해야 할때도 있다. 간단히 웹 서버를 Thread Pool 로 구현한다고 가정을 해보자 - 보통 웹서버는 HTTP 의 특성상 연결과/종료가 빈번하게 일어 남으로 쓰레드풀을 사용할경우 많은 이익을 얻을수 있다 -, 만약 100 개의 Thread 를 미리 생성시켰고, 각각의 Thread 는 하나의 클라이언트 연결을 처리한다고 가정했을때, main 쓰레드는 accept(2) 를 통해서 클라이언트를 받아들였을때, accept() 로 만들어진 소켓 지정번호를 미리 만들어진 100 개의 쓰레드중 "놀고" 있는 쓰레드에게 넘겨주어야 할것이다. 그러기 위해서는 main 쓰레드에서 각각의 쓰레드 상태를 유지해서 적당한 쓰레드에게 파일지정자를 넘겨줘야 할것이다.

그나마 위의 경우는 하나의 쓰레드가 하나의 연결을 처리함으로 어렵지 않게 구현하겠지만, 만약 100개의 쓰레드가 있고, 거기에 각각의 쓰레드가 10개 씩의 클라이언트 연결을 처리하도록 구성한다면, 거기에다가 적당한 로드밸런싱 기능 까지 포함시키고자 한다면, 구현이 꽤 복잡해 질수도 있다.

그림 1. Thread Pool 구성도

위는 Thread Pool 의 대략적인 구현상태를 그림? 으로 나타낸 것이다. Thread Pool 에 들어있는 각각의 쓰레드를 관리하기 위해서는 필수적으로 각각의 쓰레드의 상태를 가지고 있는 Schedul 자료구조 를 가지고 있어야한다. 그래야만 MAIN THREAD 에서 쓰레드 상태를 확인해서 적당한 쓰레드로 작업분배가 가능할것이기 때문이다. - 실제 Linux 커널도 각각의 task 의 스케쥴링을 위해서 task 구조체를 유지한다. -


1.1.2절. 구현 프로세스

이제 구현방식에 대한 밑그림이 나왔으니, 실제로 구현을 위한 프로세스를 만들어 보도록 하자. 프로세스는 슈도코드로 구성을 하도록 하겠다. 네트웍 서버 작성을 기준으로 하겠다.

스케쥴관련 자료구조
{
   현재 연결된 클라이언트수
   현재 처리해야될 클라이언트 소켓지시자

   쓰레드풀에 만들어진 쓰레드 상태 : 쓰레드풀 크기만큼의 배열 
   {
       0 이면 휴식상태 
       1 이면 작업상태 
       처리중인 소켓지시자
   }
};

main 함수시작
{
    아규먼트로 몇개의 쓰레드를 생성할지를 받음
    while(쓰레드 생성수만큼)
    {
        pthread_create 를 이용해서 쓰레드 생성
        // 통신쓰레드 함수
        {
            WAIT:
            main 쓰레드가 깨우길 기다린다. 
            만약 main 쓰레드로 부터 깨움이 있다면   
            {
                스케쥴 자료구조->현재 처리해야될 소켓지시자 를 읽어온다. 
                스케쥴 자료구조->자신의 상태를 1로 세팅한다.  
                스케쥴 자료구조->처리중인 소켓지시자를 세팅한다. 
                while(1)
                {
                    클라이언트와 통신한다. 
                    만약 에러가 발생하면 
                    {
                        스케쥴 자료구조->처리중인 소켓지시자를 0으로 세팅  
                        스케쥴 자료구조->자신의 상태를 0으로 세팅
                        스케쥴 자료구조->현재 연결된 클라이언트수 --; 
                        goto WAIT:
                    }
                }
            }
        } 
    }

    // main 쓰레드
    while(1)
    {
        만약 accept 를 통해서 연결이 발생한다면
        {
            스케쥴관련 자료구조->현재연결된 클라이언트수가 MAX 를 초과하지 않았다면
            {
                스케쥴관련 자료구조->현재연결된 클라이언트수 ++; 
                스케쥴관련 자료구조->현재처리해야될 클라이언트 소켓지시자 = accept();
                스케줄관련 자료구조->쓰레드풀에 만들어진 쓰레드상태 가 0인 
                  쓰레드를 찾아서 해당 쓰레드를 깨운다.  
            }
            그렇지 않고 초과했을경우
            {
                클랑리언트에게 에러메시지를 전송한다. 
            }
        }
    }
}
				
구현은 구현하는 프로그래머가 상황에 따라서 선택하기 나름이긴 하지만 보통은 위의 방법을 기본으로 해서, 약간의 변경을 가하는 정도가 될것이다. 위의 슈도코드를 보면 main 쓰레드에서 accept 를 받으면 휴식상태에 있는 쓰레드를 깨운다고 되어있는데, 이때 깨우기 위해서는 쓰레드 조건변수를 사용하면 될것이다.

그렇다면 스케쥴관련 자료구조는 어떻게 구현하는게 쉬운방법인지 생각해보도록 하자. 구현하는 방법은 프로그래머 맘이겠지만, 필자가 구현하고자 한다면 multimap 을 이용해서 구현할것이다. 이 자료구조는 아마 다음과 같을것이다.

// 쓰레드 정보 구조체
struct ph
{
   int sockfd;    // 처리중인 소켓지정번호
   int index_num; // 쓰레드의 인덱스 번호
};

// 쓰레드 구조체 MAP
multimap<int, struct ph> phinfo;

struct schedul_info
{
    int client_num;      // 총 연결중인 클라이언트수 
    int current_sockfd;  // 가장최근에 연결된 소켓지정번호
    phinfo mphinfo;      // 쓰레드 구조체 map
} 
				
멀티맵의 key 는 쓰레드의 활성화 여부로 1 혹은 0이 된다. 그리고 value 는 해당 쓰레드 정보가 될것이다. 이렇게 멀티맵으로 만든이유는 간단하다. 멀티맵은 정렬연관 컨테이너 임으로 key 를 기준으로 자동적으로 정렬이 될것이다. 만약 첫번째 쓰레드가 처리중(1)로 변경되었다면 이 원소는 multimap 의 가장 뒤로 정렬이 될것이다. 그럼으로 우리는 클라이언트의 수가 총연결가능한 클라이언트수(Thread Pool 에 생성된 쓰레드수) 를 초과하지 않는한 phinfo.begin() 으로 가져온 쓰레드는 휴식상태(0) 이라는걸 믿을수 있게 된다. 다시 말해서 복잡해서 쓰레드상태가 0인지 1인지 처음부터 검사할 필요가 없다는 뜻이다.
  1 2 3 4 5 6 7    99 100  : 쓰레드 번호
 +-+-+-+-+-+-+-+---+-+-+
 |0|0|0|0|0|0|0|...|0|0|
 +-+-+-+-+-+-+-+---+-+-+

 --> 연결이 들어왔다면 
  1 2 3 4 5 6 7    99 100  : 쓰레드 번호
 +-+-+-+-+-+-+-+---+-+-+
 |1|0|0|0|0|0|0|...|0|0|
 +-+-+-+-+-+-+-+---+-+-+
  |                   |  
  +----------->-------+
  가장 뒤로 자동으로 sort 됨

 --> Sort 후
  2 3 4 5 6 7 8   100 1 : 쓰레드 번호
 +-+-+-+-+-+-+-+---+-+-+
 |0|0|0|0|0|0|0|...|0|1|
 +-+-+-+-+-+-+-+---+-+-+

 --> 클라이언트가 99개가 접속해 있을경우
 +-+-+-+-+-+-+-+---+-+-+
 |0|1|1|1|1|1|1|...|1|1|
 +-+-+-+-+-+-+-+---+-+-+

 그럼으로 begin() 을 사용하게 될경우 
 언제나 휴식상태에 있는 쓰레드를 가져올수 있음 
				
사실 multimap 을 쓴다면 굳이 "현재 연결된 클라이언트 수" 를 유지하기 위해서 별도의 변수를 둘 필요가 없을것이다. multimap 에서 제공하는 count() 를 이용해서 key 가 "1" 인 요소의 수를 구하면 되기 때문이다. 만약 multimp 의 begin() 값이 1 이라면 MAX 클라이언트가 가득찼다는걸 의미할것이다.

물론 multimap 의 경우 기본적으로 key 값의 수정은 허용하지 않기 때문에 0 을 1로 변경할경우 실제로는 0 을 가지는 요소를 삭제하고, 1을 가지는 새로운 요소를 삽입하는 방식을 취해야 할것이다. 마찬가지로 클라이언트가 종료해서 1을 0으로 변경할때에도 삭제/인서트를 해야할것이다. Value(값) 는 그대로 복사해서 삭제/인서트를 해야 한다.

이 방법이 번거롭다면, 그냥 배열을 쓰거나 혹은 다른 어떤 자료구조를 쓰더라도 전혀 관계없기는 하다. 그건 자기의 기호에 맞게 선택해서 사용하면 될문제이다.



1.2절. 예제

지금까지 Thread POOL 의 구현방법에 대해서 알아봤으니, 간단하게 구현해 보도록 하겠다. 이 코드는 지극히 기능구현에만 신경쓴 코드이다. 에러처리와 몇군데 mutex잠금처리는 각자의 재량에 맡기겠다.

예제 : pool_echo.cc

#include <map>
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <signal.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// 최대 쓰레드 POOL 크기
#define MAX_THREAD_POOL 256
using namespace std;

// 전역 쓰레드 구조체 
typedef struct _ph
{
    int sockfd;    // 현재 사용중인 소켓 fd
    int index_num; // 인덱스 번호
} ph;

// 전역쓰레드 구조체로써 
// 현재 쓰레드 상황을 파악함
struct schedul_info
{
    int client_num;       // 현재 연결된 클라이언트수
    int current_sockfd;   // 가장최근에 만들어진 소켓지시자
    multimap<int, ph> phinfo; 
};

// 각 쓰레드별 조건변수
pthread_cond_t *mycond;
// 쓰레드 동기화를 위한 조건변수
pthread_cond_t async_cond = PTHREAD_COND_INITIALIZER;

// 각 쓰레드별 조건변수의 크리티컬세션 지정을 위한 
// 뮤텍스 
pthread_mutex_t mutex_lock= PTHREAD_MUTEX_INITIALIZER; 
// 쓰레드 동기화용 조건변수의 크리티컬세션 지정을 위한 
// 뮤텍스
pthread_mutex_t async_mutex = PTHREAD_MUTEX_INITIALIZER;

// 클라이언트와의 통신용 쓰레드
void *thread_func(void *data);
// 현재 클라이언트 상태 모니터용 쓰레드
// 한마디로 디버깅용 
void *mon_thread(void *data);

schedul_info s_info;

// 메인 함수
int main(int argc, char **argv)
{
    int i;
    ph myph;
    int status;
    int pool_size = atoi(argv[2]);
    pthread_t p_thread;
    struct sockaddr_in clientaddr, serveraddr;
    int server_sockfd;
    int client_sockfd;
    int client_len;    

    // 풀사이즈 검사
    if ((pool_size < 0) || (pool_size > MAX_THREAD_POOL))
    {    
        cout << "Pool size Error" << endl;
        exit(0);    
    }

    // Make Socket
    if ((server_sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        perror("error : ");
        exit(0);
    }

    // Bind
    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(atoi(argv[1]));

    if (bind (server_sockfd, (struct sockaddr *)&serveraddr, 
                        sizeof(serveraddr)) == -1) 
    {
        perror("bind error : ");
        exit(0);
    }

    // Listen
    if (listen(server_sockfd, 5) == -1)
    {
        perror("listen error : ");
        exit(0);
    }

    // 쓰레드 갯수만큼 조건변수 생성 
    mycond     = (pthread_cond_t *)malloc(sizeof(pthread_cond_t)*pool_size);

    // 쓰레드 전역변수 초기화
    s_info.client_num = 0;
    s_info.current_sockfd = 0; 

    // 쓰레드 POOL 새성
    for (i = 0; i < pool_size; i++)
    {
        memset((void *)&myph, 0x00, sizeof(myph));
        myph.index_num = i;
        s_info.phinfo.insert(pair<int, ph>(0, myph));

        // 조건변수를 이용해서 쓰레드간 동기화를 실시한다.
        pthread_mutex_lock(&async_mutex);
        if (pthread_create(&p_thread, NULL, thread_func, (void *)&i) < 0)
        {
            perror("thread Create error : ");
            exit(0);    
        }    
        pthread_cond_wait(&async_cond, &async_mutex); 
        pthread_mutex_unlock(&async_mutex);
    }

    // 디버깅용 쓰레드 생성
    pthread_create(&p_thread, NULL, mon_thread, (void *)NULL);

    // MAIN THREAD accept wait
    client_len = sizeof(clientaddr);

    // 클라이언트 ACCEPT 처리를 위한 
    // MAIN 쓰레드 
    while(1)
    {
        multimap<int, ph>::iterator mi;
        client_sockfd = accept(server_sockfd, (struct sockaddr *)&clientaddr, 
                                        (socklen_t *)&client_len);
        if (client_sockfd > 0)
        {
            // 만약 쓰레드풀이 가득찼다면 클라이언트 연결을
            // 종료시킨다.
            mi = s_info.phinfo.begin();
            if (mi->first == 1)
            {
                //error message send to client_sockfd
                cout << "SOCKET IS FULL" << endl;    
                close(client_sockfd);
            }
            // 그렇지 않다면 연결을 받아들이고 
            // 클라이언트 전역 변수를 세팅한다. 
            // 세팅후 해당 처리쓰레드에게 시그널을 보내어서 
            // 처리하게 한다. 
            else
            {
                ph tmpph;
                int psockfd;
                int pindex_num;
                s_info.current_sockfd = client_sockfd;

                tmpph.sockfd = client_sockfd;
                tmpph.index_num = mi->second.index_num;
                s_info.phinfo.erase(mi);    
                s_info.phinfo.insert(pair<int, ph>(1,tmpph));
                s_info.client_num ++;
                cout << "SEND SIGNAL " << mi->second.index_num << endl;     
                pthread_cond_signal(&mycond[mi->second.index_num]);    
            }
        }
        else
        {
            cout << "ACCEPT ERROR " << endl;    
        }
    }
    pthread_join(p_thread, (void **)status);
}

void *thread_func(void *data)
{
    char buf[255];
    int mysocket;
    int mynum = *((int *)data); 
    multimap<int, ph>::iterator mi;
    // 쓰레드 동기화용 조건변수
    pthread_mutex_lock(&async_mutex);
    pthread_cond_signal(&async_cond);
    pthread_mutex_unlock(&async_mutex);

    cout << "Thread create " << mynum << endl;
    while(1)
    {
        // MAIN 쓰레드로 부터 신호를 기다린다. 
        // 신호가 도착하면 쓰레드 전역변수로 부터 
        // 현재 처리해야할 소켓지정값을 가져온다. 
        pthread_mutex_lock(&mutex_lock);
        pthread_cond_wait(&mycond[mynum], &mutex_lock);
        mysocket = s_info.current_sockfd;
        pthread_mutex_unlock(&mutex_lock);
        memset(buf, 0x00, 255);    

        // 데이타를 처리한다. 
        // 만약 quit 문자열을 만나면 
        // 쓰레드 전역변수를 세팅한다음 연결종료 한다. 
        while(1)
        {
            read(mysocket, buf, 255);
            if (strstr(buf, "quit") == NULL)
            {
                write(mysocket, buf, 255);
            }
            else
            {
                mi = s_info.phinfo.begin();
                while(mi != s_info.phinfo.end())
                {
                    cout << "search " << mi->second.index_num << endl;
                    if (mi->second.index_num == mynum)
                    {
                        ph tmpph;
                        tmpph.index_num = mynum;
                        tmpph.sockfd = 0;
                        s_info.phinfo.erase(mi);
                        s_info.phinfo.insert(pair<int, ph>(0, tmpph));
                        s_info.client_num --;
                        close(mysocket);
                        break;
                    }
                    mi ++;
                }
                break;
            }
            memset(buf, 0x00, 255);    
        }
    }
}

void *mon_thread(void *data)
{
    cout << "moniter thread" << endl;
    while(1)
    {
        sleep(10);
        multimap<int, ph>::iterator mi;
        mi = s_info.phinfo.begin();
        cout << "size " << s_info.phinfo.size() << endl;
        while(mi != s_info.phinfo.end())
        {
            cout << mi->first << " : " << mi->second.index_num 
                 << " : " << mi->second.sockfd << endl; 
            mi ++;
        }
    }
}
			
이 프로그램은 2개의 인자를 받아들이며, 클라이언트의 입력을 되돌려주는 일을한다 (echo 서버). 첫번째 인자는 서비스할 PORT 번호이고, 두번째 인자는 쓰레드 생성갯수이다. 프로그램은 인자의 정보를 이용해서 PORT 를 열고 클라이언트를 받아들인다. 클라이언트가 연결하면, Thread Pool 에 남는 공간이 있는지를 확인하고, 남는 공간이 있다면 클라이언트와 통신하게 된다.

단지 쓰레드를 미리 생성시키고 나서, 이것을 스케쥴링하기 위한 코드가 몇줄 추가되었을 뿐 특별히 복잡한 코드는 아닐거라고 생각된다.


2절. 결론

이상 간단한 쓰레드 풀의 작성요령에 대해서 알아보았다. 위에서 설명했듯이 쓰레드 풀이란 개념적인 요소에 가까움으로 어떻게 구현할지는 상황에 따라서 매우 달라지게 되며, 위의 예제는 그러한 여러가지 상황중 가장 기본적인 상황을 예로 해서 만들어진 것이다. 어쨋든 위의 예제를 충분히 이해한다면 다른 상황으로의 응용역시 별 어려움없을 것이라고 생각된다.

쓰레드 풀은 보통 매우 효율적인 성능을 보장해주는 어플리케이션의 작성을 위해서 사용되어짐으로, 가능한한 빠른 쓰레드간 전환이 가능하도록 고민해서 코딩을 해야 한다. 위의 경우 쓰레드간 전환을 위해서 multimap 을 사용하고 있는데, accept 가 들어왔을경우 해당 클라이언트에 대한 쓰레드 할당은 매우 빠르다고 볼수 있을것이다. 그러나 종료할경우에는 multimap 의 첫번째 원소부터 마지막번 원소까지 search 해야 한다. 이것은 매우 비효율적임으로 개선할 여지가 있다. 가장 간단하게 생각할수 있는 것은 multimap 의 key 값이 1인 원소내에서만 검색하는 것이다. 우리는 쓰레드 풀의 크기와 현재 연결된 클라이언트의 수를 알고 있음으로, multimap 의 몇번째 요소부터 key 값이 1인지를 계산해 낼수 있기 때문이다. 이렇게 할경우 약간의 시간단축효과를 기대할수 있을것이다.

   1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |0|0|0|0|0|0|0|0|1|1|1|1|1|1|1|
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
                  |             |
                  +-------------+ 
                      15 - 8 
		
이 시간단축효과는 연결된 클라이언트의 수가 전체 POOL 사이즈에 비례해서 작을 수록 커질것이다.

나머지 방법은 각자 고민을 해보기 바란다. 아마 전혀 다른 자료구조를 사용할수도 있을것이다.