没有合适的资源?快使用搜索试试~ 我知道了~
随着互联网信息技术日新月异的发展,一个海量数据爆炸的时代已经到来。如何有效地处理、分析这些海量的数据资源,成为各大技术厂商争在激烈的竞争中脱颖而出的一个利器。可以说,如果不能很好的快速处理分析这些海量的数据资源,将很快被市场无情地所淘汰。当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析。此外还有针对实时在线流式数据处理方面的,同样也是分布式的计算框架Storm,也能很好的满足数据实时性分析、处理的要求。最后还有SpringBatch,这个完全面向批处理的框架,可以大规模的应用于企业级的
资源推荐
资源详情
资源评论
异步并行批处理框架设计的一些思考异步并行批处理框架设计的一些思考
随着互联网信息技术日新月异的发展,一个海量数据爆炸的时代已经到来。如何有效地处理、分析这些海量的数据
资源,成为各大技术厂商争在激烈的竞争中脱颖而出的一个利器。可以说,如果不能很好的快速处理分析这些海量的
数据资源,将很快被市场无情地所淘汰。当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式
计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析。此外还有针对实时在线
流式数据处理方面的,同样也是分布式的计算框架Storm,也能很好的满足数据实时性分析、处理的要求。最后还有
Spring Batch,这个完全面向批处理的框架,可以大规模的应用于企业级的海量数据处理。
在这里,我就不具体展开说明这些框架如何部署、以及如何开发使用的详细教程说明。我想在此基础上更进一
步:我们能否借鉴这些开源框架背后的技术背景,为服务的企业或者公司,量身定制一套符合自身数据处理要求的批
处理框架。
首先我先描述一下,目前我所服务的公司所面临的一个用户数据存储处理的一个现状背景。目前移动公司一个省
内在网用户数据规模达到几千万的规模数量级,而且每个省已经根据地市区域对用户数据进行划分,我们把这批数据
存储在传统的关系型数据库上面(基于Oracle,地市是分区)。移动公司的计费结算系统会根据用户手机话费的余额
情况,实时的通知业务处理系统,给手机用户进行停机、复机的操作。业务处理系统收到计费结算系统的请求,会把
要处理的用户数据往具体的交换机网元上派发不同的交换机指令,这里简单的可以称为Hlr停复机指令(下面开始本文
都简称Hlr指令)。目前面临的现状是,在日常情况下,传统的C++多进程的后台处理程序还能勉强的“准实时”地处理这
些数据请求,但是,如果一旦到了每个月的月初几天,要处理的数据量往往会暴增,而C++后台程序处理的效率并不
高。这时问题来了,往往会有用户投诉,自己缴费了,为什么没有复机?或者某些用户明明已经欠费了,但是还没有
及时停机。这样的结果会直接降低客户对移动运营商支撑的满意度,于此同时,移动运营商本身也可能流失这些客户
资源。
自己认真评估了一下,造成上述问题的几个瓶颈所在。
1.一个省所有的用户数据都放在数据库的一个实体表中,数据库服务器,满打满算达到顶级小型机配置,也可能无法
满足月初处理量激增的性能要求,可以说频繁的在一台服务器上读写IO开销非常巨大,整个服务器处理的性能低下。
2.处理这些数据的时候,会同步地往交换机物理设备上发送Hlr指令,在交换机没有处理成功这个请求指令的时候,只
能阻塞等待,进一步造成后续待处理数据的积压。
针对上述的问题,本人想到了几个优化方案。
1.数据库中的实体表,能不能根据用户的归属地市进行表实体的拆分。即把一台或者几台服务器的压力,进行水平拆
分。一台数据库服务器就重点处理某一个或者几个地市的数据请求?降低IO开销。
2.由于交换机处理Hlr指令的时候,存在阻塞操作,我们能不能改成:通过异步返回处理的方式,把处理任务队列中的
任务先下达通知给交换机,然后交换机通过异步回调机制,反向通知处理模块,汇报任务的执行情况。这样处理模块
就从主动的任务轮询等待,变成等待交换机执行结果的异步通知,这样它就可以专注地进行处理数据的派发,不会受
到某几个任务处理时长的限制,从而影响到后面整批次的数据处理。
数据库的实体表由于进行水平拆解,能不能做到并行加载?这样就会大大节约串行数据加载的处理时长。
3.并行加载出来的待处理数据最好能放到一个批处理框架里面,批处理框架能很好地根据要处理数据的情况,进行配
置参数调整,从而很好地满足实时性的要求。比如月初期间,可以加大处理参数的值,提高处理效率。平常的时候,
可以适当降低处理参数的取值,降低系统的CPU/IO开销。
基于以上几点考虑,得出如下图所示的设计方案的组件图:
下面就具体说明一下,其中关键模块如何协同工作的。
1.异步并行查询加载模块BatchQueryLoader:支持传入多个数据源对象,同时利用google-guava库中对于Future接口
的扩展ListenableFuture,来实现批量查询数据的并行加载。Future接口主要是用来表示异步计算的结果,并且计算完
成的时候,只能用get()方法获取结果,get方法里面其中有一个方法是可以设置超时时间的。在并行加载模块里面,批
量并行地加载多个数据源里面的实体表中的数据,并最终反馈加载的结果集合。并行数据加载和串行数据加载所用的
耗时可以简单用下面的图例来说明:串行加载的总耗时是每个数据源加载耗时的总和。而并行加载的总耗时,取决于
最大加载的那个数据源耗时时长。(注:我们把每天要进行停复机处理的用户数据通过采集程序,分地市分布采集到
水平分库的notify_users提醒用户表)
2.并行异步批处理模块BatchTaskReactor:内部是通过线程池机制来实现的,接受异步并行查询加载模块
BatchQueryLoader得到的加载结果数据,放入线程池中进行任务的异步派发,它最终就是通过Hlr派单指令异步任务执
行HlrBusinessEventTask模块下发指令任务,然后自己不断的从阻塞队列中获取,待执行的任务列表进行任务的分
派。与此同时,他通过Future接口,异步得到HlrBusinessEventTask派发指令的执行反馈结果。
3.批量处理线程池运行参数配置加载BatchTaskConfigurationLoader:加载线程池运行参数的配置,把结果通知并行异
步批处理模块BatchTaskReactor,配置文件batchtask-configuration.xml的内容如下所示。
<?xml version="1.0" encoding="GBK"?>
<batchtask>
<!-- 批处理异步线程池参数配置 -->
<jobpool name="newlandframework_batchtask">
<attribute name="corePoolSize" value="15" />
<attribute name="maxPoolSize" value="30" />
<attribute name="keepAliveTime" value="1000" />
<attribute name="workQueueSize" value="200" />
</jobpool>
</batchtask>
其中corePoolSize表示保留的线程池大小,workQueueSize表示的是阻塞队列的大小,maxPoolSize表示的是线程池的
最大大小,keepAliveTime指的是空闲线程结束的超时时间。其中创建线程池方法ThreadPoolExecutor里面有个参数是
unit,它表示一个枚举,即keepAliveTime的单位。说了半天,这几个参数到底什么关系呢?我举一个例子说明一下,
当出现需要处理的任务的时候,ThreadPoolExecutor会分配corePoolSize数量的线程池去处理,如果不够的话,会把
任务放入阻塞队列,阻塞队列的大小是workQueueSize,当然这个时候还可能不够,怎么办。只能叫来“临时工线程”帮
忙处理一下,这个时候“临时工线程”的数量是maxPoolSize-corePoolSize,当然还会继续不够,这个时候
ThreadPoolExecutor线程池会采取4种处理策略。
现在具体说一下是那些处理策略。首先是ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时
RejectedExecutionException。然后是ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的 execute 本
身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。其次是,ThreadPoolExecutor.DiscardPolicy 中,
不能执行的任务将被删除。最后是ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作
队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。如果要处理的任务没有那么多
了,ThreadPoolExecutor线程池会根据keepAliveTime设置的时间单位来回收多余的“临时工线程”。你可以把
keepAliveTime理解成专门是为maxPoolSize-corePoolSize的“临时工线程”专用的。
线程池参数的设定。正常情况下我们要如何设置线程池的参数呢?我们应该这样设置:I、workQueueSize阻塞队列的
大小至少大于等于corePoolSize的大小。II、maxPoolSize线程池的大小至少大于等于corePoolSize的大小。III、
corePoolSize是你期望处理的默认线程数,个人觉得线程池机制的话,至少大于1吧?不然的话,你这个线程池等于单
线程处理任务了,这样就失去了线程池设计存在的意义了。
JMX(Java Management Extensions)批处理任务监控模块BatchTaskMonitor:实时地监控线程池BatchTaskReactor
中任务的执行处理情况(具体就是任务成功/失败情况)。
介绍完毕了几个核心模块主要的功能,那下面就依次介绍一下主要模块的详细设计思路。
我们把每天要进行停复机处理的用户数据通过采集程序,采集到notify_users表。首先定义的是,我们要处理采集的通
知用户数据对象的结构描述,它对应水平分库的表notify_users的JavaBean对象。notify_users的表结构为了演示起
见,简单设计如下(基于Oracle数据库):
create table notify_users
(
home_city number(3) /*手机用户的归属地市编码*/,
msisdn number(15) /*手机号码*/,
user_id number(15) /*手机用户的用户标识*/
);
对应JavaBean实体类NotifyUsers,具体代码定义如下:
/**
* @filename:NotifyUsers.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:要进行批处理通知的用户对象
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.model;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
public class NotifyUsers {
public NotifyUsers() {
}
// 用户归属地市编码(这里具体是:591表示福州/592表示厦门)
private Integer homeCity;
// 用户的手机号码
private Integer msisdn;
// 用户标识
private Integer userId;
public Integer getHomeCity() {
return homeCity;
}
public void setHomeCity(Integer homeCity) {
this.homeCity = homeCity;
}
public Integer getMsisdn() {
return msisdn;
}
public void setMsisdn(Integer msisdn) {
this.msisdn = msisdn;
}
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("homeCity", homeCity).append("userId", userId)
.append("msisdn", msisdn).toString();
}
}
异步并行查询加载模块BatchQueryLoader的类图结构:
我们通过并行查询加载模块BatchQueryLoader调用异步并行查询执行器BatchQueryExecutor,来并行地加载不同数据
源的查询结果集合。StatementWrapper则是对JDBC里面Statement的封装。具体代码如下所示:
/**
* @filename:StatementWrapper.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Statement封装类
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import java.sql.Connection;
import java.sql.Statement;
public class StatementWrapper {
private final String sql;
private final Statement statement;
private final Connection con;
public StatementWrapper(String sql, Statement statement, Connection con) {
this.sql = sql;
this.statement = statement;
this.con = con;
}
public String getSql() {
return sql;
}
public Statement getStatement() {
return statement;
}
public Connection getCon() {
return con;
}
}
定义两个并行加载的异常类BatchQueryInterruptedException、BatchQueryExecutionException
剩余48页未读,继续阅读
资源评论
weixin_38631599
- 粉丝: 9
- 资源: 943
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功