Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Contents

Overlapped I/O 모델

소켓은 기본적으로 봉쇄/동기로 만들어진다. 이 모델은 데이터 입출력 부분에서 봉쇄(blocked)된다는 문제점을 가진다.

이 모델로는 하나의 쓰레드에서 두 개 이상의 소켓을 다루기가 힘들다. 이 모델을 바꾸지 않고 두개 이상의 소켓을 처리하려면 "멀티 쓰레드" 기술을 함께 사용하는 수 밖에 없다.

윈도 운영체제(:12)는 멀티 태스킹을 지원한다. 이는 입출력:::모델(:12)과는 상관없이, 커널은 여러 소켓으로 부터의 입력을 처리한다는 의미다. 이기능을 봉쇄/동기 입출력 모델의 한계로 쓰지 못하는 것일 뿐이다.

이 문제는 입출력 모델을 "비동기 / 봉쇄" 혹은 "비동기 / 비봉쇄"을 쓰는 것으로 해결할 수 있다. 비동기 봉쇄 모델을 사용하는 기술이 select(:2)함수를 이용한 입출력:::다중화(:12)이다. Overlapped I/O 모델은 "비동기 / 비봉쇄 모델"의 응용 모델이다. Overlapped에는 non-blocking이 포함되어 있다. 즉 Overlapped 는 non-block + (비동기적) 완료 통보 라고 보면 된다.

Overlapped I/O는 다음과 같이 묘사할 수 있다. (정확한 묘산는 아니다.)

앞서 언급했듯이 커널은 여러 개의 소켓의 데이터를 동시에 다룰 수 있다. 그러다 보면 시간을 축으로 데이터가 중첩이 되는 구간이 생길 수도 있을 것이다. 이제 데이터의 입력이 완료되면, 해당 소켓에서 이벤트를 통지한다. 이벤트를 받은 프로세스는 이벤트 객체를 이용해서 데이터를 처리하거나 혹은 Completion Routine를 이용해서 데이터를 처리할 수 있다. 중첩해서 들어올 수 있는 데이터의 처리를 커널에 맡기게 되므로 유저 모드에서 블럭되는 일 없이 여러 개의 소켓을 처리할 수 있다.

Overlapped I/O의 중첩을 허용하는 이런 특징 때문에, "중첩 입출력 모델"이라고 부르기도 한다. 생소한 기술 같지만, 원리에 있어서는 리눅스(:12)의 입출력:::다중화(:12)나 리얼:::타임:::시그널(:12)과 큰 차이가 없음을 알 수 있다. 다음은 비 동기 / 비 봉쇄 입출력모델을 따르는 프로그램의 흐름을 보여준다.
  1. 읽기 함수를 호출하면, 커널은 데이터를 읽기 위한 초기화 작업을 하고 데이터를 기다린다. 읽기 함수는 바로 반환한다.
  2. 이제 프로그램은 다른 작업을 진행한다.
  3. 데이터가 입력되면, 이벤트가 발생한다.
  4. 프로그램은 콜백함수 혹은 이벤트 객체를 만들어서 데이터를 읽어서 처리한다.
하지만 리눅스의 입출력 다중화나 리얼 타임 시그널등 비동기 입출력 기술과 차이점이 있다. 이들 리눅스 모델은 데이터 입력이 완료된 시점에서 데이터를 읽는 방식이 아니다. 소켓으로 데이터가 입력되는 시점에서 커널 데이터를 유저 데이터로 복사한다. 이말은 커널로 부터 데이터의 복사가 끝날 때까지 몇 번이나 입출력 함수를 호출 할 수도 있음을 의미한다. 여러 번 유저 모드와 커널 모드간의 변환이 있으므로 성능이 감소할 수 밖에 없다.

윈도의 Overlapped I/O모델은 데이터 입출력이 완료된 시점에서 이벤트를 발생한다. 이로 인해 얻을 수 있는 이점은 다음과 같다.
  1. 모드 변환이 발생하지 않는다. (여러 번의 데이터 복사가 발생하지 않는다.)
  2. 데이터 입출력이 진행되는 동안에도 다른 일을 할 수 있다.

Overlapped 소켓 프로그램

socket 함수로 만든 소켓은 기본적으로 중첩 특성을 가진다. 그러나 다른 BSD:::소켓(:12)입출력 함수들로는 중첩 소켓을 다루지 못한다. 그러므로 BSD:::소켓(:12) 입출력 함수대신 WSASend(:4100), WSARecv(:4100), WSASendTo(:4100), WSARecvFrom(:4100) 등의 함수를 이용해야 한다. WSASocket(:4100) 함수로 소켓을 만들 경우에는 WSA_FLAG_OVERLAPPED를 지정해줘야 한다.

