메뉴

문서정보

쓰레드간 메시지전달

목차

소개

쓰레드 프로그래밍을 할 때 가장 신경쓰이는건 역시 쓰레드동기화(:12)와 쓰레드간 메시지 전달과 관련된 문제일 것이다. 또한 쓰레드간 메시지 전달에는 쓰레드 동기화 문제까지 함께 고민해야 한다. 이 문서는 다중쓰레드에서 쓰레드간 메시지를 효과적으로 전달하기 위한 다양한 방법들을 기술한다.

여기에서 소개하는 방법들은 수많은 방법들 중 몇가지 방법들일 뿐이다. 실제 프로젝트에서는 다양한 응용을 생각해야 할 것이다.

시나리오

영어문서를 파싱해서 Term을 얻어오고, 출현한 Term의 빈도수를 계수하는 프로그램을 만들도록 하겠다. 빠른 파싱을 위해서, 문서가 주어지면 문서를 라인수를 기준으로 4등분 한다음, 4개의 쓰레드를 돌려서 병렬로 처리하도록 할 것이다. 이 프로그램은 다음의 사항을 만족시켜야 한다.
  1. Main 쓰레드는 문서를 4등분한다음 생성된 work thread갯수를 파악한다음
  2. 파일을 open()한 후, 4개의 쓰레드에게 읽어야될 파일지정자와 파싱할 줄의 범위를 알려준다.
  3. 파일지정자와 범위를 전달받은 work thread는 해당범위의 문장을 분석해서 <Term, count> 자료구조를 만든다.
  4. 모든 작업이 끝났다면, Main Thread에게 작업이 끝났음을 알려준다. 이때, 자신이 작업한 결과에 대한 정보를 Main Thread에게 알려줘야 한다.
  5. 모든 work Thread에서 작업종료 메시지를 받았다면, Main Thread는 각 work Thread의 <Term, count>를 취합해서, 하나의 파일로 만든다.

구현방안

다음은 문서를 파싱해서 <Term, count>를 얻어오는 프로그램이다. 단일 쓰레드로 작동하는 프로토타입의 프로그램으로 아래의 코드를 멀티쓰레드방식으로 수정할 것이다.

#include <sys/types.h>
#include <regex.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <vector>
#include <string>
#include <iostream>

#include <fcntl.h>

using namespace std;

/*
 * 주어진 문서에서 단어를 얻어온다.
 * 입력된 라인은 strtok를 통해서 토큰으로 먼저 분리되고
 * 분리된 문자열은 정규표현(:12)을 만족하면 Term으로 판단하고 출력한다.
 * 실행인자 : 정규표현식
 * 사 용 예 :
 */

int main(int argc, char **argv)
{
  FILE *fp;
  int rtv;
  regex_t preg;
  char linebuf[1024];
  char *tr;
  char seps[] = " -.,()\";:{}'+@/<>[]|!?#";
  char msg[64];
  int i;
  vector<string> FileList;

  // 파싱할 파일을 등록한다.
  FileList.push_back("rfc.dat");

  if (argc != 2)
  {
    printf("Usage : %s [pattern]\n", argv[0]);
    return 1;
  }

  // 정규표현을 위한 컴파일러 생성
  rtv = regcomp(&preg, argv[1], REG_EXTENDED|REG_NOSUB);
  if(rtv != 0)
  {
    regerror(rtv, &preg, NULL, 0);
    return 1;
  }

  for(int i = 0; i < FileList.size(); i++)
  {
    fp = fopen(FileList[i].c_str(), "r");
    if (fp == NULL)
    {
      perror("Error ");
      return 1;
    }

    while(fgets(linebuf, 1024, fp) != NULL)
    {
      linebuf[strlen(linebuf)-1] = '\0';

      // 토큰으로 구분한다음
      tr = strtok(linebuf, seps);
      while(tr != NULL)
      {
        tr = strtok(NULL, seps);
        if (tr != NULL)
        {
          // 정규표현을 만족하는지 확인한다.
          if (regexec(&preg, tr, 0, NULL, 0) == 0)
          {
            cout << "Find Term : " << tr << endl;
          }
        }
      }
    }
    fclose(fp);
  }
}
다음과 같이 실행하면 된다.
# ./getterm "[a-zA-Z0-9]"

메시지큐 구현

메시지큐 구현에 대한 아이디어는 다음과 같다. attachment:messagequeue.png

worker Thread 관리

