package com.angejia.dw.service.property;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import com.angejia.dw.common.util.DateUtil;
import com.angejia.dw.common.util.FileUtil;
import com.angejia.dw.common.util.mysql.JavaMysqlClient;
import com.angejia.dw.hadoop.hive.HiveClient;
import com.angejia.dw.service.Conf;
/**
 * Property (inventory) subject-area build service: syncs recently updated
 * listings from the product MySQL database, enriches them with Hive data,
 * and writes the result into the data-warehouse index table.
 *
 * @author Jason
 */
public class PropertyInventoryService {
// Hive client backed by the Spark Thrift server (initialized in the constructor).
private HiveClient sparkHiveClient;
// Product (business) MySQL connection.
private JavaMysqlClient productJavaMysqlClient;
// Data-warehouse MySQL connection.
private JavaMysqlClient dwJavaMysqlClient;
// Path of the file holding the date checkpoint (date pointer).
private String datePointFile;
// Date checkpoint read from datePointFile, e.g. "2016-01-01 00:00:00".
private String datePoint;
// Current time formatted with DateUtil.SIMPLE_FORMAT at construction time.
private String curDate;
/**
 * Initializes the product and data-warehouse MySQL connections, the Spark
 * Hive (Thrift server) client, and the date checkpoint read from
 * {@code datePointFile}.
 *
 * @param env "dev" for the test environment, "online" for production
 * @param datePointFile path of the file holding the date checkpoint
 * @throws IllegalStateException if the Spark Thrift server connection fails
 */
public PropertyInventoryService(String env, String datePointFile) {
    // Environment-specific configuration.
    Conf conf = new Conf();
    conf.setEnv(env);

    // Product (business) MySQL connection.
    Map<String, String> productDb = conf.getProductMysqDBInfo();
    this.productJavaMysqlClient = new JavaMysqlClient(
            "jdbc:mysql://" + productDb.get("host") + ":3306/" + productDb.get("defaultDB")
                    + "?useUnicode=true&characterEncoding=utf-8",
            productDb.get("account"),
            productDb.get("password"));

    // Data-warehouse MySQL connection.
    Map<String, String> dwDb = conf.getDwMysqDBInfo();
    this.dwJavaMysqlClient = new JavaMysqlClient(
            "jdbc:mysql://" + dwDb.get("host") + ":3306/" + dwDb.get("defaultDB")
                    + "?useUnicode=true&characterEncoding=utf-8",
            dwDb.get("account"),
            dwDb.get("password"));

    // Spark Hive (Thrift server) connection.
    String sparkThriftServerUrl = conf.getSparkConf().get("sparkThriftServerUrl");
    String sparkThriftServerUser = conf.getSparkConf().get("sparkThriftServerUser");
    String sparkThriftServerPass = conf.getSparkConf().get("sparkThriftServerPass");
    try {
        this.sparkHiveClient = new HiveClient(sparkThriftServerUrl, sparkThriftServerUser, sparkThriftServerPass);
    } catch (SQLException e) {
        // Fail fast instead of swallowing the error: the original
        // printStackTrace() left sparkHiveClient null, which would only
        // surface later as a NullPointerException far from the real cause.
        throw new IllegalStateException(
                "Unable to connect to Spark Thrift server: " + sparkThriftServerUrl, e);
    }

    // Date checkpoint: where incremental processing resumes from.
    this.datePointFile = datePointFile;
    this.datePoint = FileUtil.fileInputStream(this.datePointFile);
    this.curDate = DateUtil.getCurTime(DateUtil.SIMPLE_FORMAT);
}
/**
 * Builds Angejia inventory records in pages of {@code buildNum} rows,
 * covering every row updated since the current date checkpoint
 * ({@code datePoint}).
 *
 * @param buildNum number of rows processed per batch; must be positive
 * @throws IllegalArgumentException if {@code buildNum} is not positive
 */
private void buildInventoryRun(int buildNum) {
    if (buildNum <= 0) {
        // Guard: the original code divided by buildNum unchecked.
        throw new IllegalArgumentException("buildNum must be positive: " + buildNum);
    }
    // Total number of rows changed since the checkpoint date.
    String countInventorySql = countInventorySqltmpl.replace("${date}", datePoint);
    int mysqlSourceCount = productJavaMysqlClient.count(countInventorySql);
    // Ceiling division: exactly enough pages to cover every row. The old
    // "count / buildNum" plus "i <= pageNum" loop always issued one extra
    // (empty) query when count was a multiple of buildNum, including count 0.
    int pages = (mysqlSourceCount + buildNum - 1) / buildNum;
    for (int i = 0; i < pages; i++) {
        // Offset of the first row of this page.
        this.buildInventory(datePoint, i * buildNum, buildNum);
    }
}
/**
* build 数据实体
*
* @param date
* 日期 "2016-01-01 00:00:00"
* @param offset
* 查询条数位置
* @param limit
* 每次查询数据量
* @throws SQLException
*/
private void buildInventory(String date, Integer offset, Integer limit) {
String readInventorySql = readInventorySqltmpl.replace("${date}", date)
.replace("${offset}", offset.toString())
.replace("${limit}", limit.toString());
// 查询用到的字段
String mysqlSourceFields = "id,city_id,district_id,block_id,community_id,bedrooms,price,price_tier,area,"
+ "status,is_real,survey_status,provider_id,"
+ "created_at,updated_at";
System.out.println(readInventorySql);
// mysql 最新更新房源数据
List<Map<String, String>> mysqlSourceData = productJavaMysqlClient.select(readInventorySql, mysqlSourceFields);
// mysql 房源 ids
List<String> inventoryIdsArr = new ArrayList<String>();
for (int i = 0; i < mysqlSourceData.size(); i++) {
Map<String, String> row = mysqlSourceData.get(i);
// 房源 id
String inventoryId = row.get("id");
inventoryIdsArr.add(inventoryId);
}
if (inventoryIdsArr.size() == 0)
return;
String inventoryIdsStr = StringUtils.join(inventoryIdsArr, ",");
// Hive 房源数据数据, 通过 房源 ids 获取数据
Map<String, Map<String, String>> hiveSourceDataFomatKey = new HashMap<String, Map<String, String>>();
try {
// sql
String readHiveInventorySql = readHiveInventorySqltmpl.replace("${ids}", inventoryIdsStr);
// 字段
String hiveSourceFields = "inventory_id,inventory_type,inventory_type_id";
System.out.println(readHiveInventorySql);
// 查询
List<Map<String, String>> hiveSourceData = this.sparkHiveClient.select(readHiveInventorySql,
hiveSourceFields);
// 按照 inventoryId 为 key 重新组织数据
for (int i = 0; i <= hiveSourceData.size() - 1; i++) {
Map<String, String> row = hiveSourceData.get(i);
String inventoryId = row.get("inventory_id");
hiveSourceDataFomatKey.put(inventoryId, row);
}
} catch (SQLException e) {
e.printStackTrace();
}
StringBuilder executeSql = new StringBuilder("REPLACE INTO "
+ "dw_service.proerty_inventory_index("
+ "inventory_id,city_id,district_id,block_id,community_id,bedrooms,price,"
+ "price_tier,inventory_type,inventory_type_id,is_real,status,survey_status,"
+ "updated_at,is_marketing,provider_id"
+ ") VALUES ");
// build 数据
for (int i = 0; i < mysqlSourceData.size(); i++) {
// mysql 源中的数据
Map<String, String> row = mysqlSourceData.get(i);
String inventoryId = row.get("id");
String cityId = row.get("city_id");
String districtId = row.get("district_id");
String blockId = row.get("block_id");
String communityId = row.get("community_id");
// 户型
String bedrooms = row.get("bedrooms");
// 房源价格
String price = row.get("price");
// 价格段
String priceTier = row.get("price_tier");
// 在售: 2 在卖
String status = row.get("status");
// 实堪: 2 已实勘
String surveyStatus = row.get("survey_status");
String isReal = row.get("is_real");
String providerId = row.get("provider_id");
// 修改时间
String updatedAt = row.get("updated_at");
// hive 中的数据
// 房源类型
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
基于 spark 推荐系统, 基于内容标签 CBCF 实现,协同过滤 UBCF 实现,协同过滤 IBCF 实现.zip (79个子文件)
code_resource_010
src
test
scala
com
angejia
dw
recommend
inventory
portrait
InventoryPortraitCommonTest.scala 1KB
user
portrait
UserPortraitTest.scala 5KB
main
resources
conf_dev.properties 864B
conf_online.properties 1KB
log4j.properties 973B
java
com
angejia
dw
hadoop
hive
HiveClient.java 5KB
service
user
UserService.java 68B
property
model
Inventory.java 76B
PropertyInventoryService.java 19KB
Conf.java 3KB
common
util
DateUtil.java 6KB
mysql
JavaMysqlClient.java 5KB
DebugUtil.java 551B
FileUtil.java 3KB
parse
ParseMobileAgent.java 2KB
ParseMobileToken.java 2KB
PropertyUtil.java 5KB
JavaJsonUtil.java 3KB
scala
com
angejia
dw
recommend
inventory
InventoryItemCFBak.scala 5KB
InventoryItemCFTest.scala 5KB
InventoryIBCFspark.scala 7KB
InventoryItemCF.scala 7KB
InventoryIBCF.scala 3KB
portrait
MarketingInventoryPortrait.scala 3KB
InventoryPortraitCommon.scala 3KB
IBCF.scala 8KB
UBCF.scala 9KB
extract
ExtractFileToKafka.scala 6KB
community
CommunityIBCF.scala 3KB
Conf.scala 3KB
user
UserUBCF.scala 8KB
UserUBCF20160517.scala 14KB
portrait
UserPortraitTags.scala 23KB
UserPortraitBrowse.scala 5KB
UserPortraitCommon.scala 8KB
UserPortraitLinkInventory.scala 8KB
UserPortraitMemberDemand.scala 15KB
UserPortraitAttenuation.scala 12KB
UserPortraitMemberDemand_20160808.scala 9KB
UserPortraitFilter.scala 6KB
UserPortraitrModelState.scala 5KB
UserPortraitTagConf.scala 5KB
UserPortraitNeeds.scala 9KB
UserPortrait.scala 13KB
UserPortraitVisitItem.scala 8KB
UserPortraitLikeInventory.scala 12KB
hadoop
spark
CollaborativeFiltering.scala 6KB
CollaborativeFilteringTest.scala 6KB
hbase
HBaseClient.scala 13KB
hdfs
HDFSClient.scala 3KB
HDFSClientTest.scala 2KB
kafka
KafkaProducer.scala 5KB
KafkaConsumer.scala 4KB
common
util
RegexUtil.scala 2KB
ListenerFile.scala 5KB
JsonUtil.scala 7KB
mysql
MysqlClient.scala 2KB
ScFileUtil.scala 401B
ScriptUtil.scala 1KB
logs
UbaWebActionLogStreaming.scala 159B
UbaWebVisitLogStreaming.scala 157B
UbaAppActionLogStreaming.scala 159B
build.sbt 5KB
.gitignore 85B
project
plugins.sbt 408B
scripts
CbcfService.sh 517B
inventory
InventoryIBCF.sh 993B
InventoryPortraitClean.sh 765B
PropertyInventoryIndex.sh 848B
CommunityIbcfService.sh 649B
extract
AccessLogToKafka.sh 796B
InventoryPortraitCleanService.sh 355B
community
CommunityIBCF.sh 951B
InventoryIbcfService.sh 649B
user
UserPortrait.sh 990B
UserPortraitAttenuation.sh 753B
UserUbcf.sh 906B
UserUbcfService.sh 340B
UserPortraitAttenuationService.sh 344B
共 79 条
- 1
资源评论
LeapMay
- 粉丝: 3w+
- 资源: 2305
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功