Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

공유메모리 큐 구성을 통한 효과적인 데이터 처리 프로세스 구현

Contents

소개

동일한 데이터를 여러번 복사해서 사용해야 하는 여러개의 단위 모듈로 이루어진 미들웨어 소프트웨어에서 가장 효율적인 데이터 교환에 대한 방법으로 공유메모리(:12)의 큐 구성에 대해서 알아보도록 한다.

  1. 공유메모리는 (큐)버퍼의 역할이 가능해야 하므로 배열로 구성된다.
  2. 데이터를 동기화 시킬 수 있어야 한다.
  3. 데이터 동기화를 위해서 각 단위 모듈(소비자)은 데이터 입력부분과 처리부분이 분리되어야 한다. 분리를 위해서 버퍼가 사용되어야 한다.
공유메모리를 통한 queue(12) 버퍼의 구성이라고 생각하면 될것 같다. 아마도 다음과 같은 (소프트웨어)시스템 구성을 가질 것이다.

이때 소비자는 각각의 버퍼를 유지할 수 있어야 한다.

공유메모리를 이용한 프로세스간 데이터 공유

미들웨어와 같은 규모가 있는 소프트웨어 만들어야 할경우 여러개의 단위 프로세스(:12)로 이루어지는 경우가 많다. 기능별로 모듈화 하는 방식인데, 예를 들어 통신모듈, 데이터 분석모듈, 데이터 저장 모듈, 시스템 감시 모듈.. 등으로 나누어 지는 경우다.

이렇게 하는 이유는 각 모듈별로 개발자를 할당할 수 있으며, 디버깅 및 유지보수가 수월하기 때문이다. 필요한 일만을 하기 때문에 문제가 발생할 여지도 적고, 문제가 생기더라도 전체 시스템이 멈추는 최악의 상황을 피할 수 있기 때문이다.

데이터 공유시 발생 가능한 문제

이렇게 단위 모듈별로 소프트웨어 시스템이 구성될경우 대략 다음과 같은 구성을 가지게 될것이다.

각 모듈간의 데이터 교환을 위해서 결국 IPC(:12)를 사용하게 될건데, 선택하는 IPC에 따라서 입출력 관련 부하는 그렇다 치더라도, 동일한 데이터를 몇번이고 복사하는 문제가 발생한다.

문제해결을 위한 다양한 제안

공유메모리 + 세마포어

가장 심플한 제안 같지만 몇가지 문제가 보인다. 단지 하나의 생산자와 하나의 소비자로 이루어진 경우에는 문제가 없지만 소비자가 여럿일 경우 세마포어값을 어떻게 제어할 것이냐가 중요한 문제가 된다.

공유메모리 + 세마포어의 핵심이라면 생산자가 데이터를 쌓을 때마다 세마포어 값을 1씩 증가 시키고, 소비자는 세마포어 값을 1씩 감소시키고 만약 0일 경우에는 생산자가 쓴 데이터가 없다는 가정하에 해당영역에서 기다리게 되는 것이다. 그런데 소비자가 여럿일 경우 과연 어느 시점에서 세마포어 값을 감소시켜야 하는지가 애매모호해 진다.

물론 세마포어설비를 소비자 갯수만큼 만들어서 제어하도록 하는 등의 몇가지 방법이 있겠지만 복잡한 관계로 생각하지 않기로 했다. 가능하면 효율적으로 심플하게 작동하는 방법을 찾길 원하기 때문이다.

공유메모리 + 생산자 통지

공유메모리의 가장 앞부분에 현재 생산자가 어디에 쓰고 있는지를 남기는 방법이다. 그럼 소비자는 자기가 읽고 있는 데이터가 생산자의 쓰고 있는 부분과 일치하는지를 알 수 있고 만약 일치한다면 기다리도록 하는 것이다.

