Re: High CPU Load when running simple socket app.



You need to debug ( profile too ) you application by yourself , but any case
, be aware that you use blocking sockets with make you task hard for
implementation
Arkady

"Rob White" <rob.white@xxxxxxxxxxxxxxxxxxxxxxx> wrote in message
news:CF28DFF4-13E9-4D45-AD80-4D7232EC04DE@xxxxxxxxxxxxxxxx
> Hi,
> I?m trying to write a socket class that is a wrapper to the winsock2 api.
> I
> need the ability to limit bandwidth, send and receive data. Other than
> that
> there isn?t a lot to it.
> I?m using WSASend and WSARecv and then WSAWaitForMultipleEvents in private
> thread to each socket instance, the idea being that calls to my wrapper
> for
> send a receive will return instantly. I have an implementation, but it is
> buggy and I don?t know how to fix the bugs.
>
> In the loop I?m being told that I?m trying to perform operations when I?m
> in
> a non signalled state and also, the CPU load is huge (> 90%) for low
> network
> load (30% on a 100mb network). I know that the server is capable of
> sending
> the data a flat out no problem as I tested it with other implementations
> that
> aren?t my own.
> Since I?m new to C++ and winsock, I?m a little stuck. Has anyone got any
> ideas, my code follows.
> Thanks
> Rob
>
> // Socket.h
>
> #ifndef CSOCKET_H
> #define CSOCKET_H
> #include <string>
> #include <vector>
>
> #include <Winsock2.h>
> #pragma comment(lib, "Ws2_32.lib")
> #include "..\..\AllProjectsCommon\Thread\thread.h"
> #include "..\..\AllProjectsCommon\Thread\TsQueue.h"
>
> const int BUFFER_SIZE = 256;
> const int CMD_TIMEOUT = 90000; // 90 seconds
>
> class SockOverlapped //represents a single asynchronus operation, must
> persist for the whole operation
> {
> public:
> SockOverlapped()
> {
> overlapped.hEvent = WSACreateEvent();
> Clear();
> };
> ~SockOverlapped()
> {
> WSACloseEvent(overlapped.hEvent);
> };
>
> void Clear(int len=BUFFER_SIZE)
> {
> m_data.resize(len);
> m_dataBuf.buf = (char*)&(m_data[0]);
> m_dataBuf.len = (u_long)m_data.size();
> memset(m_dataBuf.buf,0,m_dataBuf.len);
> };
>
> OVERLAPPED overlapped;
> std::vector<unsigned char> m_data;
> WSABUF m_dataBuf;
> };
>
> class CSocket
> {
> public:
> CSocket();
> ~CSocket();
> bool Connect(std::string server, int port, int timeout=CMD_TIMEOUT);
> bool Send(std::string buffer, int timeout=CMD_TIMEOUT);
> int ReadBufferCount();
> std::string PopReadBuffer();
> bool Close();
> void ThrottleUpload(bool throttled);
> void ThrottleDownload(bool throttled);
> std::string GetLocalIP();
> int GetLocalPort();
> bool IsOpen();
> void SetName(std::string name);
> private:
> thread::Thread m_socketWorker;
> thread::Thread m_bandwidthWorker;
> static RETTYPE RETCALL SocketThreadDispatch(ARGTYPE ptr);
> void SocketWorker();
> static RETTYPE RETCALL BandwidthThreadDispatch(ARGTYPE ptr);
> void BandwidthWorker();
> SOCKET m_skt;
> TsQueue<std::string> m_writeQueue;
> TsQueue<std::string> m_readQueue;
> bool m_connected;
> bool m_throttleUpload;
> bool m_throttleDownload;
> bool m_uploadResetRequired;
> bool m_downloadResetRequired;
> bool m_sockOpen;
> std::string m_sockName;
>
> static const int SKT_READ = 0;
> static const int SKT_WRITE = 1;
> static const int SKT_ADMIN = 2;
> static const int SKT_EVENT_COUNT = 3;
>
> SockOverlapped m_readOvl;
> SockOverlapped m_writeOvl;
> SockOverlapped m_sysOvl;
> };
> #endif
>
>
> //Socket.cpp
>
> #include "Socket.h"
> #include "debug.h"
> #include "..\..\AllProjectsCommon\Thread\thread.h"
>
> #include <iostream>
>
> CSocket::CSocket()
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> m_skt = INVALID_SOCKET;
> m_connected = false;
> m_throttleUpload = false;
> m_throttleDownload = false;
> m_uploadResetRequired = false;
> m_downloadResetRequired = false;
> m_sockOpen = false;
> m_socketWorker.Create(CSocket::SocketThreadDispatch, this);
> m_bandwidthWorker.Create(CSocket::BandwidthThreadDispatch, this);
>
> m_skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
> if(m_skt == INVALID_SOCKET)
> {
> theDebugger.write(DEBUG_COMMS_FATAL, "Unable to create socket: %s",
> FormatError(WSAGetLastError()).c_str());
> }
> m_sockName = "<No Name>";
> }
>
> CSocket::~CSocket()
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> Close();
> WSASetEvent(m_sysOvl.overlapped.hEvent);
> m_bandwidthWorker.WaitForExit(1000);
> m_socketWorker.WaitForExit(1000);
> }
>
> bool CSocket::Connect(std::string server, int port, int timeout)
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> if(m_skt == INVALID_SOCKET)
> {
> return false;
> }
> hostent *host = gethostbyname(server.c_str());
> if(host == NULL)
> {
> theDebugger.write(DEBUG_COMMS_FATAL, "Unable to resolve remote host %s: %s
> %s", server.c_str(), FormatError(WSAGetLastError()).c_str(),
> m_sockName.c_str());
> return false;
> }
>
> sockaddr_in addr;
> addr.sin_family = AF_INET;
> addr.sin_addr = *((struct in_addr *)host->h_addr_list[0]);
> addr.sin_port = htons(port);
>
> if(connect(m_skt, (SOCKADDR*) &addr, sizeof(addr)) == SOCKET_ERROR)
> {
> theDebugger.write(DEBUG_COMMS_FATAL, "Unable to connect to %s: %s %s",
> server.c_str(), FormatError(WSAGetLastError()).c_str(),
> m_sockName.c_str());
> return false;
> }
> m_connected = true;
> m_sockOpen = true;
> return true;
> }
>
> bool CSocket::Send(std::string buffer, int timeout)
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> if(m_skt == INVALID_SOCKET)
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "Can't send on a closed socket");
> return false;
> }
> m_writeQueue.push(buffer);
>
> if(!m_throttleUpload)
> {
> if(WSASetEvent(m_writeOvl.overlapped.hEvent))
> {
> return true;
> }
> else
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "Unable to SetEvent " + m_sockName);
> return false;
> }
> }
> else
> {
> m_uploadResetRequired = true;
> return true;
> }
> }
>
> int CSocket::ReadBufferCount()
> {
> //Don't log this call, it'll just flood the log
> //theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> return m_readQueue.size();
> }
>
> std::string CSocket::PopReadBuffer()
> {
> //Don't log this call, it'll just flood the log
> //theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> std::string retVal;
> if(m_readQueue.size() > 0)
> {
> retVal = m_readQueue.pop();
> }
> return retVal;
> }
>
> bool CSocket::Close()
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> m_connected = false;
> if(closesocket(m_skt) == SOCKET_ERROR)
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(WSAGetLastError()).c_str(), m_sockName.c_str());
> return false;
> }
> theDebugger.write(DEBUG_GENERAL_INFO, "Closed Socket: " + m_sockName);
> return true;
> }
>
> void CSocket::ThrottleUpload(bool throttled)
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> m_throttleUpload = throttled;
> }
>
> void CSocket::ThrottleDownload(bool throttled)
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> m_throttleDownload = throttled;
> }
>
> RETTYPE RETCALL CSocket::SocketThreadDispatch(ARGTYPE ptr)
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> ((CSocket*)ptr)->SocketWorker();;
> return RETVAL;
> }
>
> void CSocket::SocketWorker()
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> WSAEVENT EventArray[SKT_EVENT_COUNT];
> EventArray[SKT_READ] = m_readOvl.overlapped.hEvent;
> EventArray[SKT_WRITE] = m_writeOvl.overlapped.hEvent;
> EventArray[SKT_ADMIN] = m_sysOvl.overlapped.hEvent;
>
> while(m_socketWorker.exitFlag() == false)
> {
> if(m_skt != INVALID_SOCKET && m_connected == true)
> {
> //socket has been created - post first read
> DWORD readBytes = 0;
> DWORD readFlags = 0;
> if(WSARecv(m_skt, &m_readOvl.m_dataBuf, 1, &readBytes, &readFlags,
> (OVERLAPPED*)&m_readOvl, NULL) == SOCKET_ERROR)
> {
> int error = WSAGetLastError();
> if(error != WSA_IO_PENDING)
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(error).c_str(), m_sockName.c_str());
> }
> }
> //now while socket persists - service reads/writes
> while( m_skt != INVALID_SOCKET && m_connected == true)
> {
> switch(WSAWaitForMultipleEvents(SKT_EVENT_COUNT, EventArray, FALSE,
> WSA_INFINITE, FALSE))
> {
> case SKT_READ:
> {
> //the bandwithworker hasn't reset the event yet
> //and we are just looking at a Ghost event.
> if(m_downloadResetRequired)
> {
> Sleep(1);//helps to bring the CPU use down
> continue;
> }
> DWORD byteCount = -1;
> DWORD flags = NULL;
> if(!WSAGetOverlappedResult(m_skt, (OVERLAPPED*)&m_readOvl, &byteCount,
> FALSE, &flags))
> {
> int error = WSAGetLastError();
> if(error != WSA_IO_PENDING)
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(error).c_str(), m_sockName.c_str());
> Close();
> continue;
> }
> }
> if(byteCount == 0)//if byteCount is 0 then the socket has closed
> {
> m_sockOpen = false;
> Close();
> }
> //store this bit of data: m_readOvl.m_dataBuf into the read queue
> m_readQueue.push(std::string(m_readOvl.m_dataBuf.buf, byteCount));
>
> ///re-post the recv request
> if(!m_throttleDownload)
> {
> if(!WSAResetEvent(m_readOvl.overlapped.hEvent))
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(WSAGetLastError()).c_str(), m_sockName.c_str());
> Close();
> continue;
> }
> if(m_sockOpen)
> {
> if(WSARecv(m_skt, &m_readOvl.m_dataBuf, 1, &readBytes, &readFlags,
> (OVERLAPPED*)&m_readOvl, NULL) == SOCKET_ERROR)
> {
> int error = WSAGetLastError();
> if(error != WSA_IO_PENDING)
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(WSAGetLastError()).c_str(), m_sockName.c_str());
> Close();
> continue;
> }
> }
> }
> m_downloadResetRequired = false;
> }
> else
> {
> m_downloadResetRequired = true;
> }
> }
> break;
> case SKT_WRITE:
> {
> //the bandwithworker hasn't reset the event yet
> //and we are just looking at a Ghost event.
> if(m_uploadResetRequired)
> continue;
> if(m_writeQueue.size() > 0) //This should always evaluate to true
> {
> std::string buffer = m_writeQueue.pop();
>
> DWORD sentBytes = 0;
> DWORD flags = 0;
> m_writeOvl.m_dataBuf.buf = (char*)buffer.data();
> m_writeOvl.m_dataBuf.len = buffer.size();
> if(WSASend(m_skt, &m_writeOvl.m_dataBuf, 1, &sentBytes, flags,
> (OVERLAPPED*)&m_writeOvl, NULL) == SOCKET_ERROR)
> {
> int error = WSAGetLastError();
> if(error != WSA_IO_PENDING)
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(WSAGetLastError()).c_str(), m_sockName.c_str());
> Close();
> }
> }
> m_writeOvl.Clear();
> if(!WSAResetEvent(m_writeOvl.overlapped.hEvent))
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(WSAGetLastError()), m_sockName.c_str());
> Close();
> continue;
> }
> m_uploadResetRequired = m_throttleUpload;
> }
> }
> break;
> case SKT_ADMIN:
> {
> theDebugger.write(DEBUG_GENERAL_INFO, "Shutdown requested");
> Close();
> if(!WSAResetEvent(m_sysOvl.overlapped.hEvent))
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(WSAGetLastError()).c_str(), m_sockName.c_str());
> continue;
> }
> }
> break;
> default:
> {
> theDebugger.write(DEBUG_CRICTICAL_INFO, "ERROR received default on
> socket: " + m_sockName);
> Close();//since we should never recieve this error we'll just log it
> and shutdown
> }
> break;
> }
> }
> }
> else
> Sleep(50); //socket is not there, so just wait a bit
> }
>
> Close();
> theDebugger.write(DEBUG_GENERAL_INFO, "Socket Worker Completed: %s",
> m_sockName.c_str());
> }
>
> RETTYPE RETCALL CSocket::BandwidthThreadDispatch(ARGTYPE ptr)
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> ((CSocket*)ptr)->BandwidthWorker();;
> return RETVAL;
> }
>
> void CSocket::BandwidthWorker()
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> while(m_bandwidthWorker.exitFlag() == false)
> {
> if( m_skt != INVALID_SOCKET && m_connected == true)
> {
> if(m_throttleDownload && m_downloadResetRequired)
> {
> if(WSAResetEvent(m_readOvl.overlapped.hEvent))
> {
> m_downloadResetRequired = false;
> DWORD readBytes = 0;
> DWORD readFlags = 0;
> if(WSARecv(m_skt, &m_readOvl.m_dataBuf, 1, &readBytes, &readFlags,
> (OVERLAPPED*)&m_readOvl, NULL) == SOCKET_ERROR)
> {
> int error = WSAGetLastError();
> if(error != WSA_IO_PENDING)
> {
> m_downloadResetRequired = true;
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(WSAGetLastError()).c_str(), m_sockName.c_str());
> Close();
> }
> }
> else
> {
> theDebugger.write(DEBUG_GENERAL_INFO, "Download Reset: %s",
> m_sockName.c_str());
> }
> }
> else
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "Could not ResetEvent: %s %s",
> FormatError(GetLastError()).c_str(), m_sockName.c_str());
> Close();
> }
> }
> if(m_throttleUpload && m_uploadResetRequired)
> {
> m_uploadResetRequired = false;
> if(!WSASetEvent(m_writeOvl.overlapped.hEvent))
> {
> m_uploadResetRequired = true;
> theDebugger.write(DEBUG_COMMS_ERROR, "Could not ResetEvent: %s %s",
> FormatError(GetLastError()).c_str(), m_sockName.c_str());
> Close();
> }
> else
> {
> theDebugger.write(DEBUG_GENERAL_INFO, "Upload Reset: %s",
> m_sockName.c_str());
> }
> }
> }
> //TODO:this value needs fettling to give sensible bandwidth
> Sleep(2000);
> }
> theDebugger.write(DEBUG_GENERAL_INFO, "BandwidthWorker Terminated: %s",
> m_sockName.c_str());
> }
>
> void CSocket::SetName(std::string name)
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> m_sockName = name;
> }
>
> std::string CSocket::GetLocalIP()
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> struct sockaddr_in local_addr;
> int len = sizeof(local_addr);
> if(getsockname(m_skt, (struct sockaddr*) &local_addr, &len) ==
> SOCKET_ERROR)
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(WSAGetLastError()).c_str(), m_sockName.c_str());
> return "";
> }
> return std::string(inet_ntoa(local_addr.sin_addr));
> }
>
> int CSocket::GetLocalPort()
> {
> theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> struct sockaddr_in local_addr;
> int len = sizeof(local_addr);
> if(getsockname(m_skt, (struct sockaddr*) &local_addr, &len) ==
> SOCKET_ERROR)
> {
> theDebugger.write(DEBUG_COMMS_ERROR, "%s %s",
> FormatError(WSAGetLastError()).c_str(), m_sockName.c_str());
> return -1;
> }
> return (int)ntohs(local_addr.sin_port);
> }
>
> bool CSocket::IsOpen()
> {
> //Don't log this call, it'll just flood the log
> //theDebugger.write(DEBUG_GENERAL_FUNCS, "%s", __FUNCSIG__);
> return m_sockOpen;
> }
> --
> ------------------------------------
> .Net and Embedded XP Development Engineer
> ECM Systems Ltd


