Re: a *working* PostThreadMessage() implementation...?



So let's see: you write a ton of code that simulates what PostMessage does? For that
matter, why didn't you just use a simple I/O Completion Port?


On Wed, 1 Oct 2008 11:45:32 -0700 (PDT), ".rhavin grobert" <clqrq@xxxxxxxx> wrote:

On 1 Okt., 19:58, "Scott McPhillips [MVP]" <org-dot-mvps-at-scottmcp>
wrote:
".rhavin grobert" <cl...@xxxxxxxx> wrote in message

news:af8a592c-31d5-439e-b547-5e6bb649d6d8@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

in order to not reinvent the wheel, has anyone already coded a
*working* PostThreadMessage implementation?
e.g. an impl that has a queue of variable, self-managing size and
that - of course - doesn't drop messages just because some other thread
is in some modal dialog loop?

Is there something about the built in implementation that does not meet your
needs?  You know, I suppose, that PostThreadMessage should only be used to
post to threads that do not display any GUI.  With that limitation, the
queue is enormous and is not affected by other threads.

--
Scott McPhillips [VC++ MVP]

i want a message queue that doesn't care what the threads are currently
doing or which of them have GUIs.
how about this approach (not fully functional yet, QUAD = unsigned
__int64):

______________________________________________________

#define MSGBUFFER_INITIAL_SIZE 50 // queue's initial slots
#define MSGBUFFER_DEFLATE_THRESHOLD 100 // if MaxSize - UsedSize reaches this...
#define MSGBUFFER_DEFLATE_SIZE 25 // ...deflate buffer to UsedSize + this
#define MSGBUFFER_INFLATE_SIZE 25 // increase by this if buffer is too small


#pragma pack (push, 1)
struct SQTrdMsg {
QUAD qCommand;
QUAD qParam;
QUAD qAux;
void* pBody;
BYTE bFlags;
};
#pragma pack (pop)

class CQThread : public CWinThread
{

//* ... usual stuff here ... */


private:
// initialize message buffer and start functionality
bool _MsgBufferStart();
// free all memory used by message buffer, stop message buffering
bool _MsgBufferStop();
// add a new message to queue
bool _MsgBufferAdd(QUAD qMsg, QUAD qParam, QUAD qAux, PCVOID pBody,
BYTE bFlags = QTMF_DELETEBODY);
// add a new message to queue
bool _MsgBufferAdd(SQTrdMsg const* pMsg);
// unshift buffers oldest message from queue
bool _MsgBufferUnshift(SQTrdMsg* pTM);
// deflate queues buffer
bool _MsgBufferDeflate();
// inflate queues buffer
bool _MsgBufferInflate();
// copy contents of old queue into new buffer
void _MsgBufferRelocate(SQTrdMsg* pNewBuffer);

UINT m_nQueueLast; // last used slot in queue
UINT m_nQueueSize; // overall size of queue
UINT m_nQueueFirst; // first used slot in queue
UINT m_nQueueMax; // queues maximum allocation
SQTrdMsg* m_pQueueBuffer; // the allocated queue buffer

HANDLE m_hMessage;
CRITICAL_SECTION m_critMessage;
};

//-----------------------------------------------------------------------------
// initialize message buffer and start functionality
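bool CQThread::_MsgBufferStart()
{
// the critical section must be initialized before its first use
::InitializeCriticalSection(&m_critMessage);
// unnamed event: a named one would be shared by every CQThread instance
m_hMessage = ::CreateEvent(NULL, TRUE, FALSE, NULL);

m_nQueueLast = 0;
m_nQueueSize = 0;
m_nQueueFirst = 0;
// record the allocated capacity; the add/inflate logic depends on it
m_nQueueMax = MSGBUFFER_INITIAL_SIZE;
m_pQueueBuffer = (SQTrdMsg*) malloc(sizeof(SQTrdMsg) *
MSGBUFFER_INITIAL_SIZE);

return (m_hMessage != 0 && m_pQueueBuffer != NULL);
}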



//-----------------------------------------------------------------------------
// free all memory used by message buffer, stop message buffering
bool CQThread::_MsgBufferStop()
{
if (m_hMessage == 0)
return true;
EnterCriticalSection(&m_critMessage);
bool bRet = (::CloseHandle(m_hMessage) != FALSE);
m_hMessage = 0;
SQTrdMsg TM;

// remove all message-heads from queue, optionally deleting the body
while (_MsgBufferUnshift(&TM))
{
if ((TM.bFlags & QTMF_DELETEBODY) != 0)
delete TM.pBody;
}

// free queues still allocated memory
free(m_pQueueBuffer);
LeaveCriticalSection(&m_critMessage);
return bRet;
}

//-----------------------------------------------------------------------------
// add a new message to queue
bool CQThread::_MsgBufferAdd(QUAD qMsg, QUAD qParam, QUAD qAux, PCVOID
pBody,
BYTE bFlags)
{
SQTrdMsg TM;
TM.qParam = qParam;
TM.qCommand = qMsg;
TM.qAux = qAux;
TM.pBody = const_cast<void*>(pBody);
TM.bFlags = bFlags;

return _MsgBufferAdd(&TM);
}

//-----------------------------------------------------------------------------
// add a new message to queue
bool CQThread::_MsgBufferAdd(SQTrdMsg const* pMsg)
{
if (pMsg == NULL || m_hMessage == 0)
return false;

EnterCriticalSection(&m_critMessage);

// if buffer is not big enough, we have to resize it!
if (m_nQueueSize >= m_nQueueMax)
VERIFY(_MsgBufferInflate());

// the next free slot follows the newest message, wrapping at the
// buffer's end (valid indices are 0 .. m_nQueueMax - 1)
m_nQueueLast = (m_nQueueFirst + m_nQueueSize) % m_nQueueMax;
m_nQueueSize++;

// copy message into current slot
*(m_pQueueBuffer + m_nQueueLast) = *pMsg;

LeaveCriticalSection(&m_critMessage);

//you've got message...
return (::SetEvent(m_hMessage) != FALSE);
****
SetEvent returns BOOL. NEVER compare a BOOL variable to a BOOL literal. This is a
beginner's mistake.

Note that SetEvent cannot work correctly as you have done it, there is a fundamental race
condition. It is a common beginner error to think events can be used to simulate
semaphores. Since you want a semaphore, implement a semaphore. I see no place where you
actually wait for this event. As far as I can tell, you are always going to return TRUE,
since ::SetEvent can only return FALSE if it fails because there is an actual error; it
does not tell you the previous state of the event. If the event was already set,
::SetEvent returns a nonzero value. If the event was not set, ::SetEvent returns a nonzero
value. So why do you care what the state of the event is?
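
To make the race concrete: an event remembers only "signaled or not", so two posts that arrive before the consumer wakes collapse into a single wakeup, while a semaphore keeps a count. The following is a minimal sketch in portable standard C++ rather than Win32, and it is not the semaphore example referred to in this reply. The class CountedQueue and its members Post, Take, and Size are hypothetical names. It uses a mutex and a condition variable whose wait predicate is "the queue is not empty", which gives the same one-wakeup-per-item behavior a counting semaphore would.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical illustration, not code from this thread.
// The number of queued items plays the role of the semaphore count.
template <typename T>
class CountedQueue {
public:
    // Producer side: enqueue one item and wake one waiting consumer.
    void Post(const T& item) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push_back(item);
        }
        m_ready.notify_one();
    }

    // Consumer side: block until an item is available, then dequeue it.
    T Take() {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate re-checks the count, so no post is ever lost,
        // even if several arrive before this thread runs.
        m_ready.wait(lock, [this] { return !m_items.empty(); });
        assert(!m_items.empty());
        T item = m_items.front();
        m_items.pop_front();
        return item;
    }

    // Number of items currently queued.
    std::size_t Size() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_items.size();
    }

private:
    mutable std::mutex m_mutex;
    std::condition_variable m_ready;
    std::deque<T> m_items;
};
```

Three calls to Post followed by three calls to Take return the items in posting order, which a single manual-reset event cannot guarantee without extra bookkeeping.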

Where are you waiting for the event? Or are you polling?

Overall, this seems a remarkably clumsy way to implement what is already built into the
system.

You could have used PostMessage with a hidden window, or PostQueuedCompletionStatus, and
done your enqueuing in a single line, and your dequeuing in a single line. I don't know
if, other than the SetEvent, this code is correct, but it is many more lines than are
required to accomplish a fundamentally trivial task. Even my complete semaphore example
is simpler than this code.
joe
****
}

//-----------------------------------------------------------------------------
// unshift buffers oldest message from queue
bool CQThread::_MsgBufferUnshift(SQTrdMsg* pTM)
{
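if (pTM == NULL)
return false;

EnterCriticalSection(&m_critMessage);

// check for an empty queue only while holding the lock
if (m_nQueueSize == 0)
{
LeaveCriticalSection(&m_critMessage);
return false;
}

// copy message from first slot to given buffer
*pTM = *(m_pQueueBuffer + m_nQueueFirst);

// advance to the next slot, starting at the beginning past the buffer's end
m_nQueueFirst++;
if (m_nQueueFirst >= m_nQueueMax)
m_nQueueFirst = 0;

m_nQueueSize--;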

// if size of buffer is too big, we shrink it
if (m_nQueueSize + MSGBUFFER_DEFLATE_THRESHOLD <= m_nQueueMax)
VERIFY(_MsgBufferDeflate());

LeaveCriticalSection(&m_critMessage);
return true;
}

//-----------------------------------------------------------------------------
// deflate queues buffer
bool CQThread::_MsgBufferDeflate()
{
// allocate new buffer
UINT nStackNew = (m_nQueueSize + MSGBUFFER_DEFLATE_SIZE);
UINT nBytesNew = sizeof(SQTrdMsg) * nStackNew;
SQTrdMsg* pQueueNew = (SQTrdMsg*) malloc(nBytesNew);

if (pQueueNew == NULL)
return false;

_MsgBufferRelocate(pQueueNew);
m_nQueueMax = nStackNew;
return true;
}

//-----------------------------------------------------------------------------
// inflate queues buffer
bool CQThread::_MsgBufferInflate()
{
// allocate new buffer
UINT nStackNew = (m_nQueueMax + MSGBUFFER_INFLATE_SIZE);
UINT nBytesNew = sizeof(SQTrdMsg) * nStackNew;
SQTrdMsg* pQueueNew = (SQTrdMsg*) malloc(nBytesNew);

if (pQueueNew == NULL)
return false;

_MsgBufferRelocate(pQueueNew);
m_nQueueMax = nStackNew;
return true;
}


//-----------------------------------------------------------------------------
// copy contents of old queue into new buffer
void CQThread::_MsgBufferRelocate(SQTrdMsg* pNewBuffer)
{
ASSERT(pNewBuffer != NULL);
// slots between the oldest message and the end of the old buffer
UINT nCut = m_nQueueMax - m_nQueueFirst;

// copy contents of old buffer to new location (memcpy counts bytes)
if (m_nQueueSize <= nCut)
{
// currently used queue doesn't span over buffer's end: normal copy
memcpy(pNewBuffer, m_pQueueBuffer + m_nQueueFirst,
m_nQueueSize * sizeof(SQTrdMsg));
}
else
{
// currently used queue does span over buffer's end: copy both parts
memcpy(pNewBuffer, m_pQueueBuffer + m_nQueueFirst,
nCut * sizeof(SQTrdMsg));
memcpy(pNewBuffer + nCut, m_pQueueBuffer,
(m_nQueueSize - nCut) * sizeof(SQTrdMsg));
}
// an empty queue is relocated too, so the old buffer is always released
m_nQueueFirst = 0;
m_nQueueLast = (m_nQueueSize > 0) ? m_nQueueSize - 1 : 0;
free(m_pQueueBuffer);
m_pQueueBuffer = pNewBuffer;
}
Joseph M. Newcomer [MVP]
email: newcomer@xxxxxxxxxxxx
Web: http://www.flounder.com
MVP Tips: http://www.flounder.com/mvp_tips.htm