import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Service;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@Slf4j
@Service
@PropertySource("classpath:daopenapi.properties")
public class DBSyncServiceImpl implements DBSyncService {
// Settings for the formal (source) database, each resolved by Spring from the named property.
@Value("${FormalDB.name}")
private String FORMALDB_NAME; // formal database name
@Value("${FormalDB.pwd}")
private String FORMALDB_PWD; // formal database password
@Value("${FormalDB.serverName}")
private String FORMALDB_SERVERNAME; // formal database server name
// Settings for the sync (target) database, each resolved by Spring from the named property.
@Value("${SyncDB.name}")
private String SYNCDB_NAME; // sync database name
@Value("${SyncDB.pwd}")
private String SYNCDB_PWD; // sync database password
// NOTE(review): the doubled trailing "E" in the field name looks like a typo; renaming it
// requires updating every usage in this class, so it is left unchanged here.
@Value("${SyncDB.serverName}")
private String SYNCDB_SERVERNAMEE; // sync database server name
/**
 * Runs one synchronization pass by delegating to {@link #selectData()}.
 *
 * @return a newly constructed {@code ResultBean}; the value returned by
 *         {@code selectData()} is not inspected
 */
@Override
public ResultBean dbSync() {
    selectData();
    final ResultBean response = new ResultBean();
    return response;
}
/** Names of the tables that are copied, in the order they are processed. */
private static final String[] SYNC_TABLE_NAMES = {
    "CT_DEV_QUICKMARK_FIELD",
    "CT_DEV_BUSINESS",
    "CT_DEV_PROVIDER",
    "CT_DEV_CONTRACT_PAY",
    "CT_DEV_CONTRACT",
    "CT_DEV_CONTRACT_DETAIL",
    "CT_DEV_LOCATION",
    "CT_DEV_LOCA_SPOT",
    "CT_DEV_DEVICE_INFO",
    "CT_DEV_DEVICE_INFO_HIS",
    "CT_DEV_BASE_MODEL",
    "CT_DEV_BASE_TYPE",
    "CT_DEV_TEMPLATE",
    "CT_DEV_TEMPLATE_DETAIL",
    "CT_DEV_BASE_CATE",
    "CT_DEV_ITEM"
};

/** Rows requested per fetch from the source and handed to one insert call. */
private static final int SYNC_BATCH_SIZE = 10000;

/**
 * Reads every row of each table in {@link #SYNC_TABLE_NAMES} from the formal database
 * and hands the rows to {@code insertNewDataBase} in batches of
 * {@link #SYNC_BATCH_SIZE}, so that a whole table is never held in memory at once.
 *
 * <p>Both connections, and every statement and result set opened along the way, are
 * closed before this method returns, whether or not an error occurred.
 *
 * @return an empty map when the pass completes (all rows have already been handed to
 *         {@code insertNewDataBase} by then), or {@code null} when any step throws
 */
public Map<String, List<HashMap<String, Object>>> selectData() {
    try (Connection con = getConnection()) {
        log.info("{}库连接成功", FORMALDB_NAME);
        try (Connection conTest = getTestConnection()) {
            log.info("{}库连接成功", SYNCDB_NAME);
            for (String tableName : SYNC_TABLE_NAMES) {
                int count = countTableRows(con, tableName);
                log.info("{}表共有{}条数据", tableName, count);
                // Table-existence check, kept at the same point as before: after the
                // count query and before the data is read.
                exitsTable(tableName, conTest, con);
                copyTableRows(con, conTest, tableName);
            }
        }
        // Every batch has been flushed, so nothing is left to hand back to the caller.
        return new HashMap<>();
    } catch (Exception e) {
        // Log through SLF4J with the full stack trace instead of printStackTrace().
        log.error("{}库同步到{}库失败", FORMALDB_NAME, SYNCDB_NAME, e);
        return null;
    }
}

/** Returns the number of rows currently in {@code tableName}; used for logging only. */
private int countTableRows(Connection con, String tableName) throws SQLException {
    String countSQL = "SELECT COUNT(*) as count FROM (select * from " + tableName + ")";
    try (Statement stmt = con.createStatement();
         ResultSet rs = stmt.executeQuery(countSQL)) {
        return rs.next() ? rs.getInt(1) : 0;
    }
}

/** Streams all rows of {@code tableName} from {@code con} and inserts them batch by batch. */
private void copyTableRows(Connection con, Connection conTest, String tableName)
        throws SQLException {
    String sql = "select * from " + tableName;
    try (Statement stmt =
            con.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        stmt.setFetchSize(SYNC_BATCH_SIZE);
        stmt.setFetchDirection(ResultSet.FETCH_FORWARD);
        try (ResultSet rs = stmt.executeQuery(sql)) {
            ResultSetMetaData meta = rs.getMetaData();
            int columnCount = meta.getColumnCount();
            List<HashMap<String, Object>> rows = new ArrayList<>();
            while (rs.next()) {
                // One map per row: column name -> column value.
                HashMap<String, Object> row = new HashMap<>();
                for (int j = 1; j <= columnCount; j++) {
                    row.put(meta.getColumnName(j), rs.getObject(j));
                }
                rows.add(row);
                // Flush on buffer size rather than ResultSet.getRow(), which drivers
                // need not support for forward-only result sets.
                if (rows.size() == SYNC_BATCH_SIZE) {
                    flushRows(tableName, rows, conTest);
                    log.info("插入一万条数据成功");
                    rows = new ArrayList<>();
                }
            }
            // Flush whatever is left once the cursor is exhausted. This does not rely
            // on the earlier COUNT(*), which may be stale if the table changed since.
            if (!rows.isEmpty()) {
                flushRows(tableName, rows, conTest);
                log.info("剩余数据插入成功");
            }
        }
    }
}

/** Wraps {@code rows} in the table-name-keyed map that {@code insertNewDataBase} expects. */
private void flushRows(String tableName, List<HashMap<String, Object>> rows, Connection conTest) {
    Map<String, List<HashMap<String, Object>>> batch = new HashMap<>();
    batch.put(tableName, rows);
    insertNewDataBase(batch, conTest);
}
//插入数据
public void insertNewDataBase(Map<String, List<HashMap<String, Object>>> map, Connection conTest) {
Statement stmtInsert = null;
try {
stmtInsert = conTest.createStatement();
conTest.setAutoCommit(false);
for (Entry<String, List<HashMap<String, Object>>> elh : map.entrySet()) {
String key = elh.getKey();
List<HashMap<String, Object>> lhso = elh.getValue();
StringBuilder sb = new StringBuilder();
StringBuilder sb1 = new StringBuilder();
for (int i = 0; i < lhso.size(); i++) {
HashMap<String, Object> mapColumn = lhso.get(i);
for (Entry<String, Object> eso : mapColumn.entrySet()) {
sb.append(eso.getKey());
sb.append(",");
if (eso.getValue() != null && (eso.getValue().getClass().getName().equals("java.lang.String"))) {
String result = "'" + eso.getValue() + "'";
sb1.append(result);
} else {
sb1.append(eso.getValue());
}
sb1.append(",");
}
sb.deleteCharAt(sb.length() - 1);
sb1.deleteCharAt(sb1.length() - 1);
String column = sb.toString();
String value = sb1.toString();
String sql = "insert into " + key + "(" + column + ")" + "values (" + value + ")";
stmtInsert.addBatch(sql);
//INSERT INTO table_name (列1, 列2,...) VALUES (值1, 值2,....) //这里使用delete清空StringBuilder,搜了一下,是清空中效率最高的
sb = sb.delete(0, sb.length());
sb1 = sb1.delete(0, sb1.length());
}
}
stmtInsert.executeBatch();
conTest.commit();
stmtInsert.close();
log.info("同步成功");
} catch (Exception