package big;
import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class TestQ {
private boolean readFileFromFile(String localPath){
boolean flag = true;
List<String[]> retList = new ArrayList<String[]>();// 数据list
int i = 0;
int startFlg = 0;
try {
File fin = new File(localPath);
if (null == fin) {
System.out.println("readFileFromFile:读取文件出错");
flag = false;
}
BigFileRead bFileRead = new BigFileRead(fin,"gbk");
String line = "";
String lineTmp = "";
byte[] resultTmp = null;
byte[] resultT = null;
byte[] resultB = null;
while (bFileRead.hasNext()) {
resultT = bFileRead.next();
lineTmp = new String(resultT);
//每行的数据字段之前是用tab符分割的,但是如果最后是空的,人为加个空格,否则长度不对
if (lineTmp.lastIndexOf("\t") == lineTmp.length()-1) {
lineTmp += " ";
resultTmp = new byte[resultT.length+1];
System.arraycopy(resultT, 0, resultTmp,0,resultT.length);
resultTmp[resultTmp.length-1] = 32;//加空格
}else{
resultTmp = new byte[resultT.length];
System.arraycopy(resultT, 0, resultTmp,0,resultT.length);
}
//因为文本每行都是21个字段,如果少了,即被分割了,需要拼接
if (startFlg > 0 && lineTmp.indexOf("\n")==-1 && line.split("\t").length < 21) {
byte[] lastB= new byte[resultB.length+resultTmp.length];
System.arraycopy(resultB, 0, lastB, 0, resultB.length);
System.arraycopy(resultTmp, 0, lastB, resultB.length, resultTmp.length);
line = new String(lastB);//拼接被分割的一行数据
}else {
line = lineTmp;
resultB = resultTmp;
}
String[] results = line.split("\t");
if (results.length >= 21) {
i++;
System.out.println(results.length+"==="+i+","+line);
retList.add(results);
if (i == 1000) {// 1000条入一次库,防止数据量大,内存溢出
insertJfFtp(retList);
i = 0;
retList.clear();
}
}
startFlg++;
}
if (retList != null && retList.size() > 0) {
insertJfFtp(retList);
}
}catch (Exception e) {
e.printStackTrace();
flag = false;
}finally{
}
return flag;
}
/**
* 获取本地数据连接
*/
protected Connection getLocalConnection() throws Exception {
Connection conn = null;
String url = "jdbc:oracle:thin:@127.0.0.1:1521:bidev";
String user = "aa";
String password = "aa";
try {
Class.forName("oracle.jdbc.driver.OracleDriver").newInstance();
conn = DriverManager.getConnection(url, user, password);
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
/**
* FTP上的预警数据入库
* @param retList
* @param task
*/
public void insertJfFtp(List<String[]> dataList) {
int k = 1;
Connection con = null;
PreparedStatement pstmt = null;
String sql = "insert into tablea(partition_key, stat_month,"
+ "inter_user_id, login_name, surname, area_id, supply_type,user_type, community, "
+ "distric_id, distric_name, address, alarm_type, prodid, prodname, band_privid,"
+ "band_privname, band_priv_end, band_priv_enddate, req_type, is_handle)"
+ " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
try {
con = getLocalConnection();
pstmt = con.prepareStatement(sql);
for (int i = 0; i < dataList.size(); i++) {
k = 1;
String [] tmp = dataList.get(i);
for (int j = 0; j < tmp.length && j < 21; j++) {
pstmt.setString(k++, tmp[j]);
}
pstmt.addBatch();
}
pstmt.executeBatch();
con.commit();
} catch (Exception e) {
System.out.println("DBJob->insertJfFtp:"
+ e.getMessage());
e.printStackTrace();
try {
con.rollback();
} catch (SQLException e1) {
e1.printStackTrace();
}
} finally {
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (con != null) {
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
TestQ aimport=new TestQ();
aimport.readFileFromFile("E:\\NL_DATA\\jiake\\I_309056_201412_0001.TXT");
}
}
- 1
- 2
- 3
- 4
前往页