HRESULT EnableAsyncReceive(
RECEIVE_CONTEXT* prc
)
{
// Validate the input parameter.
if (prc == NULL)
{
return MQ_ERROR_INVALID_PARAMETER;
}
// Initiate a receive operation.
return MQReceiveMessage(
prc->hQueue, // Queue handle
INFINITE, // Time to wait for a message
MQ_ACTION_RECEIVE,
prc->pmsgprops, // Pointer to an MQMSGPROPS structure
&prc->ov, // OVERLAPPED structure
NULL, // No callback function
NULL, // No cursor
MQ_NO_TRANSACTION // Not in a transaction
);
}
DWORD WINAPI CompletionPortThread(LPVOID lParam)
{
HANDLE hPort = (HANDLE)lParam;
DWORD dwNoOfBytes = 0;
ULONG_PTR ulKey = 0;
OVERLAPPED* pov = NULL;
HRESULT hr = MQ_OK;
for ( ; ; )
{
// Wait for a completion notification.
pov = NULL;
BOOL fSuccess = GetQueuedCompletionStatus(
hPort, // Completion port handle
&dwNoOfBytes, // Bytes transferred
&ulKey,
&pov, // OVERLAPPED structure
INFINITE // Notification time-out interval
);
//
// Add code to handle specific errors when fSuccess is FALSE
// and provide a way to exit the loop.
//
if (pov == NULL)
{
// An unrecoverable error occurred in the completion port. Wait for the next notification.
continue;
}
// Get the base address of the RECEIVE_CONTEXT structure
// containing the OVERLAPPED structure received.
RECEIVE_CONTEXT* prc = CONTAINING_RECORD(pov, RECEIVE_CONTEXT, ov);
// Get the final status (last error) for the message received.
hr = MQGetOverlappedResult(&prc->ov);
if (FAILED(hr))
{
return hr;
}
// Retrieve the application-specific data in the message.
long lAppSpecific = prc->pmsgprops->aPropVar[0].lVal;
wprintf(L"Message received. Thread ID: %x. Application-specific data: %ld\n",
GetCurrentThreadId(),
lAppSpecific
);
// Initiate the next message receive operation.
hr = EnableAsyncReceive(prc);
if (FAILED(hr))
{
return hr;
}
}
return hr;
}
HRESULT AsyncReadCP(
WCHAR * wszFormatName,
DWORD dwSleepTime
)
{
int cThread = 0, cReceive = 0;
HANDLE hPort = NULL;
QUEUEHANDLE hQueue = NULL;
const int NUMBEROFTHREADS = 2;
const int NUMBEROFRECEIVES = 4;
HRESULT hr = MQ_OK;
// Validate the input parameter and open the queue.
if (wszFormatName == NULL)
{
return MQ_ERROR_INVALID_PARAMETER;
}
hr = MQOpenQueue(
wszFormatName,
MQ_RECEIVE_ACCESS,
MQ_DENY_NONE,
&hQueue
);
if (FAILED(hr))
{
wprintf(L"MQOpenQueue failed. Error: 0x%X.\n, hr");
return hr;
}
// Create a new completion port.
hPort = CreateIoCompletionPort(
INVALID_HANDLE_VALUE, // Do not associate a queue handle.
NULL, // New port
0, // No completion key
NUMBEROFTHREADS // Number of concurrently executed threads
);
if (hPort == NULL)
{
MQCloseQueue(hQueue);
return GetLastError();
{
// Create worker threads for the completion port.
for(cThread = 0; cThread < NUMBEROFTHREADS; cThread++)
{
DWORD dwThreadId = 0;
HANDLE hThread = NULL;
hThread = CreateThread(
NULL, // Default thread security descriptor
0, // Default stack size
CompletionPortThread, // Start routine
hPort, // Start routine parameter
0, // Run immediately
&dwThreadId // Thread ID
);
if (hThread == NULL)
{
//
// Add code to terminate any other threads created in this function.
//
MQCloseQueue(hQueue);
CloseHandle(hPort);
return GetLastError();
}
CloseHandle(hThread);
}
// Associate the queue with the completion port already created.
hPort = CreateIoCompletionPort(
hQueue, // Queue handle
hPort, // Port handle
0, // No completion key
NUMBEROFTHREADS
);
if (hPort == NULL)
{
MQCloseQueue(hQueue);
return GetLastError();
}
// Initiate several overlapped receive operations.
RECEIVE_CONTEXT* prc[NUMBEROFRECEIVES];
for (cReceive; cReceive < NUMBEROFRECEIVES; cReceive++)
{
// Allocate and initialize a receive context.
prc[cReceive] = new RECEIVE_CONTEXT;
if (prc[cReceive] == NULL)
{
return MQ_ERROR_INSUFFICIENT_RESOURCES;
}
memset(prc[cReceive], 0, sizeof(RECEIVE_CONTEXT));
prc[cReceive]->hQueue = hQueue;
prc[cReceive]->pmsgprops = new MQMSGPROPS;
if (prc[cReceive]->pmsgprops == NULL)
{
return MQ_ERROR_INSUFFICIENT_RESOURCES;
}
const int NUMBEROFPROPERTIES = 1;
prc[cReceive]->pmsgprops->aPropID = new MSGPROPID[NUMBEROFPROPERTIES];
if (prc[cReceive]->pmsgprops->aPropID == NULL)
{
return MQ_ERROR_INSUFFICIENT_RESOURCES;
}
prc[cReceive]->pmsgprops->aPropVar = new MQPROPVARIANT[NUMBEROFPROPERTIES];
if (prc[cReceive]->pmsgprops->aPropVar == NULL)
{
return MQ_ERROR_INSUFFICIENT_RESOURCES;
}
prc[cReceive]->pmsgprops->aStatus = new HRESULT[NUMBEROFPROPERTIES];
if (prc[cReceive]->pmsgprops->aStatus == NULL)
{
return MQ_ERROR_INSUFFICIENT_RESOURCES;
}
DWORD cPropId = 0;
prc[cReceive]->pmsgprops->aPropID[cPropId] = PROPID_M_APPSPECIFIC;
prc[cReceive]->pmsgprops->aPropVar[cPropId].vt = VT_UI4;
cPropId++;
prc[cReceive]->pmsgprops->cProp = cPropId;
hr = EnableAsyncReceive(prc);
if (FAILED(hr))
{
break;
}
}
if (SUCCEEDED(hr))
{
Sleep(dwSleepTime)
}
// Free the memory allocated for the RECEIVE_CONTEXT structures.
for ( ; cReceive >= 0; cReceive--)
{
delete prc[cReceive]->pmsgprops->aStatus;
delete prc[cReceive]->pmsgprops->aPropVar;
delete prc[cReceive]->pmsgprops->aPropID;
delete prc[cReceive]->pmsgprops;
delete prc[cReceive];
}
// Close the queue and free the port.
MQCloseQueue(hQueue);
CloseHandle(hPort);
return hr;
}