주고 받는 데이터에 타입을 두어서 관리하도록 할 것이다.
struct Data
{
	int type;
	char *Data;
	int size;
};
Type는 다음과 같이 정의 할 것이다.
1 << 1 일반 데이터
1 << 2 제어 데이터
이 방식은 구현이 단순하긴 하지만 worker thread -> Main thread로의 단방향 데이터 전송만 가능하다는 단점이 있다. 메시지큐(:12)를 하나더 만드는등 다양한 방법이 있을 수 있는데, 이는 뒤에서(몇가지만) 다루도록 하겠다.

쓰레드 동기화

쓰레드간 동기화는 mutex(:12) 잠금과 조건변수(:12)를 이용할 것이다.

프로시져

대충 그림을 그려야 코드가 만들어 지는 스타일이라서...
main
{
	생성 쓰레드 갯수는 4개로 한다.
	문서를 읽어들여서 문서의 Line수를 계수한다.
	메시지큐(:12)를 생성한다.
	for(i = 0; i < 4; i++)
	{
		// 쓰레드 동기화 Start
		Worker 쓰레드를 생성한다. 인자로 문서의 Offset정보를 넘긴다.
		쓰레드 함수명은 WThread로 한다. 
		// 쓰레드 동기화 End 
	}
	메인 쓰레드를 수행한다.
	while(1)
	{
		메시지 큐로부터 데이터를 읽는다.
		switch(데이터 타입)
		case 일반데이터
		{
			Term을 계수한다.
		}
		case 제어데이터
		{
			종료루틴을 수행한다.
			모든 쓰레드가 작업을 종료했다면, Break; 
		}
	}
	<Term,Count>결과를 출력한다.
}

코드 구현

<!> 미완성 코드다. g++ 로 컴파일 하면, 메시지큐를 통해서 worker thread에서 main thread로 분석된 Term이 전달되는걸 확인할 수 있다. 메시지큐를 통한 데이터 통신의 기본적인 구현은 끝났다고 볼 수 있다. 코드를 좀더 깔끔하게 하고, 몇 군데 예외처리를 해주어야 한다.
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <regex.h>

#include <fcntl.h>
#include <string.h>

#define ThreadNum 4

pthread_mutex_t mutex_lock;
pthread_cond_t  sync_cond;

pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  gcond  = PTHREAD_COND_INITIALIZER;

const char seps[] = " -.,()\";:{}'+@/<>[]|!?#\n"; // Token

// 주고 받을 데이터
struct Data
{
	int queuenum;
	int msgtype; 
	int tid;
	char msg[256];
	int size;
	key_t key_id;
};

// worker 쓰레드에 넘겨줄 정보
struct DocInfo
{
	int tnum;       // thread id
	int start;      // 문서시작 위치
	int end;        // 문서 끝 위치
	char fname[80]; // 작업 파일명
	char regex[24]; // 정규표현 문자열 
};

/*
 * worker 쓰레드 함수
 */
void *Tfunction(void *data)
{
	struct Data lData;
	struct DocInfo *lDocInfo; 
	FILE *fp;
	char line[256];
	int rtv;
	int bsize;
	int readn = 0;
  regex_t preg;
  char *tr;

	lData = *(struct Data *)(data);
	lDocInfo = (struct DocInfo *)lData.msg; 

	bsize = lDocInfo->end - lDocInfo->start;
	printf("DEBUG Thread %d %d\n", lDocInfo->tnum, bsize);
	if((fp = fopen(lDocInfo->fname, "r")) == NULL)
	{
		perror("Fopen Error");
		exit(0);
	}

	lseek(fileno(fp), lDocInfo->start, SEEK_SET);	
  rtv = regcomp(&preg, lDocInfo->regex, REG_EXTENDED|REG_NOSUB);

	pthread_mutex_lock(&mutex_lock);
	pthread_cond_signal(&sync_cond);
	pthread_mutex_unlock(&mutex_lock);

	if (rtv != 0)
	{
		regerror(rtv, &preg, lDocInfo->regex, REG_EXTENDED|REG_NOSUB);
		exit(0);
	}
	for (readn = 0; readn < bsize; )
	{
		if(fgets(line, 256, fp) == NULL)
			break;
		tr = strtok(line, seps);
		if (tr != NULL)
		{
			if (regexec(&preg, tr, 0, NULL, 0) == 0)
			{
				lData.queuenum = 1;
				lData.msgtype = 1 << 1;
				lData.size = strlen(tr);
				sprintf(lData.msg, "%s", tr);

				if(msgsnd(lData.key_id, (void *)&lData, sizeof(lData), IPC_NOWAIT) <0)
				{
					perror("msg snd error");
					exit(0);
				}
			}
		}
		readn += strlen(line);
		sleep(1);
	}
	// 종료 메시지를 보낸다.
}