심플한 방법이긴 한데, busy wait 상태에 놓이지 않도록 하는게 문제해결의 핵심일것 같다. 소비자가 생산자의 데이터 쓰는 시점을 기다리기 위해서 바쁘게 index값이 변하는지 검사하는 것은 아무래도 좋은 방법이 아니기 때문이다. 이를 위해 다음의 해법을 생각해 보았다.

  • 소비자가 생산자의 index와 일치하게 되어서 기다려야 할경우 생산자로의 통지를 기다리도록 한다. 생산자는 데이터를 쓰게 될경우 이를 통지해야 한다. 이럴 경우 통지를 위한 방법이 중요해진다. 간단히 생각할 수 있는 방법은 signal(2)을 이용하는 방법으로 소비자는 sigsuspend(2)로 기다리고 생산자는 kill(2)등을 이용해서 시그널을 보내도록 하는 방법이다.

*** 공유메모리 + 파일 레코드 잠금 ***

공유메모리가 담을 수 있는 원소의 크기만큼의 크기의 잠금전용 임시 파일을 만든다. 생산자는 데이터의 인덱스 위치에 대응하는 파일의 레코드를 잠근다. 소비자는 데이터를 읽기전에 레코드의 잠금을 먼저 얻도록 코딩한다. 그러면 소비자는 바쁜 대기상태에 놓이지 않으면서도 데이터의 존재 유무를 판단해서 데이터를 읽을 수 있다.

이 방법을 응용하는데 있어서 주의해야 할 점은 생산자가 다음 레코드 잠금을 얻기전에 소비자가 데이터를 읽어 버리는 경우다. 이 문제를 해결하기 위해서 생산자는 반드시 다음 레코드 잠금을 얻은 후 이전 레코드 잠금을 풀도록 코딩해야 한다.

공유메모리 + 파일 레코드 잠금 구현

이번에는 공유메모리와 파일 레코드 잠금을 통해서 구현을 하기로 했다.

아이디어는 간단하다. 생산자는 배열을 가지는 공유메모리와 공유메모리 배열의 크기와 동일한 잠금검사 전용 파일을 생성한다. 생산자는 (공유메모리)배열에 데이터를 적으면 해당 배열의 첨자를 기준으로 해서 파일의 레코드를 잠그게 된다. 10번째 배열에 썼다면 파일의 10번째 레코드를 잠그는 식이다.

잠근 레코드를 여는 시점은 생산자가 다음 레코드의 잠금을 얻얻을 때가 된다. 이런식으로 어렵지 않게 생산자와 소비자간 동기화를 이룰 수 있게 된다.

소비자는 제일 처음 시작되면 잠금 파일의 몇번째 레코드가 잠겨있는지 확인할 수 있으므로 잠긴 레코드 영역을 검사하면 가장 최근 생산자가 데이터를 쓴 공유메모리의 위치를 얻을 수 있을 것이다. 소비자는 이 위치부터 데이터를 읽어 나가면 된다.

테스트 코드

아래의 코드는 아이디어의 적응 가능 여부를 확인하기 위한 테스트용 코드다. 공유메모리 관련함수들 shmget(2), shmat(2), shmdt(2)와 레코드 잠금과 관련해서 fcntl(2)이 사용된 외에 특별한 다른 시스템콜(:12)은 사용하지 않았다.

생산자

#include <sys/ipc.h> 
#include <sys/shm.h> 
#include <fcntl.h>

#include <string.h> 
#include <unistd.h> 

#define QUEUE_SIZE 10

// 생산자와 소비자간 공유할 데이터
struct data
{
    char name[80]; 
};

struct flock lock, unlock;

int lock_open(int fd, int index)
{
  lock.l_start  = index;
  lock.l_type = F_WRLCK;
  lock.l_len = 1;
  lock.l_whence = SEEK_SET;
  return fcntl(fd, F_SETLKW, &lock);
}