.



Relevant Pages

  • Re: stderr, stdout, stdin bei Win32-App umleiten?
    ... DWORD WINAPI ReadFromPipe; ... VOID ErrorExit; ... HANDLE m_hChildStdoutRd; ... BOOL fSuccess; ...
    (microsoft.public.de.vc)
  • Re: Creating OAL for PB 5.0
    ... Thanks for the reply Valter but the first error I encounter with is ... DWORD OEMARMCacheMode ... void OEMDataAbortHandler ... BOOL OEMGetExtensionDRAM ...
    (microsoft.public.windowsce.platbuilder)
  • lineOpen always returns LINEERR_INVALPRIVSELECT
    ... bool QTInit; ... virtual void QTOnEvent(DWORD dwMessage, DWORD P1, DWORD P2, DWORD P3) ... void static CALLBACK _lineCBF(DWORD hDevice, DWORD dwMsg, DWORD ... implementation of the CQTapi class. ...
    (microsoft.public.win32.programmer.tapi)
  • COM driver WaitCommEvent
    ... work with WaitCommEvent function, what are the functions I need to implement ... COMDLL_API BOOL COM_Deinit ... COMDLL_API DWORD COM_Init ... COMDLL_API void COM_PowerDown ...
    (microsoft.public.pocketpc.developer)
  • Re: Getting notification for dynamically added controls
    ... BOOL SetMatrix; ... void SetSliderRangeMinMax; ... int m_nScrollPos; ... BOOL UpdateDataPart(int k, BOOL bSaveAndValidate = TRUE); ...
    (microsoft.public.vc.mfc)