#include "PostgresqlPool.h"
// Process-wide pool instance handed out by getInstance(). It is created with
// `new` as part of this static member's initialization and is never deleted
// in the code visible here.
//   - 1st argument: connection string used when "psqlconn.ini" cannot be
//     opened (see the constructor taking a connection string and test table).
//   - 2nd argument: table name queried by testConnection().
// NOTE(review): only two arguments are passed, so the remaining constructor
// parameter presumably has a default declared in the header - confirm.
// NOTE(review): credentials are embedded in the binary here; consider moving
// them to configuration.
PostgresqlPool* PostgresqlPool::m_instance=
    new PostgresqlPool(
        "hostaddr = '127.0.0.1' dbname = 'eps3' user = 'eps' password = 'eps' connect_timeout = '1'",
        "eps_systemconfig");
// Copy constructor with an empty body: nothing is copied from the source
// object and no member is explicitly initialized here.
// NOTE(review): presumably declared non-public so the singleton cannot be
// copied - confirm against the header.
PostgresqlPool::PostgresqlPool(const PostgresqlPool&) {}
// Copy assignment that deliberately ignores its argument: the state of `post`
// is not copied and the current object is returned unchanged.
PostgresqlPool& PostgresqlPool::operator =(const PostgresqlPool& post)
{
    PostgresqlPool& self=*this;
    return self;
}
// Returns the shared pool stored in the static member m_instance.
// The pointer is returned as-is; no instance is created here.
PostgresqlPool* PostgresqlPool::getInstance()
{
    return PostgresqlPool::m_instance;
}
// Builds the pool, opens the initial connections and starts the background
// main-database checker thread.
//
// m_connStr : fallback connection string for the main database, used only
//             when "psqlconn.ini" cannot be opened.
// testtable : table name queried by testConnection() to probe a connection.
// needslat  : whether a slave (secondary) database should be used. It is
//             forced to false when "psqlconn.ini" cannot be opened, because
//             no slave connection string is available in that case.
//
// "psqlconn.ini" layout: line 1 is the main connection string; line 2 (read
// only when needslat is true) is the slave connection string.
PostgresqlPool::PostgresqlPool(string m_connStr,string testtable,bool needslat)
{
    InitializeCriticalSection(&m_section);
    InitializeCriticalSection(&m_readMutex);
    this->m_mainConnected=false;
    this->m_slaveConnected=false;
    this->m_hasSlave=needslat;

    ifstream ifs("psqlconn.ini",std::ios::in);
    if(ifs)
    {
        char line[1024*2]={0};
        ifs.getline(line,sizeof(line)); // first line: main database
        this->m_mainconnStr=line;
        if(this->m_hasSlave)
        {
            ifs.getline(line,sizeof(line)); // second line: slave database
            this->m_slaveconnStr=line;
        }
    }
    else
    {
        // No configuration file: use the caller's string, main database only.
        this->m_mainconnStr=m_connStr;
        this->m_hasSlave=false;
    }
    ifs.close();

    this->m_testTable=testtable;
    this->setInitNums(5);
    this->setIncreatmentNums(2);
    this->setMaxNums(20);

    // Open the initial main connections.
    this->m_mainConnected=createConnections(this->getInitNums(),this->m_mainConnections,false);

    // Open the initial slave connections, if a slave is configured.
    if(this->m_hasSlave)
    {
        this->m_slaveConnected=createConnections(this->getInitNums(),this->m_slaveConnections,true);
        //CreateThread(NULL,0,CheckSlaveDb,(void*)this,0,NULL);
    }

    // Start the checker only after both pools are populated, so the thread
    // never observes the vectors while this constructor is still filling
    // them without holding a lock.
    HANDLE checker=CreateThread(NULL,0,CheckMainDb,(void*)this,0,NULL);
    if(checker!=NULL)
    {
        // The handle is not used afterwards; closing it does not stop the
        // thread, it only avoids leaking the handle.
        CloseHandle(checker);
    }
}
// Stores the individual connection parameters (URL, database name, user name,
// password). No connection is opened and no checker thread is started here.
//
// The synchronization objects, connected flags and pool-size limits are also
// put into a defined state, using the same defaults as the connection-string
// constructor (5 / 2 / 20), because other methods of the class read those
// flags and enter those critical sections.
PostgresqlPool::PostgresqlPool(string dburl,string dbname,string dbusername,string password)
{
    InitializeCriticalSection(&m_section);
    InitializeCriticalSection(&m_readMutex);
    this->m_mainConnected=false;
    this->m_slaveConnected=false;
    this->m_hasSlave=false;
    this->setInitNums(5);
    this->setIncreatmentNums(2);
    this->setMaxNums(20);

    this->m_dbUrl=dburl;
    this->m_dbName=dbname;
    this->m_dbUserName=dbusername;
    this->m_dbPassWord=password;
}
// Thread entry point for the main-database checker.
// lpParam is the PostgresqlPool instance passed to CreateThread.
//
// Every 3 seconds, forever:
//   - main database marked disconnected: try to open one main connection;
//     on success close it again and mark the main database as connected.
//   - main database marked connected: call readLocal() under m_readMutex,
//     run every statement in m_offSqls through SQL_EXEC_DDL, then clear the
//     list (regardless of what SQL_EXEC_DDL reported).
// NOTE(review): readLocal() presumably loads locally queued statements into
// m_offSqls - confirm against its definition.
DWORD WINAPI PostgresqlPool::CheckMainDb(LPVOID lpParam)
{
    PostgresqlPool* pool=(PostgresqlPool*)lpParam;
    for(;;)
    {
        if(!pool->m_mainConnected)
        {
            // Probe the main database with a throw-away connection.
            PGconn* probe=pool->newConnection();
            if(probe!=NULL)
            {
                pool->closeConnection(probe);
                pool->m_mainConnected=true;
            }
        }
        else
        {
            EnterCriticalSection(&pool->m_readMutex);
            pool->readLocal();
            LeaveCriticalSection(&pool->m_readMutex);

            vector<string>::iterator sql=pool->m_offSqls.begin();
            while(sql!=pool->m_offSqls.end())
            {
                pool->SQL_EXEC_DDL(sql->c_str());
                ++sql;
            }
            pool->m_offSqls.clear();
        }
        Sleep(3000);
    }
    return 0;
}
// Destructor with an empty body: it performs no explicit cleanup of the
// pooled connections or the critical sections.
PostgresqlPool::~PostgresqlPool()
{
}
void PostgresqlPool::setInitNums(int initnums)
{
this->m_initNums=initnums;
}
int PostgresqlPool::getInitNums()
{
return this->m_initNums;
}
void PostgresqlPool::setIncreatmentNums(int increatmentnums)
{
this->m_increatmentNums=increatmentnums;
}
int PostgresqlPool::getIncreatmentNums()
{
return this->m_increatmentNums;
}
void PostgresqlPool::setMaxNums(int maxnums)
{
this->m_MaxNums=maxnums;
}
int PostgresqlPool::getMaxNums()
{
return this->m_MaxNums;
}
bool PostgresqlPool::createConnections(int numconnects,vector<PooledConnection*>& connections,bool isSlave)
{
bool success=false;
for(vector<PooledConnection*>::iterator it=connections.begin();it!=connections.end();++it)
{
if((*it)->getConnection()==NULL)
{
PGconn* conn=newConnection(isSlave);
if(conn==NULL)
{
return false;//只要有一个创建失败就返回
}
connections.push_back(new PooledConnection(conn));
success=true;
}
}
if(success)//如果含的有空的 并且重新创建了对象 那么就可以返回了
{
return success;
}
for(int i=0;i<numconnects;++i)//没有空的情况 那么就说明没有可用的连接了 需要增加
{
//如果最大值设置为0或者为负数就表示没有限制,可以无限创建连接数量
if(this->m_MaxNums>0&&(int)connections.size()>=this->m_MaxNums)
{
return false;
}
PGconn* conn=newConnection(isSlave);
if(conn==NULL)
{
return false;
}
connections.push_back(new PooledConnection(conn));
}
return true;
}
// Opens one libpq connection using the slave connection string when isSlave
// is true and the main connection string otherwise.
// On success the client encoding is set via setEncode(conn,"gbk") and the
// handle is returned; if the connection status is not CONNECTION_OK the
// handle is released with PQfinish and NULL is returned.
PGconn* PostgresqlPool::newConnection(bool isSlave)
{
    const string& target=isSlave?this->m_slaveconnStr:this->m_mainconnStr;
    PGconn* handle=PQconnectdb(target.c_str());
    if(PQstatus(handle)==CONNECTION_OK)
    {
        setEncode(handle,"gbk");
        return handle;
    }
    PQfinish(handle);
    return NULL;
}
// Hands out a free connection, preferring the main database and falling back
// to the slave database when the main one is marked disconnected or cannot
// supply a connection.
//
// isSlave (out): always written. false when the returned connection belongs
//                to the main pool (or when neither pool exists yet); true
//                when the slave pool was used. Callers pass the same value
//                to returnConnection so the right pool is searched.
//
// Returns NULL when neither pool has been created, when no database is marked
// connected, or when the selected pool could not supply a connection.
PGconn* PostgresqlPool::getConnection(bool &isSlave)
{
    isSlave=false;
    if(this->m_mainConnections.empty()&&this->m_slaveConnections.empty())
    {
        return NULL; // the pools have not been created yet
    }

    if(this->m_mainConnected)
    {
        PGconn* conn=getFreeConnection(this->m_mainConnections,false);
        if(conn!=NULL)
        {
            // Return it right away: the entry is already marked busy, so
            // dropping it here would leave it unusable for everyone.
            return conn;
        }
        // getFreeConnection clears m_mainConnected when it fails; fall
        // through to the slave database.
    }

    if(!this->m_slaveConnected)
    {
        return NULL;
    }
    isSlave=true;
    return getFreeConnection(this->m_slaveConnections,true);
}
// Picks a free connection from `connections` while holding m_section.
// If none is free, createConnections() is asked for m_increatmentNums more
// and the search is repeated once.
//
// When no connection can be produced, NULL is returned and, after the lock
// has been released, the connected flag for the corresponding database
// (slave when isSlave is true, main otherwise) is cleared.
PGconn* PostgresqlPool::getFreeConnection(vector<PooledConnection*>& connections,bool isSlave)
{
    EnterCriticalSection(&m_section);
    PGconn* picked=findFreeConnection(connections,isSlave);
    if(picked==NULL&&createConnections(this->m_increatmentNums,connections,isSlave))
    {
        // New capacity was added; look again.
        picked=findFreeConnection(connections,isSlave);
    }
    LeaveCriticalSection(&m_section);

    if(picked!=NULL)
    {
        return picked;
    }

    // Nothing available: treat the database as unreachable.
    if(isSlave)
    {
        this->m_slaveConnected=false;
    }
    else
    {
        this->m_mainConnected=false;
    }
    return NULL;
}
// Scans `connections` for the first entry that has a connection handle and is
// not busy, marks it busy and returns its handle.
//
// Before returning, the handle is probed with testConnection(). If the probe
// fails the handle is closed and replaced by a fresh connection stored in the
// same entry; if that reconnect also fails the entry keeps a NULL handle
// (and its busy mark) and the scan moves on to the next entry.
//
// Returns NULL when no usable entry was found.
PGconn* PostgresqlPool::findFreeConnection(vector<PooledConnection*>& connections,bool isSlave)
{
    PGconn* picked=NULL;
    for(vector<PooledConnection*>::iterator slot=connections.begin();slot!=connections.end();++slot)
    {
        PooledConnection* entry=*slot;
        if(entry->getConnection()==NULL||entry->getIsBusy())
        {
            continue; // empty or already handed out
        }

        entry->setIsBusy(true);
        picked=entry->getConnection();
        if(testConnection(picked))
        {
            break;
        }

        // The pooled handle is stale: replace it in place.
        closeConnection(picked);
        picked=newConnection(isSlave);
        entry->setConnection(picked);
        if(picked!=NULL)
        {
            break;
        }
    }
    return picked;
}
// Probes a connection by running "select count(*) from <m_testTable>" on it.
// Returns true only when the query reports PGRES_TUPLES_OK.
//
// The result object is released on every path; libpq accepts a NULL result
// in both PQresultStatus and PQclear, so a failed PQexec needs no special
// handling.
bool PostgresqlPool::testConnection(PGconn*connection)
{
    string sql="select count(*) from "+this->m_testTable;
    PGresult* ret=PQexec(connection,sql.c_str());
    const bool ok=(PQresultStatus(ret)==PGRES_TUPLES_OK);
    PQclear(ret);
    return ok;
}
// Gives a connection back to its pool by clearing the busy flag of the entry
// whose handle equals `connecton` (pointer comparison).
//
// connecton : handle previously obtained from getConnection.
// isSlave   : true to search the slave pool, false to search the main pool;
//             this should be the value getConnection reported.
//
// Nothing happens when the selected pool is empty or the handle is not found.
//
// The search runs under m_section, the same lock getFreeConnection holds
// while it scans and grows the pools, so the vector cannot be reallocated
// and busy flags cannot be flipped concurrently during the walk.
void PostgresqlPool::returnConnection(PGconn*connecton,bool isSlave)
{
    vector<PooledConnection*>& pool=isSlave?this->m_slaveConnections:this->m_mainConnections;
    if(pool.empty())
    {
        // Checked before locking: a pool that never held connections is a
        // plain no-op.
        return;
    }

    EnterCriticalSection(&m_section);
    for(vector<PooledConnection*>::iterator it=pool.begin();it!=pool.end();++it)
    {
        if(connecton==(*it)->getConnection()) // compare handle addresses
        {
            (*it)->setIsBusy(false);
            break;
        }
    }
    LeaveCriticalSection(&m_section);
}
void PostgresqlPool::closeConnectionPool(vector<PooledConnection*>& connections)
{
PooledConnection* pConn=NULL;
for(vector<PooledConnection*>