博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
IO处理线程
阅读量:6970 次
发布时间:2019-06-27

本文共 8147 字,大约阅读时间需要 27 分钟。

客户IO处理,是在工作线程,_WorkerThreadProc中完成的

函数,在完成端口上调用GetQueuedCompletionStatus函数等待IO完成,并调用自定义函数HandleIO来处理IO,具体代码如下:

DOWRD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam){    #ifdef _DEBUG        ::OutputDebugString("Worker Thread startup...\n");    #endif //_DEBUG    CIOCPServer *pThis = (CIOCPServer*)lpParam;    CIOCPBuffer *pBuffer;    DWORD dwKey;    DWORD dwTrans;    LPOVERLAPPED lpol;    while(TRUE)    {        //在关联到此完成端口的所有套接字上等待IO完成        BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,&dwTrans,(LPDWORD)&dwKey,(LPOVERLAPPED)&lpol,WSA_INFINITE);        if(dwTrans == -1)        {            #ifdef _DEBUG                ::OutputDebugString("Worker Thread startup...\n");            #endif //_DEBUG                            ::ExitThread(0);        }                pBuffer=CONTAINING_RECORD(lpol,CIOCPBuffer,ol);        int nError = NO_ERROR;        if(!bOK)        {            SOCKET s;            if(pBuffer->nOperation == OP_ACCEPT)            {                s=pThis->m_sListen;            }            else            {                if(dwKey == 0)                    break;                s=((CIOCPContext*)dwKey)->s;            }            DWORD dwFlags = 0;            if(!::WSAGetOverlappedResult(s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags))            {                nError = ::WSAGetLastError();            }        }        pThis->HandleIO(dwKey,pBuffer,dwTrans,nError);    }    #ifdef _DEBUG        ::OutputDebugString("Worker Thread out...\n");    #endif //_DEBUG    return 0;}

SendText成员函数用于在连接上发送数据,执行时先申请一个缓冲区对象,把用户将要发送的数据复制到里面,然后调用postSend成员函数投递这个缓冲区对象

BOOL CIOCPServer::SendText(CIOCPContext *pContext,char *pszText,int nLen){    CIOCPBuffer *pBuffer = AllocateBuffer(nLen);    if(pBuffer != NULL)    {        memcpy(pBuffer->buff,pszText,nLen);        return PostSend(pContext,pBuffer);    }    return FALSE;}

 

下面的HandleIO函数是关键,

处理完成的IO,投递新的IO请求,释放完成的缓冲区对象,关闭客户上下文对象

下面是主要的实现代码:

void CIOCPServer::HandleIO(DWORD dwKey,CIOCPBuffer *pBuffer,DOWRD dwTrans,int nError){    CIOCPContext *pContext = (CIOCPContext*)dwKey;    #ifdef _DEBUG        ::OutputDebugString("HandleIO startup..\n");    #endif //_DEBUG    //减少套接字未决IO计数    if(pContext!=NULL)    {        ::EnterCriticalSection(&pContext->Lock);        if(pBuffer->nOperation == OP_READ)            pContext->nOutstandingRecv--;        else if(pBuffer->nOperation == OP_WRITE)            pContext->nOutstandingSend--;        ::LeaveCriticalSection(&pContext->Lock);        //检查套接字是否已经打开        if(pContext->bClosing)        {            #ifdef _DEBUG                ::OutputDebugString("HandleIO startup..\n");            #endif //_DEBUG                        if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)            {                ReleaseContext(pContext);            }            //释放已关闭套接字的未决IO            ReleaseBuffer(pBuffer);            return;        }    }    else    {        RemovePendingAccept(pBuffer);    }    //检查套接字上发生的错误,然后直接关闭套接字    if(nError!=NO_ERROR)    {        if(pBuffer->nOperation != OP_ACCEPT)        {            OnConnectionError(pContext,pBuffer,nError);            CloseAConnection(pContext);            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)            {                ReleaseContext(pContext);            }            #ifdef _DEBUG                ::OutputDebugString("HandleIO startup..\n");            #endif //_DEBUG        }        else//在监听套接字上发生错误        {            if(pBuffer->sClient != INVALID_SOCKET)            {                ::closesocket(pBuffer->sClient);                pBuffer->sClient = INVALID_SOCKET;            }            #ifdef _DEBUG                ::OutputDebugString("HandleIO startup..\n");            #endif //_DEBUG        }        ReleaseBuffer(pBuffer);        return;    }    //开始处理    if(pBuffer->nOperation == OP_ACCEPT)    {        if(dwTrans == 0)        {            #ifdef _DEBUG                ::OutputDebugString("HandleIO startup..\n");            #endif //_DEBUG            if(pBuffer->sClient != INVALID_SOCKET)            {                ::closesocket(pBuffer->sClient);                pBuffer->sClient = INVALID_SOCKET;            }        }        else        {            //为接收新连接的申请客户上下文对象            CIOCPContext *pClient = AllocateContext(pBuffer->sClient);            if(pClient != NULL)            {                if(AddAConnection(pCliebt))                {                    //取得用户地址                    int nLocalLen,nRmoteLen;                    m_lpfnGetAcceptExSockaddrs(                            pBuffer->buff,                            pBuffer->nLen-((sizeof(sockaddr_in)+16)*2),                            sizeof(sockaddr_in)+16,                            sizeof(sockaddr_in)+16,                            (SOCKADDR **)&pLocalAddr,                            &nLocalLen,                            (SOCKADDR **)&pRemoteAddr,                            &nRmoteLen);                    memcpy(&pClient->addrLocal,pLocalAddr,nLocalLen);                    memcpy(&pClient->addrRemote,pRemoteAddr,nRmoteLen);                    //关联新连接到完成端口对象                    ::CreateIoCompletionPort((HANDLE)pClient->s,m_hCompletion,(DWORD)pClient,0);                    //通知用户                    pBuffer->nLen = dwTrans;                    OnConnectionEstablished(pClient,pBuffer);                    //向新连接投递Read请求                    for(int i=0;i<5;i++)                    {                        CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);                        if(p != NULL)                        {                            if(!PostRecv(pClient,p))                            {                                CloseAConnection(pClient);                                break;                            }                        }                    }                }                else                {                    CloseAConnection(pClient);                    ReleaseContext(pClient);                }            }            else            {                //资源不足,关闭与客户的连接即可                ::closesocket(pBuffer->sClient);                pBuffer->sClient = INVALID_SOCKET;            }        }        //Accept请求完成,释放IO缓冲区        ReleaseBuffer(pBuffer);        //通知监听线程继续再投递一个Accept请求        ::InterlockedDecrement(&m_nRepostCount);        ::SetEvent(m_hRepostEvent);    }    else if(pBuffer->nOperation == OP_READ)    {        if(dwTrans == 0)        {            //先通知用户            pBuffer->nLen = 0;            OnConnectionClosing(pContext,pBuffer);            //再关闭连接            CloseAConnection(pContext);            //释放客户上下文和缓冲区对象            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)            {                ReleaseContext(pContext);            }            ReleaseBuffer(pBuffer);        }        else        {            pBuffer->nLen = dwTrans;            //按照IO投递的顺序读取接收到的数据            CIOCPBuffer *p = GetNextReadBuffer(pContext,pBuffer);            while(p!=NULL)            {                OnReadCompleted(pContext,p);                //增加要读的序列号的值                ::InterlockedDecrement((LONG*)pContext->nCurrentReadSequence);                //释放IO                ReleaseBuffer(p);                p = GetNextReadBuffer(pContext,NULL);            }            //继续投递一个新的请求            pBuffer = AllocateBuffer(BUFFER_SIZE);            if(pBuffer==NULL || !PostRecv(pContext,pBuffer))            {                CloseAConnection(pContext);            }        }    }    else if(pBuffer->nOperation == OP_WRITE)    {        if(dwTrans == 0)        {            //先通知用户            pBuffer->nLen = 0;            OnConnectionClosing(pContext,pBuffer);            //再关闭连接            CloseAConnection(pContext);            //释放客户上下文和缓冲区对象            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)            {                ReleaseContext(pContext);            }            ReleaseBuffer(pBuffer);        }        else        {            //写操作完成,通知用户            pBuffer->nLen = dwTrans;            OnWriteCompleted(pContext,pBuffer);            //释放SendText函数申请缓冲区            ReleaseBuffer(pBuffer);        }    }}

转载地址:http://yrasl.baihongyu.com/

你可能感兴趣的文章
修改VirtualBox虚拟机默认存储路径及虚拟机迁移方法
查看>>
hdu 3440 House Man
查看>>
Error && MFC
查看>>
深入理解Servlet3.0异步请求
查看>>
Lua C API 遍历 table
查看>>
Harbor镜像迁移
查看>>
实验六 在应用程序中播放音频和视频
查看>>
数组A - 财务管理
查看>>
linux getch()实现
查看>>
[宽度优先搜索] FZU-2150 Fire Game
查看>>
group by
查看>>
简单投票系统学到的一些东西
查看>>
简明 Vim 练级攻略(转载)
查看>>
ubuntu12.04下virtualbox访问usb
查看>>
android打电话,接电话,挂电话过程
查看>>
【LeanEAP.NET】精益企业应用平台实战----表格批量编辑与Undo/Redo功能实现
查看>>
从Excel中读取数据(python-xlrd)
查看>>
iframe显示高度自适应 兼容多浏览器
查看>>
Struts2 技术全总结 (正在更新)
查看>>
站在产品经理的角度看问题
查看>>