int lock_close(int fd, int index)
{
  unlock.l_start  = index;
  unlock.l_type = F_UNLCK;
  unlock.l_len = 1;
  unlock.l_whence = SEEK_SET;
  return fcntl(fd, F_SETLK, &unlock);
}

void lock_init()
{
  lock.l_start  = 0;
  lock.l_type = F_WRLCK;
  lock.l_len = 1;
  lock.l_whence = SEEK_SET;
}
void unlock_init()
{
  unlock.l_start  = 0;
  unlock.l_type = F_UNLCK;
  unlock.l_len = 1;
  unlock.l_whence = SEEK_SET;
}

int main()
{
	int shmid;
	int i = 0;
	int offset = 0;

	struct data *cal_num;
	void *shared_memory;
	struct data ldata;    
	int fd;

	lock_init();
	unlock_init();

	// 잠금 파일을 생성한다.
	if ((fd = open("shm_lock", O_CREAT|O_RDWR)) < 0)
	{
		perror("file open error ");
		exit(0);
	}
	// 파일을 공유메모리 큐의 크기만큼 만든다.
	write(fd, (void *)'\0', sizeof(char)*QUEUE_SIZE);

	// 공유메모리를 생성한다.
	// 공유메모리의 크기는 QUEUE_SIZE * 원소의 크기가 된다.  
	shmid = shmget((key_t)1234, sizeof(ldata)*QUEUE_SIZE, 0666|IPC_CREAT);
	if (shmid == -1)
	{
		perror("shmget failed : ");
		exit(0);
	}

	// 공유할 메모리의 크기를 할당하고 이를 공유 메모리영역에 붙인다.  
	shared_memory = (void *)malloc(sizeof(ldata)*QUEUE_SIZE);
	shared_memory = shmat(shmid, (void *)0, 0);
	if (shared_memory == (void *)-1)
	{
		perror("shmat failed : ");
		exit(0);
	}

	while(1)
	{
		// 공유할 데이터 
		sprintf(ldata.name, "write Data : %d\n",i);
		// 이건 디버깅용 출력물
		printf("%d %s",(i==0)? QUEUE_SIZE - 1:i-1, ldata.name);

		// 레코드를 잠근다.
		if(lock_open(fd, i)< 0)
		{
			perror("lock error");
		}
		// 레코드 잠금을 얻었다면
		// 이전 레코드의 잠금을 푼다.
		if(lock_close(fd, (i==0)? QUEUE_SIZE - 1: i-1) < 0)
		{
			perror("flock error");
		}
		// 공유메모리에 데이터를 쓴다.
		memcpy((void *)shared_memory+offset, (void *)&ldata, sizeof(ldata));
		sleep(1);
		offset += sizeof(ldata);
		i++;

		// 이건 순환 큐이다. 만약 큐의 크기를 모두 채웠다면 
		// offset과 인덱스 번호 i를 초기화 한다.
		if (i == QUEUE_SIZE) {i = 0; offset = 0;}
	}
}

소비자

#include <sys/ipc.h> 
#include <sys/shm.h> 
#include <errno.h>
#include <fcntl.h>

#include <string.h> 
#include <unistd.h> 

#define QUEUE_SIZE 10

struct data
{
    char name[80]; 
};

struct flock lock, unlock;

int lock_open(int fd, int index)
{
	lock.l_start	= index; 
	lock.l_type = F_WRLCK;
	lock.l_len = 1;
	lock.l_whence = SEEK_SET;
	return fcntl(fd, F_SETLKW, &lock);
}

int lock_isopen(int fd, int index)
{
	lock.l_start	= index; 
	lock.l_type = F_WRLCK;
	lock.l_len = 1;
	lock.l_whence = SEEK_SET;
	return fcntl(fd, F_SETLK, &lock);
}

int lock_close(int fd, int index)
{
	unlock.l_start	= index; 
	unlock.l_type = F_UNLCK;
	unlock.l_len = 1;
	unlock.l_whence = SEEK_SET;
	return fcntl(fd, F_SETLK, &unlock);
}

