본문 바로가기

Programming

IOCP 정리

  
IOCP ( Input Output Completion Port )
* 입출력 완료 포트

주요 개념
* 비동기 입출력 모델( Overlapped I/O )
* 여러 소켓들의 입출력을 적은 수의 쓰레드가 담당하게 하여 스레드간의 컨텍스트 스위칭을 줄이는 모델.
* 읽기와 쓰기에 대한 IO를 먼저 수행하고 완료되었을때 통지(확인) 받는다.

1. completion port 생성
HANDLE completionPort = ::CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0, 0, 0); 

2. 서버 listen 소켓 생성후 listen
SOCKET listen_sock = WSASocket( PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED );
중첩된 입출력이 가능한 non-blocking 모드의 소켓이 생성

3. accpet 쓰레드 생성( 유저 접속 처리 )
::_beginthreadex( NULL, 0, &IOCP_listener_thread, NULL, 0 , &threadID[ i ] );

4. worker 쓰레드 생성
SYSTEM_INFO si;
memset( &si, 0,  sizeof(si) );
GetSystemInfo( &si );
int event_size= si.dwNumberOfProcessors * 2           
// cpu * 2개 만큼의 쓰레드 생성

unsigned int threadID[ event_size ];
memset( threadID, 0 , event_size );

// iocp worker thread 생성
for( size_t i = 0 ; i < event_size ; i++ )
     ::_beginthreadex( NULL, 0, &WorkerThread, NULL, 0 , &threadID[ i ] );

5. 유저 접속시 accept 처리( 접속처리 대기하는 쓰레드를 따로 돌림. 유저 접속할때까지 block 상태로 있는다. )
void IOCP_listener_thread()
{
        SOCKADDR_IN sock_addr;
        int addr_size = sizeof(SOCKADDR_IN);
        SOCKET sock = WSAAccept( listen_sock, (sockaddr*)&sock_addr, &addr_size, 0 , 0 );
}

/// 유저 세션에 관련된 정보나 클래스등이 올수있다. CreateIoCompletionPort 에 completionKey 에 넣어준다.
SOCKET_INFO
{
      SOCKET client_sock; // 유저 소켓
      int user_ident;             //유저 고유 번호
};

SOCKET_INFO session;

// 접속한 유저의 소켓을 이미 만들어져 있는 completionPort 오브젝트에 등록시켜 준다.
HANDLE h = ::CreateIoCompletionPort((HANDLE)sock, completionPort , (unsigned long)&session, 0);

6.
WSARecv() 함수호출을 해준다.
 char buf[ 1024 ];     //read buffer 라고 가정!! ( test 코드임 )
 memset( buf, 0 , 1024 );
 
 WSABUF wsaBuf;    // WSARecv() 에 인자로 들어갈 WSABUF 구조체를 생성한다.
 wsaBuf.buf = buf;
 wsaBuf.len = 1024;

 DWORD recv_size = 0;
 DWORD flag = 0;

// 가르키는 포인터가  OVERLAPPED 형을 가르키면 되므로, 상속받아서 새로 Overlapped 구조체를 정의한다.
// 여기서 type_ 은 ::WSASend 나 ::WSARecv 에서 처리하게 될 IO에 대한 타입을 넘겨줌으로써,
WorkerThread 에서 io 통지가 떨어졌을때 어떤 type의 IO가 끝났는지를 식별하는 매우 중요한 역할을 한다.

struct Overlapped : public OVERLAPPED{
     size_t type_; 
};

Overlapped overlapped;
memset( &overlapped, 0, sizeof( Overlapped ) );
overlapped.type_ = RECV;
int result = ::WSARecv( sock, &wsaBuf, 1, &recv_size, &flag, (LPOVERLAPPED)&overlapped, 0 );
if( result == SOCKET_ERROR )
{
       int error = ::WSAGetLastError();
       if( error != WSA_IO_PENDING ){    
             // 클로즈 처리
       }
}

static int recv_count = 0;
recv_count ++ ;   // recv_count 를 증가시킨다

