package com.angejia.dw.service.property;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import com.angejia.dw.common.util.DateUtil;
import com.angejia.dw.common.util.FileUtil;
import com.angejia.dw.common.util.mysql.JavaMysqlClient;
import com.angejia.dw.hadoop.hive.HiveClient;
import com.angejia.dw.service.Conf;
/**
 * Builds the property (inventory) subject-area data.
 *
 * @author Jason
 */
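// Typical usage (a minimal sketch; the env value and date-point file path are
// illustrative, and the public entry point that drives the build lives elsewhere
// in this class):
//   PropertyInventoryService service =
//       new PropertyInventoryService("dev", "/tmp/inventory_date_point.txt");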
public class PropertyInventoryService {
// Hive (via the Spark Thrift Server)
private HiveClient sparkHiveClient;
// Product (business) MySQL
private JavaMysqlClient productJavaMysqlClient;
// DW (data warehouse) MySQL
private JavaMysqlClient dwJavaMysqlClient;
// Date-point file: holds the last processed date
private String datePointFile;
private String datePoint;
private String curDate;
/**
 * Initialization.
 *
 * @param env
 *            "dev" for the test environment, "online" for production
 * @param datePointFile
 *            path of the file holding the last processed date
 */
public PropertyInventoryService(String env, String datePointFile) {
/**
 * Configuration
 */
Conf conf = new Conf();
conf.setEnv(env);
/**
 * Initialize the product (business) MySQL connection
 */
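// The JDBC URL assembled below has the form (host and database illustrative):
//   jdbc:mysql://10.0.0.1:3306/product_db?useUnicode=true&characterEncoding=utf-8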
String productMysqlHost = conf.getProductMysqDBInfo().get("host");
String productMysqlAccount = conf.getProductMysqDBInfo().get("account");
String productMysqlPassword = conf.getProductMysqDBInfo().get("password");
String productMysqlDefaultDB = conf.getProductMysqDBInfo().get("defaultDB");
// Connect
this.productJavaMysqlClient = new JavaMysqlClient(
"jdbc:mysql://" + productMysqlHost + ":3306/" + productMysqlDefaultDB
+ "?useUnicode=true&characterEncoding=utf-8",
productMysqlAccount,
productMysqlPassword);
/**
 * Initialize the DW MySQL connection
 */
String dwMysqlHost = conf.getDwMysqDBInfo().get("host");
String dwMysqlAccount = conf.getDwMysqDBInfo().get("account");
String dwMysqlPassword = conf.getDwMysqDBInfo().get("password");
String dwMysqlDefaultDB = conf.getDwMysqDBInfo().get("defaultDB");
// Connect
this.dwJavaMysqlClient = new JavaMysqlClient(
"jdbc:mysql://" + dwMysqlHost + ":3306/" + dwMysqlDefaultDB + "?useUnicode=true&characterEncoding=utf-8",
dwMysqlAccount,
dwMysqlPassword);
/**
 * Initialize the Spark Hive (Thrift Server) connection
 */
String sparkThriftServerUrl = conf.getSparkConf().get("sparkThriftServerUrl");
String sparkThriftServerUser = conf.getSparkConf().get("sparkThriftServerUser");
String sparkThriftServerPass = conf.getSparkConf().get("sparkThriftServerPass");
try {
this.sparkHiveClient = new HiveClient(sparkThriftServerUrl, sparkThriftServerUser, sparkThriftServerPass);
} catch (SQLException e) {
// A failed connection leaves sparkHiveClient null; later Hive queries will fail
e.printStackTrace();
}
// Read the date point from the date file
this.datePointFile = datePointFile;
this.datePoint = FileUtil.fileInputStream(this.datePointFile);
this.curDate = DateUtil.getCurTime(DateUtil.SIMPLE_FORMAT);
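// Assumption: the date-point file contains a single timestamp such as
// "2016-01-01 00:00:00" (the format documented on buildInventory below),
// which is interpolated into the ${date} placeholder of the count/read SQL.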
}
/**
 * Build Angejia inventory (property listings).
 *
 * @param buildNum
 *            number of rows to process per batch
 */
private void buildInventoryRun(int buildNum) {
// Number of matching rows in the MySQL source
String countInventorySql = countInventorySqltmpl.replace("${date}", datePoint);
int mysqlSourceCount = productJavaMysqlClient.count(countInventorySql);
// Number of full pages; the <= loop below also covers the final partial page
int pageNum = mysqlSourceCount / buildNum;
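// Worked example (illustrative numbers): mysqlSourceCount = 23, buildNum = 10
// gives pageNum = 2, so the loop runs i = 0..2 with offsets 0, 10, 20 and
// covers all 23 rows; on an exact multiple the last iteration just fetches an
// empty page and buildInventory returns early.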
// Write each batch
for (int i = 0; i <= pageNum; i++) {
// Start offset
int offset = i * buildNum;
this.buildInventory(datePoint, offset, buildNum);
}
}
/**
 * Build one batch of data entities.
 *
 * @param date
 *            date string, e.g. "2016-01-01 00:00:00"
 * @param offset
 *            starting row of the query
 * @param limit
 *            number of rows to fetch per query
 */
private void buildInventory(String date, Integer offset, Integer limit) {
String readInventorySql = readInventorySqltmpl.replace("${date}", date)
.replace("${offset}", offset.toString())
.replace("${limit}", limit.toString());
// Columns used by the query
String mysqlSourceFields = "id,city_id,district_id,block_id,community_id,bedrooms,price,price_tier,area,"
+ "status,is_real,survey_status,provider_id,"
+ "created_at,updated_at";
System.out.println(readInventorySql);
// Latest updated inventory rows from MySQL
List<Map<String, String>> mysqlSourceData = productJavaMysqlClient.select(readInventorySql, mysqlSourceFields);
// Inventory ids from MySQL
List<String> inventoryIdsArr = new ArrayList<String>();
for (int i = 0; i < mysqlSourceData.size(); i++) {
Map<String, String> row = mysqlSourceData.get(i);
// Inventory id
String inventoryId = row.get("id");
inventoryIdsArr.add(inventoryId);
}
if (inventoryIdsArr.isEmpty()) {
return;
}
String inventoryIdsStr = StringUtils.join(inventoryIdsArr, ",");
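// e.g. ids [101, 102, 103] become "101,102,103", substituted into the ${ids}
// placeholder of the Hive query (presumably an IN (...) filter in the template).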
// Inventory data from Hive, fetched by inventory ids and re-keyed by inventory_id
Map<String, Map<String, String>> hiveSourceDataFomatKey = new HashMap<String, Map<String, String>>();
try {
// SQL
String readHiveInventorySql = readHiveInventorySqltmpl.replace("${ids}", inventoryIdsStr);
// Columns
String hiveSourceFields = "inventory_id,inventory_type,inventory_type_id";
System.out.println(readHiveInventorySql);
// Query
List<Map<String, String>> hiveSourceData = this.sparkHiveClient.select(readHiveInventorySql,
hiveSourceFields);
// Re-key the rows by inventoryId
for (int i = 0; i < hiveSourceData.size(); i++) {
Map<String, String> row = hiveSourceData.get(i);
String inventoryId = row.get("inventory_id");
hiveSourceDataFomatKey.put(inventoryId, row);
}
} catch (SQLException e) {
e.printStackTrace();
}
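// REPLACE INTO is MySQL-specific upsert behavior: a row whose primary or unique
// key already exists is deleted and re-inserted, so re-running a batch overwrites
// existing index rows instead of duplicating them.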
StringBuilder executeSql = new StringBuilder("REPLACE INTO "
+ "dw_service.proerty_inventory_index("
+ "inventory_id,city_id,district_id,block_id,community_id,bedrooms,price,"
+ "price_tier,inventory_type,inventory_type_id,is_real,status,survey_status,"
+ "updated_at,is_marketing,provider_id"
+ ") VALUES ");
// Build row values
for (int i = 0; i < mysqlSourceData.size(); i++) {
// Row from the MySQL source
Map<String, String> row = mysqlSourceData.get(i);
String inventoryId = row.get("id");
String cityId = row.get("city_id");
String districtId = row.get("district_id");
String blockId = row.get("block_id");
String communityId = row.get("community_id");
// Number of bedrooms (layout)
String bedrooms = row.get("bedrooms");
// Listing price
String price = row.get("price");
// Price tier
String priceTier = row.get("price_tier");
// Sale status: 2 = on sale
String status = row.get("status");
// Survey status: 2 = surveyed
String surveyStatus = row.get("survey_status");
String isReal = row.get("is_real");
String providerId = row.get("provider_id");
// Last updated time
String updatedAt = row.get("updated_at");
// Data from Hive
// Inventory type