void lock_init()
{
  lock.l_start  = 0;
  lock.l_type = F_WRLCK;
  lock.l_len = 1;
  lock.l_whence = SEEK_SET;
}
void unlock_init()
{
  unlock.l_start  = 0;
  unlock.l_type = F_UNLCK;
  unlock.l_len = 1;
  unlock.l_whence = SEEK_SET;
}

int main()
{
	int shmid;
	int i = 0;
	int offset = 0;
  int fd;

	void *shared_memory;
	struct data *ldata;    
	lock_init();
	unlock_init();

  if ((fd = open("shm_lock", O_RDWR)) < 0)
  {
  	perror("file open error ");
   	exit(0);
  }

	shmid = shmget((key_t)1234, sizeof(struct data)*QUEUE_SIZE, 0666);
	if (shmid == -1)
	{
		perror("shmget failed : ");
		exit(0);
	}

	shared_memory = (void *)malloc(sizeof(ldata)*QUEUE_SIZE);
	shared_memory = shmat(shmid, (void *)0, 0);
	if (shared_memory == (void *)-1)
	{
		perror("shmat failed : ");
		exit(0);
	}

	// 이 부분은 생산자가 가장 최근에 쓴 데이터의 인덱스를 
	// 찾아내기 위한 코드다.  
	// 잠금 파일의 레코드를 차례대로 검사하면서 잠금이 있는 부분을 검사한다. 
	// 잠금이 검사되면, 리턴한다.  
	while(1)
	{
		if(lock_isopen(fd, i)< 0)
		{
			if (errno == EAGAIN)
			{
				printf("Read index is %d %d %d\n", i, EAGAIN, errno);
				fcntl(fd, F_GETLK, &lock);  // 코드 1 
				offset = sizeof(struct data)*i; 
				break;
			}
			else
			{
				printf("Init Error\n");
				exit(0);
			}
		}
		lock_close(fd, i);
		i++;
		if (i == QUEUE_SIZE)
		{
			printf("Server Error\n");
		}
	}

	// 공유 메모리로 부터 데이터를 읽는다.
	while(1)
	{
		if (lock_open(fd, i) < 0)
		{
			perror("flock error");
		}
		ldata = (struct data *)(shared_memory+offset);
		printf("%s",ldata->name);	
		lock_close(fd, i);

		offset += sizeof(struct data);
		i++;
		if (i == QUEUE_SIZE) {i = 0;offset = 0;}	
	}
}

문제점 및 해결 방안

여러개의 소비자 참가
여러개의 소비자가 참가할 경우 각 소비자 마다 파일의 잠금을 얻게 되므로 늦게 참가하는 소비자는 앞의 소비자의 잠금 때문에 최신의 데이터를 읽지 못하는 문제가 발생할 수 있다. 이 문제의 해결은 소비자 예제 소스의 코드 1를 통해서 해결한다. GETLK는 파일 잠금에 대한 어떠한 작동도 하지 않으면서 현재 얻은 파일잠금을 해제한 프로세스의 PID를 가져올 수 있다. 우리는 해당 PID가 생산자의 PID인지 판단해서 적절한 행동을 취하면 된다. 만약 다른 소비자의 잠금이라면 fcntl을 SETLK로 호출하고, 생산자의 잠금이라면 SETLKW로 호출하면 될것이다.

생산자의 비정상 종료
생산자가 비정상 종료하는 일이 없도록 만들어 줘야 하겠지만, 프로그래머의 입장에서는 만약에 있을 지도 모르는 비장상종료를 염두에 두어야 한다. 생산자가 비정상 종료를 하게 되면, 해당 레코드에 대한 잠금이 풀려버리고, 소비자들은 잠금이 없는 상태에서 폭주(무한 순환)하게 될것이다.