// IO 통지를 하게될 WorkerThread
unsigned int WINAPI WorkerThread( void* arg )
{
  DWORD bytesTransferred = 0;
  unsigned long compKey = 0;
  Overlapped * overlap = 0;
  SOCKET_INFO* session  = 0;

  while( IOCP_listener_thread 가 active 상태일때 ){
      int result = GetQueuedCompletionStatus( completionPort , &bytesTransferred , &compKey, \
                (LPOVERLAPPED *)overlap, INFINITE );

      session  =  (SOCKET_INFO*)compKey;
      
      if(! overlap )
           continue;

      if(! session )
           continue;

      // 에러
      if( result == FALSE ){
         if( overlap->type_ == RECV )
             recv_count -- ;
         else if ( overlap->type_ = SEND )
            send_count -- ;

            // 클로즈 처리
            continue;
      }
      // 접속이 끊어짐.
      if( bytesTransferred == 0 ){
             if( ( overlap->type_ == RECV) ||  ( overlap->type_ == SEND ) ) 
             {    
                    if( overlap->type_ == RECV )
                          recv_count -- ;
                    else if ( overlap->type_ = SEND )
                          send_count -- ;
                    // 클로즈 처리
                    continue;
             }
       }
     
     if( overlap->type_ == SEND )
     {
         send_count -- ;
         // send 큐에 있는 내용을 하나 빼서 WSASend() 함수 호출을 해준다. 아래는 간단한 배열을 이용한 테스트코드
         char send_buf[1024];
         memset( send_buf, 0 , 1024 );
         strncpy( send_buf, "jds83" , 5 );

         WSABUF wsa_sendBuf;
         wsa_sendBuf.buf = send_buf;
         wsa_sendBuf.len = 5;
        
         Overlapped overlapped;
         memset( &overlapped, 0, sizeof( Overlapped ) );
          DWORD send_size = 0;
          DWORD flag = 0;
          int result = ::WSASend( sock, &wsa_sendBuf, 1, &send_size, flag , &overlapped, 0 );
          if( result == SOCKET_ERROR )
          {
               int error = ::WSAGetLastError();
               if(  error != WSA_IO_PENDING )
               {
                     // 클로즈 처리
               }
          }   
     }
     else if( overlap->type_ == RECV )
     {
         recv_count -- ;
         char buf[ 1024 ];
         memset( buf, 0 , 1024 );
 
         WSABUF wsaBuf;
         wsaBuf.buf = buf;
         wsaBuf.len = 1024;

         DWORD recv_size = 0;
         DWORD flag = 0;
 
        Overlapped overlapped;
        memset( &overlapped, 0, sizeof( OVERLAPPED ) );
 
        int result = ::WSARecv( sock, &wsaBuf, 1, &recv_size, &flag, &overlapped, 0 );
        if( result == SOCKET_ERROR )
        {
            int error = ::WSAGetLastError();
            if(  error != WSA_IO_PENDING )
           {
                     // 클로즈 처리
            }
        }   
     }
  }
}


* 정리

처음에는  위에 1~5번 과정 순서대로 진행.
1. CreateIoCompletionPort 함수로 completionPort 오브젝트를 하나 생성한다( 마지막 인자는 0 )
2. 서버 listen 소켓 생성후 listen
3. 유저를 받을수 있는 accpet 쓰레드를 생성한다.
4. 시스템 정보를 얻어온다( 시스템에 몇개의 프로세서가 있는지 확인한다. )
5. worker 쓰레드를 cpu * 2개 만큼 생성하여 GetQueuedCompletionStatus 함수호출후 잠겨놓는다. ( Wait Thread Queue - LIFO )

유저 접속시
6. WSAAccpet 함수를 이용하여 들어오는 연결 요청을 받아들인다.
7. 유저의 정보 및 소켓 핸들을 구조체및 클래스에 저장한다.( 유저 session 정보 )
8. CreateIoCompletionPort 함수로 completionPort 오브젝트에 유저 소켓과 completionKey 인자로 session 을 등록시켜준다. completionKey는 WorkerThread 에서 유저정보를 확인하는 아주 중요한 역할을 한다.
9. WSARecv() 함수를 호출해 준다. ( 이때 Overlapped 구조체에 type 을 꼭 넘겨서 WorkerThread 에서 처리할수 있게 끔 한다.

WorkerThread 에 통지 떨어질때
* 유저의 접속종료 / 읽기,  쓰기 통지시 처리한다.
   하나의 io 가 완료되어 작업 큐에 쌓이면 쉬고있는 쓰레드 하나를 깨워서 작업을 수행한다. ( LIFO 의 큐 구조이기 때문에. 
   같은 쓰레드가 여러번 작업을 수행할 수 있다 )

*  읽기, 쓰기 카운트를 핸들링의 이유
1. 비정상적으로 종료 혹은 유저 종료시, 카운트가 0이 아니라는 것은 진행중인 io 카운트가 남아있다는 뜻이므로, 
   소켓만 -1 로 만들고, session 객체는 유지시킨다. 나중에 WorkerThread 에서 카운트가 정리된 이후.
   network 메인 쓰레드에서session 객체 정리 )

