没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
试读
26页
16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN 网址:https://blog.csdn.net/chenwewi520feng/article/details/130454036 本文介绍MapReduce常见的基本用法。 前提是hadoop环境可正常运行。 本文分为五个部分,即介绍自定义序列化、排序、分区、分组和topN。
资源推荐
资源详情
资源评论
@TOC
本文介绍MapReduce常见的基本用法。
前提是hadoop环境可正常运行。
本文分为五个部分,即介绍自定义序列化、排序、分区、分组和topN。
一、pom.xml与测试数据说明、日志配置
1、pom.xml
2、数据字段说明
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-core
-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>2.5.6</version>
</dependency>
date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死
亡病例)
3、日志配置
log4j.properties文件放在resources目录下。log4j.properties内容如下:
# Define some default values that can be overridden by system properties
hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log
# Define the root logger to the system property "hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger}, EventCounter
# Logging Threshold
log4j.threshold=ALL
# Null Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
#
# Rolling File Appender - cap space usage at 5gb.
#
hadoop.log.maxfilesize=256MB
hadoop.log.maxbackupindex=20
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
# Daily Rolling File Appender
#
log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
二、序列化
1、需求
统计美国2021-01-28,每个州state累计确诊案例数、累计死亡案例数
2、实现说明
自定义对象CovidBean,用于封装每个州的确诊病例数和死亡病例数。
以州作为map阶段输出的key,以CovidBean作为value,这样属于同一个州的数据就会变成一组进行
reduce处理,进行累加即可得出每个州累计确诊病例。
3、实现
1)、bean
# Rollover at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
#
# TaskLog Appender
#
log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
import org.apache.hadoop.io.Writable;
import lombok.Data;
@Data
public class CovidBean implements Writable {
private String state;
private long cases;
private long deaths;
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(state);
2)、Mapper
out.writeLong(cases);
out.writeLong(deaths);
}
@Override
public void readFields(DataInput in) throws IOException {
this.state = in.readUTF();
this.cases = in.readLong();
this.deaths = in.readLong();
}
public String toString() {
return this.cases + "," + this.deaths;
}
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.hadoop.mr.covid.bean.CovidBean;
//KEYIN, VALUEIN, KEYOUT, VALUEOUT
public class EachStateMapper extends Mapper<LongWritable, Text, Text, CovidBean>
{
Text outKey = new Text();
CovidBean outValue = new CovidBean();
// 2021-01-28,Autauga,Alabama,01001,5554,69
/**
* LongWritable key 行的偏移量
* Text value 每行值
* Context context 上下文
*/
@Override
public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
//根据每行的数据标志进行截取
String values[] = value.toString().split(",");
//输出key赋值
outKey.set(values[2]);
//输出value赋值
outValue.setState(values[2]);
outValue.setCases(Long.parseLong(values[values.length - 2]));
outValue.setDeaths(Long.parseLong(values[values.length - 1]));
//将输出key-value输出
context.write(outKey, outValue);
}
}
剩余25页未读,继续阅读
资源评论
一瓢一瓢的饮alanchanchn
- 粉丝: 2845
- 资源: 69
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功