이 문제의 가장 간단한 해결방법은 배열의 원소 구조체의 가장 앞에 flag를 둬서 이 값을 체크하도록 하는 것이다. 예를 들어 생산자가 99번째 배열에 데이터를 썼다면 flag를 1로 하고, 그다음 데이터의 flag를 0으로 하는 것이다. 그렇다면 만약의 경우 생잔자가 죽어서 잠금이 풀려버리더라도 소비자는 flag값을 통해서 생산자가 비정상적으로 종료했음을 인지할 수 있게 된다. flag는 int형 정도로 하면 될 것이다.

공유 메모리큐 라이브러리 제작

지금까지의 내용을 기본으로 일반적으로 사용가능한 공유 메모리 큐 라이브러리를 작성한다.

요구 사항

  • 일반적으로 사용가능
    1. 공유 메모리큐의 관리를 위한 구조체 정의(버전, 생산자 정보, 메모리큐의 크기등을 명시)
  • 모듈에서 사용가능한 큐 버퍼사용
    1. 데이터의 읽는 부분과 처리부분을 분리한다. 이를 위해서 버퍼를 구성할 수 있어야 한다.
  • 효율적인 작동
    1. 가능하면 데이터 복사는 최소한으로 이루어지도록 제한해야 한다. 예를 들어서 모듈내부에서도 데이터 읽는 부분과 처리부분의 분리를 위해서 버퍼를 사용한다고 했는데, 가능하면 포인터 복사만이 이루어지게 하는등 데이터 복사를 최소화시킬 필요가 있다.
  • 일반 구조

    다음은 여기에서 만들 공유 메모리 큐 라이브러리와 각 모듈간의 구조를 나타낸 그림이다.

    소비자(단위 모듈)은 공유 메모리 큐에서 데이터를 읽어들이는 부분과 이를 처리하는 부분을 분리시키도록 하며 분리를 위해서 중간에 버퍼를 둔다. 버퍼를 두어서 분리시키는 이유는 특별히 짧은 시간에 다량의 처리해야할 데이터가 들어오더라도 이를 버퍼에 담아 둠으로써 안정적으로 데이터를 처리할 수 있도록 하기 위함이다.

    모듈 내부 버퍼 관리 클래스 : TQueue

    일반구조를 보면 모듈 내부에서 입력부와 처리부를 분리하기 위해서 버퍼를 사용하고 있음을 알 수 있다. 이는 멀티 쓰레드 혹은 멀티 프로세스 구조로 모듈이 작성되어야 함을 의미한다. 멀티 프로세스 구조로 갈경우 고려해야할 복잡한 문제가 있기 때문에 - IPC를 사용할 경우 데이터를 모두 복사해서 보내야 하는데, 이는 최초 데이터 복사를 제거하기 위해서 공유메모리를 사용한 의미를 무색케 만든다. 자식과 부모간 공유메모리를 구성하는 방법도 있겠으나 너무 복잡하다 - 멀티 쓰레드 환경으로 구성하기로 했다.

    버퍼관리 클래스는 쓰레드 전역적인 위치에 존재하며, Input/Output을 위한 두개의 메서드를 제공한다. 큐는 템플릿을 이용해서 구현하기로 한다. Input/Output 동기화를 위해서 내부적으로 mutex와 조건변수를 사용하게 된다.

    다음은 대략적인 클래스 정의다. 다반 뼈대만 보여주는 것으로 변경될 수 있다.
    template <typename T> 
    class TQueue
    {
    	private:
    		T *container;	
    		unsigned int size;
    
    	public:
    }

    공유메모리 관리 클래스 : ShmQue

    라이브러리는 C++을 이용해서 작성하기로 했다. 아무래도 메서드와 데이터를 함께 관리할 수 있는게 큰 장점이고, 모듈내부에서 관리되는 버퍼를 위해서도 STL의 컨테이너를 이용할 수 있는등의 장점을 가지기 때문이다.

    참고문헌

    1. fcntl을 이용한 레코드 잠금
    2. 공유 메모리