#include "stdafx.h"
#include <streams.h>
#include "basemux.h"
#include "mediasample_ex.h"
#pragma warning(disable: 4018) // signed/unsigned mismatch
//-----------------------------------------------------------------------------
//
// CMuxInterPacket class
//
//-----------------------------------------------------------------------------
// Builds an empty packet: zeroed timestamps, no payload, no flags set.
CMuxInterPacket::CMuxInterPacket()
    : m_rtStart_10mhz(0)
    , m_rtStop_10mhz(0)
    , m_has_time(false)
    , m_sync_point(false)
    , m_stream_no(-1)       // negative index: CMuxInterleaver::Write() rejects it
    , m_is_eos(false)
    , m_data(NULL)
    , m_size(0)
{
    // the extended timestamp fields start out cleared as well
    m_clock = 0;
    m_dts = 0;
    m_pts = 0;
    m_timestamp_ex = false;
}
// Releases the payload buffer held by this packet (if any).
CMuxInterPacket::~CMuxInterPacket()
{
    // free() is a no-op for NULL, so no explicit guard is required
    free(m_data);
    m_data = NULL;
}
// Copies timestamps, payload and the sync-point flag of a media sample into
// this packet. Any payload held from a previous load is released first.
//
// sample - source media sample; it is dereferenced unconditionally, so it
//          must not be NULL.
//
// State after the call:
//   m_is_eos        - always false
//   m_has_time      - true when sample->GetTime() succeeded; start/stop are
//                     stored in m_rtStart_10mhz / m_rtStop_10mhz (0 otherwise)
//   m_timestamp_ex  - true only when the sample exposes IMediaSampleEx AND
//                     GetTimeEx() succeeded
//   m_data / m_size - private copy of the payload; NULL / 0 when the sample is
//                     empty, its buffer is unavailable, or allocation failed
//   m_sync_point    - true when sample->IsSyncPoint() returns NOERROR
void CMuxInterPacket::LoadFrom(IMediaSample *sample)
{
    // kick any data we might have held
    if (m_data) {
        free(m_data);
        m_data = NULL;
    }
    m_size = 0;
    m_is_eos = false;

    // get the timestamps
    REFERENCE_TIME start, stop;
    HRESULT hr = sample->GetTime(&start, &stop);
    if (FAILED(hr)) {
        m_rtStart_10mhz = 0;
        m_rtStop_10mhz = 0;
        m_has_time = false;
    } else {
        if (hr == VFW_S_NO_STOP_TIME) {
            // only the start time is valid - use it for both ends
            stop = start;
        }
        m_rtStart_10mhz = start;
        m_rtStop_10mhz = stop;
        m_has_time = true;
    }

    // Extended timestamps are optional. Reset the flag up front so that a
    // failing GetTimeEx() cannot leave a stale 'true' from an earlier load.
    m_timestamp_ex = false;
    {
        CComPtr<IMediaSampleEx> msex;
        hr = sample->QueryInterface(IID_IMediaSampleEx, (void**)&msex);
        if (SUCCEEDED(hr)) {
            hr = msex->GetTimeEx(&m_pts, &m_dts, &m_clock);
            if (SUCCEEDED(hr)) {
                m_timestamp_ex = true;
            }
        }
    }

    // Copy the payload. A non-positive length (e.g. VFW delay samples) leaves
    // the packet without data.
    long lsize = sample->GetActualDataLength();
    if (lsize > 0) {
        BYTE *buf = NULL;
        hr = sample->GetPointer(&buf);
        if (SUCCEEDED(hr) && buf) {
            m_data = (BYTE*)malloc(lsize);
            if (m_data) {
                memcpy(m_data, buf, lsize);
                m_size = lsize;
            }
            // else: not enough memory - the packet stays empty
        }
    }

    // Evaluated on every path so the flag never carries over from a
    // previously loaded sample.
    m_sync_point = (sample->IsSyncPoint() == NOERROR);
}
// Resets the packet to the same state the constructor establishes and
// releases the payload buffer.
void CMuxInterPacket::Clear()
{
    // basic timing / flags
    m_rtStart_10mhz = 0;
    m_rtStop_10mhz = 0;
    m_has_time = false;
    m_sync_point = false;
    m_stream_no = -1;
    m_is_eos = false;

    // payload
    if (m_data) {
        free(m_data);
        m_data = NULL;
    }
    m_size = 0;

    // extended timestamps
    m_timestamp_ex = false;
    m_pts = 0;
    m_dts = 0;
    m_clock = 0;
}
//-----------------------------------------------------------------------------
//
// CMuxInterStream class
//
//-----------------------------------------------------------------------------
// Creates an inactive, interleaved stream whose queue limit (m_size) is
// MAXPACKETS and whose "can write" event is signalled.
CMuxInterStream::CMuxInterStream()
    : m_index(0)
    , m_start(0)
    , m_offset(0)
    , m_isinterleaved(true)
    , m_active(false)
    , m_size(MAXPACKETS)
    , m_ev_can_write(TRUE)  // NOTE(review): presumably "manual reset" - confirm against the event class
{
    // NOTE(review): InterPacket looks like a shared packet pool - confirm
    InterPacket::Init(MAXPACKETS);

    // an empty stream accepts packets right away
    m_ev_can_write.Set();

    m_data = NULL;
}
// Releases every packet still queued on this stream.
CMuxInterStream::~CMuxInterStream()
{
    this->Flush();
}
void CMuxInterStream::Flush()
{
// delete all queued samples
while (m_queue.size() > 0) {
CMuxInterPacket *packet = m_queue.front();
m_queue.pop();
InterPacket::Release(packet);
}
// since we haven't got anything - the stream is writable again
m_ev_can_write.Set();
}
// Appends a packet to the end of the queue.
//
// packet - packet to enqueue; stored as-is (no copy is made here).
//
// Returns 0 in all cases. Once the queue length reaches m_size the
// "can write" event is reset.
int CMuxInterStream::Write(CMuxInterPacket *packet)
{
    m_queue.push(packet);

    const bool full = (m_queue.size() >= m_size);
    if (full) {
        m_ev_can_write.Reset();
    }

    return 0;
}
// Removes the packet at the head of the queue and hands it to the caller.
//
// packet - receives the dequeued packet; only written on success.
//
// Returns:
//    0  a packet was dequeued and stored in *packet
//   -1  the queue is empty
//   -3  the head packet is an EOS marker; it is left in the queue
int CMuxInterStream::Read(CMuxInterPacket **packet)
{
    if (m_queue.size() == 0) {
        return -1;
    }

    ASSERT(packet);
    CMuxInterPacket *head = m_queue.front();

    if (!head->m_is_eos) {
        m_queue.pop();
        *packet = head;

        // there is room again once we are below the limit
        if (m_queue.size() < m_size) {
            m_ev_can_write.Set();
        }
        return 0;
    }

    // EOS stays at the head of the queue
    return -3;
}
// Reports the start time of the packet at the head of the queue without
// removing it.
//
// time - receives m_rtStart_10mhz of the head packet.
//
// Returns 0 on success, -1 when the queue is empty.
// Note: the value is only meaningful if the incoming packets carry timestamps.
int CMuxInterStream::GetStartTime(REFERENCE_TIME *time)
{
    if (m_queue.size() == 0) {
        return -1;
    }

    CMuxInterPacket *head = m_queue.front();
    ASSERT(time);
    *time = head->m_rtStart_10mhz;
    return 0;
}
// Returns the number of packets currently queued on this stream.
int CMuxInterStream::GetPacketCount()
{
    const int queued = static_cast<int>(m_queue.size());
    return queued;
}
// Tells whether the packet at the head of the queue is an EOS marker.
// An empty queue is never reported as EOS.
bool CMuxInterStream::IsEOS()
{
    return (m_queue.size() != 0) && m_queue.front()->m_is_eos;
}
//-----------------------------------------------------------------------------
//
// CMuxInterleaver class
//
//-----------------------------------------------------------------------------
// Creates an interleaver with no streams; both the "abort" and "can read"
// events start out non-signalled.
CMuxInterleaver::CMuxInterleaver()
    : m_ev_abort(TRUE)
    , m_ev_can_read(TRUE)
    , m_align(0)
{
    m_offset = 0;

    // remember the tick count at construction time
    m_last_gt = GetTickCount();

    // nothing to read and no abort requested yet
    m_ev_can_read.Reset();
    m_ev_abort.Reset();
}
// Destroys all streams owned by the interleaver.
CMuxInterleaver::~CMuxInterleaver()
{
    this->Clear();
}
// Deletes every stream object (in index order) and empties the stream list.
void CMuxInterleaver::Clear()
{
    int idx = 0;
    while (idx < m_streams.size()) {
        delete m_streams[idx];
        ++idx;
    }

    // the pointers are dangling now - drop them
    m_streams.clear();
}
// Allocates a new stream, appends it to the stream list and returns it.
//
// stream - receives the newly created stream; the interleaver keeps the
//          pointer in m_streams and deletes it in Clear().
//
// Returns 0 in all cases.
int CMuxInterleaver::AddStream(CMuxInterStream **stream)
{
    CMuxInterStream *created = new CMuxInterStream();
    m_streams.push_back(created);

    // the stream's index is its position in the list
    created->m_index = (int)m_streams.size() - 1;
    *stream = created;

    // for timestamp alignment: one bit per stream index
    m_align |= (1 << created->m_index);

    return 0;
}
// Raises or clears the abort event.
//
// abort - non-zero signals the event, zero resets it.
void CMuxInterleaver::SetAbort(BOOL abort)
{
    if (!abort) {
        m_ev_abort.Reset();
        return;
    }

    m_ev_abort.Set();
}
void CMuxInterleaver::FlushStream(int stream_no)
{
CAutoLock lock(&m_lock_queue);
if (stream_no < 0 || stream_no >= m_streams.size())
return ;
m_streams[stream_no]->Flush();
// update the can_read event
bool can_read = false;
for (int i=0; i < m_streams.size(); i++) {
CMuxInterStream *stream = m_streams[i];
if (stream->m_queue.size() > 0) {
can_read = true;
}
}
if (!can_read)
m_ev_can_read.Reset();
}
// Flushes every stream while holding the queue lock and marks the
// interleaver as having nothing to read.
void CMuxInterleaver::Flush()
{
    CAutoLock lock(&m_lock_queue);

    int idx = 0;
    while (idx < m_streams.size()) {
        m_streams[idx]->Flush();
        ++idx;
    }

    // all queues are empty now
    m_ev_can_read.Reset();
}
// Queues a packet on the stream named by packet->m_stream_no, holding the
// queue lock for the duration.
//
// packet - packet to enqueue; dereferenced unconditionally.
//
// Returns -1 when the stream number is out of range, otherwise the result of
// the stream's Write(). On a non-negative result the "can read" event is set.
int CMuxInterleaver::Write(CMuxInterPacket *packet)
{
    CAutoLock lock(&m_lock_queue);

    const int target = packet->m_stream_no;
    const bool in_range = (target >= 0 && target < m_streams.size());
    if (!in_range) {
        return -1;
    }

    // hand the packet over to its stream
    const int result = m_streams[target]->Write(packet);
    if (result >= 0) {
        // mark as ready to read
        m_ev_can_read.Set();
    }

    return result;
}
// Recomputes each stream's m_offset relative to the start time of stream 0:
// stream 0 gets offset 0, every other stream gets (its m_start - stream 0's
// m_start). Does nothing when no stream has been added.
void CMuxInterleaver::ComputeOffset()
{
    // without streams there is no base to align to; indexing element 0 of an
    // empty list would be invalid
    if (m_streams.size() == 0)
        return;

    REFERENCE_TIME base = m_streams[0]->m_start;
    m_streams[0]->m_offset = 0;

    for (int i = 1; i < m_streams.size(); i++) {
        CMuxInterStream *stream = m_streams[i];
        stream->m_offset = stream->m_start - base;
    }
}
int CMuxInterleaver::Read(CMuxInterPacket **packet, DWORD timeout)
{
/*
Synchronous read operation with timeout.
Result values:
0: success
-1: timeout
-2: abort
*/
LARGE_INTEGER litemp;
LONGLONG QPartStart, QPartEnd;
double fMinus, fFreq, fTime;
QueryPerformanceFrequency(&litemp);
fFreq = (double)litemp.QuadPart;
QueryPerformanceCounter(&litemp);
QPartStart = litemp.QuadPart;
__int64 end_time = GetTime() + timeout;
__int64 start_time = 0, lowest_time = 0;
int lowest_time_index = -1;
int ret;
bool can_deliver;
bool is_eos;
HANDLE events[] = { m_ev_can_read, m_ev_abort };
DWORD dw;
// let's wait and see what happens
dw = WaitForMultipleObjects(2, events, FALSE, (end_time - GetTime()));
if (dw == WAIT_TIMEOUT || GetTime() > end_time)
{
return -1;
}
if (m_ev_abort.Check())
return -2;
QueryPerformanceCounter(&litemp);
QPartEnd = litemp.QuadPart;
fMinus = (double)(QPartEnd - QPartStart);
fTime = fMinus*1000 / fFreq;
DebugMsg(_T("wait packet used time [%0.3f]."), fTime);
{
CAutoLock lock(&m_lock_queue);
/*
Scan through all the streams and find the one with
lowest start time.
*/
lowest_time_index = -1;
can_del