2. WSASend 시 - application 단에서 한유저가 send를 여러번 호출했을때, 연속으로 WSASend 를 부르지 않기 위해서이다.
쉽게 설명하면, 하나의 send() 호출시 WSASend 가 호출되고, send_count 를 1증가시킨다.
그 다음에 send() 가 또 호출되면, 아직 I/O 가 진행중이므로, send_q에만 넣고 빠져나온다.
하나의 Send I/O 가 끝날때까지 계속 큐에만 쌓여두고, I/O 종료 통지가 떨어졌을때, 다음 큐를 꺼내어서 다시 WSASend 를 호출한다.

읽기 성공 통지
* 읽기 성공이 떨어졌을때 recv_buffer 에서 데이터를 parsing 한다. parsing 이후, 정상적인 패킷 데이터가 왔으면, 메시지 큐에다 넣어준다. 그러면, network main 쓰레드에서 메시지 큐를 체크하여, application 컨텐츠 단으로 패킷을 내려준다.
그리고 다시 WSARecv 함수를 호출한다.

쓰기 성공 통지
* 쓰기 성공이 떨어졌을때 send 큐 front_pop 시키고, 다음 send 큐에 보낼 데이터가 있다면 WSASend 함수를 호출한다.

* 중요 함수 레퍼런스
HANDLE CreateIoCompletionPort(
    HANDLE FileHandle,                         // 연결시킬 소켓 핸들
    HANDLE ExistingCompletionPort,        // 연결시킬 completion port (처음 오브젝트 생성시 0 )
    ULONG_PTR CompletionKey,             // completion key  (처음 오브젝트 생성시 0 )
    DWORD NumberOfConcurrentThreads  // 동시 실행가능한 쓰레드 개수( 0 -> 운영체제가 cpu개수만큼 세팅 )
    );


int WSASend(
            SOCKET s,                                  //소켓 핸들
          LPWSABUF lpBuffers,                    //WSABUF 구조체 배열의 포인터
          DWORD dwBufferCount,                 //lpBuffers가 가리키는 배열의 크기
          LPDWORD lpNumberOfBytesSent    //전송된 바이트 수를 저장하기 위한 포인터
             DWORD dwFlags,
          LPWSAOVERLAPPED lpOverlapped,
             LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

* 리턴값 :

0 -> 정상적으로 send 가 끝났다. lpNumberOfBytesSent 포인터로 send 한 데이터의 size 가 저장된다. 
SOCKET_ERROR -> 에러 ( 반드시 WSAGetLastError 로 검사한다. WSA_IO_PENDING 이 아닐경우 종료 처리 )

typedef struct __WSABUF{
            u_long len;                       //버퍼의 길이
            char FAR * buf;                 //버퍼를 가리키는 포인터

} WSABUF, FAR * LPWSABUF;

int WSARecv(
SOCKET s,           
LPWSABUF lpBuffers,                    //WSABUF 구조체 배열의 포인터
DWORD dwBufferCount,                 //lpBuffers가 가리키는 배열의 크기
LPDWORD lpNumberOfBytesRecvd,// 수신된 바이트 수를 저장하기 위한 포인터
LPDWORD lpFlags,
LPWSAOVERLAPPED lpOverlapped,
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

* 리턴값 :

0 -> 정상적으로 recv 가 끝났다.  lpNumberOfBytesRecvd 포인터로 recv 한 데이터의 size 가 저장된다. 
SOCKET_ERROR -> 에러 ( 반드시 WSAGetLastError 로 검사한다. WSA_IO_PENDING 이 아닐경우 종료 처리 )

'Programming' 카테고리의 다른 글

CPPUNIT 테스트  (0) 2011.01.07
기초지식 정리  (0) 2010.12.20
SELECT 함수 정리  (0) 2010.11.30
리눅스 SIGPIPE 처리  (0) 2010.11.29
compile - error 팁  (0) 2009.08.10