중첩 소켓을 사용은 WSAsend(:4100) 함수의 lpOverlapped 매개 변수와 lpCompletionRoutine 매개 변수로 이루어진다.
  1. lpOverlapped : WSAOVERLAPPED(:4300)구조체의 포인터를 넘긴다. 만약 NULL 이라면 중첩 소켓 특성을 사용하지 않는다.
  2. lpCompletionRoutine : 입력이 완료 되었을 때 호출할 완료 루틴을 설정한다. 만약 NULL 이라면, 이벤트 객체 기반으로 처리한다.

이벤트 객체 기반 처리

lpCompletionRoutine이 NULL일 경우, WSAOVERLAPPED 구조체의 이벤트 객체 핸들러로 입력을 처리한다. 이때 accept(:4100)함수를 어떻게 처리해야 할 것이낙 하는 문제가 있다. accept 함수는 중첩 소켓 특성을 이용하지 못한다. 또한 WSAAccept 함수도 중첩 소켓 특성을 이용하지 못한다. accept는 단 1 바이트만 들어와도 요청이 완료되므로, 굳이 중첩 특성을 이용할 필요가 없기 때문이다. acceptEx 함수가 있기는 하지만, 그다지 쓰고 싶지 않다.

다음 두 가지 방법 중 하나를 선택하면 될 것 같다.
  1. 멀티 쓰레드(:12) 프로그래밍
accept는 메인 쓰레드에서 처리하고, 입출력은 워커 쓰레드에서 처리하도록 한다. 두 개의 쓰레드가 필요하다.
  1. WSAEventSelect 모델을 이용해서 처리한다.
이벤트 객체 기반 처리 예제. 멀티 쓰레드를 이용해서 accept 처리를 분리 했다.
#include <winsock2.h>
#include <windows.h>
#include <stdio.h>
 
#define PORT 5150
#define DATA_BUFSIZE 8192

// 소켓 정보를 저장하기 위한 구조체 
typedef struct _SOCKET_INFORMATION {
   CHAR Buffer[DATA_BUFSIZE];
   WSABUF DataBuf;
   SOCKET Socket;
   WSAOVERLAPPED Overlapped;
   DWORD BytesSEND;
   DWORD BytesRECV;
} SOCKET_INFORMATION, * LPSOCKET_INFORMATION;

// 연결 소켓 (inbound connection)으로 입출력 처리를 위한 쓰레드 함수
DWORD WINAPI ProcessIO(LPVOID lpParameter);
 
DWORD EventTotal = 0;

// 이벤트 저장을 위한 구조체 배열과
// 소켓 정보를 저장하기 위한 구조체 배열
WSAEVENT EventArray[WSA_MAXIMUM_WAIT_EVENTS];
LPSOCKET_INFORMATION SocketArray[WSA_MAXIMUM_WAIT_EVENTS];

// 임계 영역 설정을 위한 크리티컬 섹션 객체
// EventArray, SocketArray 연산 영역을 임계 영역으로 지정하기 위해서 사용한다.
CRITICAL_SECTION CriticalSection;
 
