// -*- C++ -*-
//
// $Id: subscriber.cpp 2013-01-25 09:36:33 $
/************************************************************************
** 开发单位:
** 开发者:Yingbo.Li
** 创建时间:2013-01-25 09:36:33
** 版本号:V0.0.0
** 描述信息:DDS Ping程序 subscriber实现
************************************************************************/
#include "DataReader.h"
#include "DemoTypeSupportImpl.h"
#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/Marked_Default_Qos.h>
#include <dds/DCPS/SubscriberImpl.h>
#include "dds/DCPS/StaticIncludes.h"
#include <ace/streams.h>
#include "ace/Get_Opt.h"
#include <climits>
// Bring the std, DDS and Demo namespaces into file scope.
using namespace std;
using namespace DDS;
using namespace Demo;
// Runtime settings. The values below are defaults; parse_args() overwrites
// them from the command line.
// Domain id passed to create_participant() in main (option -d).
DomainId_t g_domainId = 0;
// Value of option -t; it is parsed and validated but not read anywhere else
// in the visible part of this file.
int g_interTime = 1;
// Topic name passed to create_topic() in main (option -n).
const char * g_topicName = "OpenDDS-Demo";
// Type name passed to register_type() in main (option -p).
const char * g_typeName = "Demo-msg";
// main waits until the listener's num_reads() reaches this value (option -m).
long g_sampleNum = 50;
void parse_args(int& argc, ACE_TCHAR* argv[])
{
// Command-line arguments:
//
std::string usage = "-d <domain id>\n"
" -t <inter time>\n"
" -n <topic name>\n"
" -p <topic type>\n"
" -m <samples number>\n\n";
ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("d:t:n:p:m:"));
int ich;
while ((ich = get_opts () ) != EOF)
{
switch (ich)
{
case 'd': /* specifes domainID */
g_domainId = ACE_OS::atoi(get_opts.opt_arg ());
if(g_domainId < 0)
{
cerr << "ERROR: bad domainID, exit !" <<endl;
exit(1);
}
break;
case 't': /* specifes interTime of samples */
g_interTime = ACE_OS::atoi(get_opts.opt_arg ());
if(g_interTime < 0)
{
cerr << "ERROR: bad interTime, exit !" <<endl;
exit(1);
}
break;
case 'n': /* specifes name of topic */
g_topicName = get_opts.opt_arg ();
break;
case 'p': /* specifes type of topic */
g_typeName = get_opts.opt_arg ();
break;
case 'm': /* specifes number of samples */
g_sampleNum = ACE_OS::atoi(get_opts.opt_arg ());
if(g_sampleNum < 1)
{
cerr << "ERROR: bad sampleNum, exit !" <<endl;
exit(1);
}
break;
case '?':
case 'h':
cerr<<"usage: "<<usage<<endl;
exit(1);
break;
default:
break;
}
}
}
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
try
{
SetConsoleTitle("Subscriber");
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);
parse_args(argc, argv);
DDS::DomainParticipant_var participant =
dpf->create_participant(g_domainId,
PARTICIPANT_QOS_DEFAULT,
DDS::DomainParticipantListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil (participant.in ())) {
cerr << "create_participant failed." << endl;
return 1 ;
}
PosTypeSupportImpl* mts_servant = new PosTypeSupportImpl();
OpenDDS::DCPS::LocalObject_var safe_servant = mts_servant;
if (DDS::RETCODE_OK != mts_servant->register_type(participant.in (), g_typeName)) {
cerr << "Failed to register the MessageTypeTypeSupport." << endl;
exit(1);
}
CORBA::String_var type_name = mts_servant->get_type_name ();
DDS::Topic_var topic = participant->create_topic(g_topicName,
type_name.in(),
TOPIC_QOS_DEFAULT,
DDS::TopicListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil (topic.in ())) {
cerr << "Failed to create_topic." << endl;
exit(1);
}
// Create the subscriber
DDS::Subscriber_var sub =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
DDS::SubscriberListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil (sub.in ())) {
cerr << "Failed to create_subscriber." << endl;
exit(1);
}
// activate the listener
DDS::DataReaderListener_var listener (new ReaderListenner);
ReaderListenner* listener_servant =
dynamic_cast<ReaderListenner*>(listener.in());
if (CORBA::is_nil (listener.in ())) {
cerr << "listener is nil." << endl;
exit(1);
}
// Create the Datareaders
DDS::DataReader_var dr = sub->create_datareader(topic.in (),
DATAREADER_QOS_DEFAULT,
listener.in (),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil (dr.in ())) {
cerr << "create_datareader failed." << endl;
exit(1);
}
cout<<left<<endl;
cout<<setw(17)<<" id"
<<setw(15)<<" x"
<<setw(16)<<" y"<<endl;
cout<<setw(15)<<"------------"
<<setw(15)<<"------------"
<<setw(15)<<"----------"<<endl;
while ( listener_servant->num_reads() < g_sampleNum)
{
ACE_OS::sleep (1);
}
if (!CORBA::is_nil (participant.in ())) {
participant->delete_contained_entities();
}
if (!CORBA::is_nil (dpf.in ())) {
dpf->delete_participant(participant.in ());
}
ACE_OS::sleep(2);
cerr <<endl<<endl<< "Press any key to exit.........";
getchar();
cerr << "shutdown exit ." << endl;
TheServiceParticipant->shutdown();
}
catch (CORBA::Exception& e)
{
cerr << "SUB: Exception caught in main ():" << endl << e << endl;
return 1;
}
return 0;
}
// End of subscriber.cpp