没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
第 1 页 共 57 页 出自石山园,博客地址:http://www.cnblogs.com/shishanyuan
MapReduce 应用案例
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明
显位置给出原文连接,博主为石山园,博客地址为 http://www.cnblogs.com/shishanyuan 。
该系列课程是应邀实验楼整理编写的,这里需要赞一下实验楼提供了学习的新方式,可以边看博
客边上机实验,课程地址为
【注】该系列所使用到安装包、测试数据和代码均可在百度网盘下载,具体地址为
http://pan.baidu.com/s/10PnDs,下载该
https://www.shiyanlou.com/courses/237
PDF 文件
1 环境说明
部署节点操作系统为 CentOS,防火墙和 SElinux 禁用,创建了一个 shiyanlou 用户并在系
统根目录下创建/app 目录,用于存放 Hadoop 等组件运行包。因为该目录用于安装 hadoop
等组件程序,用户对 shiyanlou 必须赋予 rwx 权限(一般做法是 root 用户在根目录下创建/app
目录,并修改该目录拥有者为 shiyanlou(chown –R shiyanlou:shiyanlou /app)。
Hadoop 搭建环境:
虚拟机操作系统: CentOS6.6 64 位,单核,1G 内存
JDK:1.7.0_55 64 位
Hadoop:1.1.2
2 准备测试数据
测试数据包括两个文件 dept(部门)和 emp(员工),其中各字段用逗号分隔:
dept 文件内容:
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
emp 文件内容:
7369,SMITH,CLERK,7902,17-12 月-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2 月-81,1600,300,30
第 2 页 共 57 页 出自石山园,博客地址:http://www.cnblogs.com/shishanyuan
7521,WARD,SALESMAN,7698,22-2 月-81,1250,500,30
7566,JONES,MANAGER,7839,02-4 月-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9 月-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5 月-81,2850,,30
7782,CLARK,MANAGER,7839,09-6 月-81,2450,,10
7839,KING,PRESIDENT,,17-11 月-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9 月-81,1500,0,30
7900,JAMES,CLERK,7698,03-12 月-81,950,,30
7902,FORD,ANALYST,7566,03-12 月-81,3000,,20
7934,MILLER,CLERK,7782,23-1 月-82,1300,,10
在/home/shiyanlou/install-pack/class6 目录可以找到这两个文件,把这两个文件上传到
HDFS 中/class6/input 目录中,执行如下命令:
cd /home/shiyanlou/install-pack/class6
hadoop fs -mkdir -p /class6/input
hadoop fs -copyFromLocal dept /class6/input
hadoop fs -copyFromLocal emp /class6/input
hadoop fs -ls /class6/input
第 3 页 共 57 页 出自石山园,博客地址:http://www.cnblogs.com/shishanyuan
3 应用案例
3.1 测试例子 1:求各个部门的总工资
3.1.1 问题分析
MapReduce 中的 join 分为好几种,比如有最常见的 reduce side join、map side join 和
semi join 等。reduce join 在 shuffle 阶段要进行大量的数据传输,会造成大量的网络 IO 效率
低下,而 map side join 在处理多个小表关联大表时非常有用 。
Map side join 是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个
表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个 map task
内存中存在一份(比如存放到 hash table 中),然后只扫描大表:对于大表中的每一条记录
key/value,在 hash table 中查找是否有相同的 key 的记录,如果有,则连接后输出即可。为
了支持文件的复制,Hadoop 提供了一个类 DistributedCache,使用该类的方法如下:
(1)用户使用静态方法 DistributedCache.addCacheFile()指定要复制的文件,它的参数
是文件的 URI (如果是 HDFS 上的文件,可以这样:
hdfs://jobtracker:50030/home/XXX/file)。 JobTracker 在作业启动之前会获取这个
URI 列表,并将相应的文件拷贝到各个 TaskTracker 的本地磁盘上。
(2)用户使用 DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的
文件读写 API 读取相应的文件。
在下面代码中,将会把数据量小的表(部门 dept)缓存在内存中,在 Mapper 阶段对员工部
门编号映射成部门名称,该名称作为 key 输出到 Reduce 中,在 Reduce 中计算按照部门计算
各个部门的总工资。
第 4 页 共 57 页 出自石山园,博客地址:http://www.cnblogs.com/shishanyuan
3.1.2 处理流程图
3.1.3 测试代码
Q1SumDeptSalary.java 代码(vi 编辑代码是不能存在中文):
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
第 5 页 共 57 页 出自石山园,博客地址:http://www.cnblogs.com/shishanyuan
public class Q1SumDeptSalary extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// 用于缓存 dept文件中的数据
private Map<String, String> deptMap = new HashMap<String, String>();
private String[] kv;
// 此方法会在Map方法执行之前执行且执行一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader in = null;
try {
// 从当前作业中获取要缓存的文件
Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String deptIdName = null;
for (Path path : paths) {
// 对部门文件字段进行拆分并缓存到deptMap中
if (path.toString().contains("dept")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (deptIdName = in.readLine())) {
// 对部门文件字段进行拆分并缓存到deptMap中
// 其中Map中key为部门编号,value为所在部门名称
deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
// 对员工文件字段进行拆分
剩余56页未读,继续阅读
资源评论
ansoncloud
- 粉丝: 5
- 资源: 31
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功