int main(int argc, char **argv)
{
	struct Data lData;
	struct DocInfo lDocInfo; 

	pthread_t p_thread[ThreadNum];

	int fd;
	char *fname;
	int fsize = 0;
	int blocksize = 0;
	struct stat fileinfo;
	char *regex;

	// Message queue
	key_t key_id;
	int msgtype;	

	fname = argv[1];
	regex =  argv[2];

	// 메시지큐 생성
	key_id = msgget((key_t)8888, IPC_CREAT|0666);
	if (key_id == -1)
	{
		perror("msgget error : ");
		exit(0);
	}

	// Open File
	fd = open(fname, O_RDONLY);	
	if (fd < 0)
	{
		perror("Open file error");
		return 1;
	}
	if(fstat(fd, &fileinfo)< 0) 
	{
		return 1;
	}
	fsize = fileinfo.st_size;
	printf("T Size is %d\n", fsize);

	blocksize =  fsize / ThreadNum; 

	for (int i = 0; i < ThreadNum; i++)
	{
		lDocInfo.start = (i*blocksize);
		lDocInfo.end = lDocInfo.start+blocksize;
		sprintf(lDocInfo.fname, "%s", fname);
		sprintf(lDocInfo.regex, "%s", regex);
		lDocInfo.tnum = i;

		if (i == (ThreadNum -1))
			lDocInfo.end += fsize%ThreadNum;
		lData.queuenum = 1; 
		lData.msgtype = 1 << 2; 
		lData.size = sizeof(lDocInfo); 
		lData.key_id = key_id;
		lData.tid = lDocInfo.tnum; 
		memcpy((void *)lData.msg, (void *)&lDocInfo, sizeof(lDocInfo));

		pthread_mutex_lock(&mutex_lock);
		pthread_create(&p_thread[i], NULL, Tfunction, (void *)&lData);
		pthread_cond_wait(&sync_cond, &mutex_lock);
		pthread_mutex_unlock(&mutex_lock);
	}

	while(1)
	{
		if(msgrcv(key_id, (void *)&lData, sizeof(lData), (1 >> 1), 0)	== -1)
		{
			perror("msgrcv error: ");
		}
		printf("Get Term %d %s\n", lData.tid, lData.msg);
	}
}

공유메모리 구현

공유메모리(:12)는 커널에 메모리 공간을 할당함으로써, 시스템 전역적으로 자원을 사용할 수 있도록 지원하는 IPC(:12) 설비중 하나다. 이러한 특성을 이용 쓰레드간 메시지 교환은 공유메모리(:12)상에 환형큐(:12)를 만드는 것으로 구현가능할 수 있을 것이다.

이 환형큐에는 여러개의 쓰레드가 접근을 하게 될 것이므로, 쓰레드간 동기화가 필수 적일건데, mutex(:12)가 아닌 레코드:::잠금(:12)으로도 쓰레드간 접근제어를 할 수 있다. 레코드 잠금을 통해서 쓰레드동기화를 제어하는 방법은 공유메모리와 파일잠금을 이용한 프로세스간 데이터 공유에서 이미 다룬바가 있다.

쓸만한 예제라고 생각은 하지만 이번 구현에 그대로 적용하기에는 성격이 다르다. 이번 구현은 여러개의 생산자와 하나의 소비자가 존재하는 형식이기 때문이다. 뭐 그렇다고 크게 문제될건 없다. 아래와 같이 생산/소비자 방식을 약간 수정하는 것으로 해결책을 삼았다.

attachment:thread.png

쓰레드간 동기화는 그대로 파일의 레코드 잠금을 이용할 것이다. 이 경우 다수의 생산자를 제어해야 하는데, 처음 8 byte(:12)를 생산자간 접근제어를 위한 잠금으로 사용할 것이다. 처음 레코드에 대한 생산자만이 레코드에 큐(빨간색)에 쓸권한을 가지게 된다.

데이터 쓰기 권한을 얻은 쓰레드는 공유메모리에 데이터를 쓰고, 해당 공유메모리에 대응되는 레코드에 잠금을 풀게된다. 그러면 소비자는 잠금을 얻고, 잠금에 대응되는 공유메모리를 찾아가서 정보를 읽어오면 된다. 예제로 제시한 문서를 이해하고 있다면, 위의 방식으로 수정하는건 그리 어렵지 않을 것이니, 굳이 코드를 만들진 않도록 하겠다.