int main(int argc, char **argv)
{
   WSADATA wsaData;
   SOCKET ListenSocket, AcceptSocket;
   SOCKADDR_IN InternetAddr;
   DWORD Flags;
   DWORD ThreadId;
   DWORD RecvBytes;
 
   InitializeCriticalSection(&CriticalSection);
 
   if (WSAStartup((2,2),&wsaData) != 0)
   {
      printf("WSAStartup() failed with error %d\n", WSAGetLastError());
      WSACleanup();
      return 1;
   }
   else
     printf("WSAStartup() looks nice!\n");
 
   if ((ListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
   {
      printf("Failed to get a socket %d\n", WSAGetLastError());
      return 1;
   }
   else
      printf("WSASocket() is OK lol!\n");
 
   InternetAddr.sin_family = AF_INET;
   InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
   InternetAddr.sin_port = htons(PORT);
 
   if (bind(ListenSocket, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr)) == SOCKET_ERROR)
   {
      printf("bind() failed with error %d\n", WSAGetLastError());
      return 1;
   }
   else
      printf("YOu see, bind() is working!\n");
 
   if (listen(ListenSocket, 5))
   {
      printf("listen() failed with error %d\n", WSAGetLastError());
      return 1;
   }
   else
      printf("listen() is OK maa...\n");
 
   // 중첩(:12) 소켓 생성 
   if ((AcceptSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
   {
      printf("Failed to get a socket %d\n", WSAGetLastError());
      return 1;
   }
   else
      printf("WSASocket() looks OK!\n");
 
   if ((EventArray[0] = WSACreateEvent()) == WSA_INVALID_EVENT)
   {
      printf("WSACreateEvent() failed with error %d\n", WSAGetLastError());
      return 1;
   }
   else
      printf("WSACreateEvent() is OK!\n");
 
   // 중첩 입출력을 다루기 위한 쓰레드(:12) 생성 
   if ( CreateThread(:4200)(NULL, 0, ProcessIO, NULL, 0, &ThreadId) == NULL )
   {
      printf("CreateThread() failed with error %d\n", GetLastError());
      return 1;
   }
   else
      printf("Nothing to say, CreateThread() is OK!\n");
 
   EventTotal = 1;

   // 메인 쓰레드는 accept 처리만 담당한다. 
   while(TRUE)
   {
      // 클라이언트 연결 요청을 가져온다.
      if ((AcceptSocket = accept(ListenSocket, NULL, NULL)) == INVALID_SOCKET)
      {
          printf("accept() failed with error %d\n", WSAGetLastError());
          return 1;
      }
      else
          printf("accept() is OK!\n");

      // 성공적으로 연결 소켓을 가져오면, 이벤트 정보 배열과 소켓 정보 배열에 정보를 추가하고
      // 중첩 입출력 연산을 수행한다.
      EnterCriticalSection(&CriticalSection);

      if ((SocketArray[EventTotal] = (LPSOCKET_INFORMATION) GlobalAlloc(GPTR, sizeof(SOCKET_INFORMATION))) == NULL)
      {
         printf("GlobalAlloc() failed with error %d\n", GetLastError());
         return 1;
      }
      else
         printf("GlobalAlloc() for LPSOCKET_INFORMATION is pretty fine!\n");
 
      // 연결 소켓 정보를 소켓 정보 구조체 배열에 추가한다. 
      SocketArray[EventTotal]->Socket = AcceptSocket;
      ZeroMemory(&(SocketArray[EventTotal]->Overlapped), sizeof(OVERLAPPED));
      SocketArray[EventTotal]->BytesSEND = 0;
      SocketArray[EventTotal]->BytesRECV = 0;
      SocketArray[EventTotal]->DataBuf.len = DATA_BUFSIZE;
      SocketArray[EventTotal]->DataBuf.buf = SocketArray[EventTotal]->Buffer;
 
      if ((SocketArray[EventTotal]->Overlapped.hEvent = EventArray[EventTotal] = WSACreateEvent()) == WSA_INVALID_EVENT)
      {
         printf("WSACreateEvent() failed with error %d\n", WSAGetLastError());
         return 1;
      }
      else
         printf("WSACreateEvent() is OK!\n");
 
      // WSARecv 함수를 호출해서 입력 작업 초기화를 한다. 
      Flags = 0;
      if (WSARecv(SocketArray[EventTotal]->Socket,
         &(SocketArray[EventTotal]->DataBuf), 1, &RecvBytes, &Flags, &(SocketArray[EventTotal]->Overlapped), NULL) == SOCKET_ERROR)
      {
         if (WSAGetLastError() != ERROR_IO_PENDING)
         {
            printf("WSARecv() failed with error %d\n", WSAGetLastError());
            return 1;
         }
      }
      else
            printf("WSARecv() should be working!\n");
 
      EventTotal++;
      LeaveCriticalSection(&CriticalSection);
 
      // 워커 쓰레드에 이벤트를 한번 보내본다.
      if (WSASetEvent(EventArray[0]) == FALSE)
      {
         printf("WSASetEvent() failed with error %d\n", WSAGetLastError());
         return 1;
      }
      else
         printf("Don't worry, WSASetEvent() is OK!\n");
   }
}
 
DWORD WINAPI ProcessIO(LPVOID lpParameter)
{
   DWORD Index;
   DWORD Flags;
   LPSOCKET_INFORMATION SI;
   DWORD BytesTransferred;
   DWORD i;
   DWORD RecvBytes, SendBytes;
 
   while(TRUE)
   {
      // wait 함수로 이벤트를 기다린다.
      if ((Index = WSAWaitForMultipleEvents(EventTotal, EventArray, FALSE,  WSA_INFINITE, FALSE)) == WSA_WAIT_FAILED)
      {
         printf("WSAWaitForMultipleEvents() failed %d\n", WSAGetLastError());
         return 0;
      }
      else
         printf("WSAWaitForMultipleEvents() is OK!\n");
 
      // 0 번 이벤트 객체에서의 이벤트는 듣기 소켓에 관련된 것이다. 
      // 무시하고 넘어간다.
      if ((Index - WSA_WAIT_EVENT_0) == 0)
      {
         WSAResetEvent(EventArray[0]);
         continue;
      }
 
      SI = SocketArray[Index - WSA_WAIT_EVENT_0];
      WSAResetEvent(EventArray[Index - WSA_WAIT_EVENT_0]);

      // 중첩연산 결과를 확인한다. 
      // 에러라면 소켓을 닫고, 소켓 정보 배열과 이벤트 정보 배열을 재 배치한다. 
      if (WSAGetOverlappedResult(SI->Socket, &(SI->Overlapped), &BytesTransferred, FALSE, &Flags) == FALSE || BytesTransferred == 0)
      {
         printf("Closing socket %d\n", SI->Socket);
 
         if (closesocket(SI->Socket) == SOCKET_ERROR)
         {
            printf("closesocket() failed with error %d\n", WSAGetLastError());
         }
         else
            printf("closesocket() is OK!\n");
 
         GlobalFree(SI);
         WSACloseEvent(EventArray[Index - WSA_WAIT_EVENT_0]);
         // Cleanup SocketArray and EventArray by removing the socket event handle
         // and socket information structure if they are not at the end of the arrays
         EnterCriticalSection(&CriticalSection);
 
         if ((Index - WSA_WAIT_EVENT_0) + 1 != EventTotal)
            for (i = Index - WSA_WAIT_EVENT_0; i < EventTotal; i++)
            {
               EventArray[i] = EventArray[i + 1];
               SocketArray[i] = SocketArray[i + 1];
            }
 
         EventTotal--;
         LeaveCriticalSection(&CriticalSection);
         continue;
      }


      if (SI->BytesRECV == 0)
      {
         SI->BytesRECV = BytesTransferred;
         SI->BytesSEND = 0;
      }
      else
      {
         SI->BytesSEND += BytesTransferred;
      }
 
      if (SI->BytesRECV > SI->BytesSEND)
      {
         // Post another WSASend() request.
         // Since WSASend() is not guaranteed to send all of the bytes requested,
         // continue posting WSASend() calls until all received bytes are sent
         ZeroMemory(&(SI->Overlapped), sizeof(WSAOVERLAPPED));
         SI->Overlapped.hEvent = EventArray[Index - WSA_WAIT_EVENT_0];
 
         SI->DataBuf.buf = SI->Buffer + SI->BytesSEND;
         SI->DataBuf.len = SI->BytesRECV - SI->BytesSEND;
 
         if (WSASend(SI->Socket, &(SI->DataBuf), 1, &SendBytes, 0, &(SI->Overlapped), NULL) == SOCKET_ERROR)
         {
            if (WSAGetLastError() != ERROR_IO_PENDING)
            {
               printf("WSASend() failed with error %d\n", WSAGetLastError());
               return 0;
            }
         }
         else
              printf("WSASend() is OK!\n");
      }
      else
      {
         SI->BytesRECV = 0;
         // Now that there are no more bytes to send post another WSARecv() request
         Flags = 0;
         ZeroMemory(&(SI->Overlapped), sizeof(WSAOVERLAPPED));
         SI->Overlapped.hEvent = EventArray[Index - WSA_WAIT_EVENT_0];
 
         SI->DataBuf.len = DATA_BUFSIZE;
         SI->DataBuf.buf = SI->Buffer;
 
         if (WSARecv(SI->Socket, &(SI->DataBuf), 1, &RecvBytes, &Flags, &(SI->Overlapped), NULL) == SOCKET_ERROR)
         {
            if (WSAGetLastError() != ERROR_IO_PENDING)
            {
               printf("WSARecv() failed with error %d\n", WSAGetLastError());
               return 0;
            }
         }
         else
             printf("WSARecv() is OK!\n");
      }
   }
}

콜백 함수 호출

콜백 함수 호출 기반의 간단한 에코 서버 프로그램

#include <winsock2.h>
#include <windows.h>
#include <stdio.h>

#define MAXLINE 1024

// 중첩 정보를 비롯한 소켓 정보를 저장하기 위한 구조체
struct SOCKETINFO
{
	WSAOVERLAPPED overlapped;
	SOCKET fd;
	WSABUF dataBuf;
	char buf[MAXLINE];
	int readn;
	int writen;
};

// 입출력 완료시 호출할 콜백 함수
void CALLBACK WorkerRoutine(DWORD Error, DWORD readn, LPWSAOVERLAPPED overlapped, DWORD lnFlags);

int main(int argc, char **argv)
{
	WSADATA wsaData;
	SOCKET listen_fd, client_fd;
	struct sockaddr_in addr;
	struct SOCKETINFO *sInfo;
	int flags;
	int readn;

	if(argc != 2)
	{
		printf("Usage : %s [port number]\n", argv[0]);
		return 1;
	}
	if(WSAStartup( MAKEWORD(2,2), &wsaData) != 0 )
	{
		return 1;
	}
	if( (listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET )
	{
		return 1;
	}

	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(INADDR_ANY);
	addr.sin_port = htons(atoi(argv[1]));

	if( bind(:4100)(listen_fd, (struct sockaddr *)&addr, sizeof(addr)) == SOCKET_ERROR )
	{
		return 1;
	}
	if( listen(listen_fd, 5) == SOCKET_ERROR )
	{
		return 1;
	}

	while(1)
	{
                // accept 함수는 중첩 입출력을 사용하지 않았다.
                // 중첩 입출력 모델을 사용해서 별도의 완료 루틴을 호출하기 원한다면, accetEx 함수를 호출해야 한다.
		if( (client_fd = accept(listen_fd, NULL, NULL)) == INVALID_SOCKET)
		{
			printf("Accept Error\n");
			return 1;
		}

		sInfo = (void *)malloc(sizeof(struct SOCKETINFO));
		memset(:2)((void *)sInfo, 0x00, sizeof(struct SOCKETINFO));

		sInfo->fd = client_fd;
		sInfo->dataBuf.len = MAXLINE;
		sInfo->dataBuf.buf = sInfo->buf;

		flags = 0;

                // 입력이 완료되면 WorkerRoutine를 호출한다.
		if( WSARecv(:4100)(sInfo->fd, &sInfo->dataBuf, 1, &readn, &flags, &(sInfo->overlapped), WorkerRoutine) == SOCKET_ERROR )
		{
			if( WSAGetLastError() != WSA_IO_PENDING )
			{
				printf("wsarecv error %d\n", WSAGetLastError());
			}
		}
	}
	return 1;
}

void CALLBACK WorkerRoutine(DWORD Error, DWORD transfern, LPWSAOVERLAPPED overlapped, DWORD lnFlags)
{
	struct SOCKETINFO *si;

	int readn, writen;
	int flags = 0;
	si = (struct SOCKETINFO *)overlapped;

	if(transfern == 0)
	{
		printf("socket close\n");
		closesocket(si->fd);
		free(si);
		return ;
	}
	
        // readn이 0이라는 얘기는 현재 이 콜백 함수는 입력완료가 되어서 호출 되었음을 의미한다.
	if(si->readn == 0)
	{
		si->readn = transfern;
		si->writen = 0;
	}
	else
	{
		si->writen += transfern;
	}

        // 만약 쓴 데이터가 읽은 데이터 보다 작다면, 읽은 데이터를 모두 전송하지 않았음을 의미하다.
        // 읽은 데이터를 모두 전송할 때 까지 WSASend를 계속 호출 한다.
	if(si->readn > si->writen)
	{
		memset(&(si->overlapped), 0x00, sizeof(WSAOVERLAPPED));
		si->dataBuf.buf = si->buf + si->writen;
		si->dataBuf.len = si->readn - si->writen;
		
		if( WSASend(:4100)(si->fd, &(si->dataBuf), 1, &writen, 0, &(si->overlapped), WorkerRoutine) == SOCKET_ERROR )
		{
			if(WSAGetLastError() != WSA_IO_PENDING)
			{
				printf("WSASend Error\n");
			}
		}
	}
	else
	{
		si->readn = 0;
		flags = 0;
		memset(:2)(&(si->overlapped), 0x00, sizeof(WSAOVERLAPPED));
		si->dataBuf.len = MAXLINE;
		si->dataBuf.buf = si->buf;
		if( WSARecv(:4100)(si->fd, &si->dataBuf, 1, &readn, &flags, &(si->overlapped), WorkerRoutine) == SOCKET_ERROR)
		{
			if( WSAGetLastError() != WSA_IO_PENDING )
			{
				printf("wsarecv error %d\n", WSAGetLastError());
			}
		}
	}
	return ;
}