package mysqlAndHdfs;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Function: tests data interaction between MapReduce and MySQL. This test case
 * copies the rows of one table into another table; in practice you may only
 * need to read from MySQL, or only write to MySQL.
 *
 * @author administrator
 *
 */
public class AppMySQL2MR {

	/**
	 * Row bean for the student table. Implements both {@link Writable} (Hadoop
	 * wire serialization between map and reduce) and {@link DBWritable} (JDBC
	 * read/write), so the same record can flow through a DB-backed MR job.
	 */
	public static class StudentinfoRecord implements Writable, DBWritable {
		int id;
		String name;

		/** No-arg constructor required by Hadoop's reflective instantiation. */
		public StudentinfoRecord() {
		}

		@Override
		public String toString() {
			// "<id> <name>" — MyReducer re-parses exactly this format.
			return this.id + " " + this.name;
		}

		@Override
		public void readFields(ResultSet result) throws SQLException {
			// Columns are read positionally: 1 = id, 2 = name (see the
			// fields arrays passed to DBInputFormat.setInput below).
			this.id = result.getInt(1);
			this.name = result.getString(2);
		}

		@Override
		public void write(PreparedStatement stmt) throws SQLException {
			stmt.setInt(1, this.id);
			stmt.setString(2, this.name);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.id = in.readInt();
			this.name = Text.readString(in);
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeInt(this.id);
			Text.writeString(out, this.name);
		}
	}

	// NOTE: this must be a STATIC nested class — otherwise Hadoop's reflective
	// instantiation cannot find a no-arg constructor and fails with:
	// Caused by: java.lang.NoSuchMethodException: DBInputMapper.<init>()
	// http://stackoverflow.com/questions/7154125/custom-mapreduce-input-format-cant-find-constructor
	public static class DBInputMapper extends Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
		/** Emits (id, "id name") for every DB row read by DBInputFormat. */
		@Override
		public void map(LongWritable key, StudentinfoRecord value, Context context)
				throws IOException, InterruptedException {
			context.write(new LongWritable(value.id), new Text(value.toString()));
		}
	}

	public static class MyReducer extends Reducer<LongWritable, Text, StudentinfoRecord, Text> {
		/**
		 * Re-parses each "id name" line back into a {@link StudentinfoRecord}
		 * and emits it as the key so DBOutputFormat inserts it into the
		 * target table.
		 */
		@Override
		public void reduce(LongWritable key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// FIX: iterate over ALL values — the original wrote only
			// values.iterator().next(), silently dropping every duplicate id.
			for (Text value : values) {
				// FIX: split with limit 2 so names containing spaces survive
				// the round-trip; guard against a missing name field instead
				// of throwing ArrayIndexOutOfBoundsException.
				String[] splits = value.toString().split(" ", 2);
				StudentinfoRecord r = new StudentinfoRecord();
				r.id = Integer.parseInt(splits[0]);
				r.name = splits.length > 1 ? splits[1] : "";
				context.write(r, new Text(r.name));
			}
		}
	}

	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		// Point the client at the (local pseudo-distributed) job tracker —
		// without this the job runs in the local runner instead.
		conf.set("mapred.job.tracker", "127.0.0.1:9001");
		DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
				"jdbc:mysql://127.0.0.1:3306/houzw?serverTimezone=UTC", "root", "root");
		// Uncomment one of the job builders below to actually run a transfer:
		// Job job = copyMysqltoMysql(conf);
		// System.exit(job.waitForCompletion(true) ? 0 : 1);
		// Job job1 = hdfs2MysqlAndHdfs(conf);
		// System.exit(job1.waitForCompletion(true) ? 0 : 1);
	}

	/**
	 * Builds a job that reads table {@code t} via DBInputFormat and writes
	 * the records into table {@code t3} via DBOutputFormat. Not invoked by
	 * default (see the commented-out calls in {@code main}).
	 */
	@SuppressWarnings("unused")
	private static Job hdfs2MysqlAndHdfs(Configuration conf) throws IOException {
		@SuppressWarnings("deprecation")
		Job job1 = new Job(conf, "Mysql2Mr");
		// job.setJarByClass(Mysql2Mr.class);
		job1.setMapOutputKeyClass(LongWritable.class);
		job1.setMapOutputValueClass(Text.class);
		job1.setMapperClass(DBInputMapper.class);
		job1.setReducerClass(MyReducer.class);
		// FIX: the reducer emits StudentinfoRecord keys (DBOutputFormat
		// requires a DBWritable key); the original declared LongWritable here.
		job1.setOutputKeyClass(StudentinfoRecord.class);
		job1.setOutputValueClass(Text.class);
		job1.setOutputFormatClass(DBOutputFormat.class);
		// job1.setInputFormatClass(DBInputFormat.class);
		Path outputDir1 = new Path("hdfs://localhost:9000/user/output/mysql/data1/" + 111111 + "/");
		Path outputDir = new Path("hdfs://localhost:9000/user/output/mysql/data1/" + 22222 + "/");
		FileOutputFormat.setOutputPath(job1, outputDir);
		FileInputFormat.setInputPaths(job1, outputDir1);
		String[] fields1 = { "id", "name" };
		// Read rows from table "t" (columns id, name).
		DBInputFormat.setInput(job1, StudentinfoRecord.class, "t", null, "id", fields1);
		// Write the MapReduce output into table "t3".
		DBOutputFormat.setOutput(job1, "t3", "id", "name");
		return job1;
	}

	/**
	 * Builds a job that copies table {@code t} into table {@code t2}
	 * (MySQL → MapReduce → MySQL). Not invoked by default (see the
	 * commented-out calls in {@code main}).
	 */
	@SuppressWarnings("unused")
	private static Job copyMysqltoMysql(Configuration conf) throws IOException {
		@SuppressWarnings("deprecation")
		Job job = new Job(conf, "Mysql2Mr");
		// job.setJarByClass(Mysql2Mr.class);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setMapperClass(DBInputMapper.class);
		job.setReducerClass(MyReducer.class);
		// FIX: the reducer emits StudentinfoRecord keys (DBOutputFormat
		// requires a DBWritable key); the original declared LongWritable here.
		job.setOutputKeyClass(StudentinfoRecord.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(DBOutputFormat.class);
		// job.setOutputFormatClass(CustomeDBOutputFormat.class);
		job.setInputFormatClass(DBInputFormat.class);
		Path outputDir = new Path("hdfs://localhost:9000/user/output/mysql/data/" + 111111 + "/");
		FileOutputFormat.setOutputPath(job, outputDir);
		String[] fields = { "id", "name" };
		// Read rows from table "t" (columns id, name).
		DBInputFormat.setInput(job, StudentinfoRecord.class, "t", null, "id", fields);
		// Write the MapReduce output into table "t2".
		DBOutputFormat.setOutput(job, "t2", "id", "name");
		// CustomeDBOutputFormat.setOutput(job, "t3", "id", "name");
		// MultipleTextOutputFormat
		return job;
	}
}
// (removed non-Java web-page scrape residue — "评论0" / "最新资源" — that
// followed the class body and would break compilation)