
Writing MapReduce reduce output to MySQL with the new API (MySQL)

Source: 动视网  Editor: 小采  Date: 2020-11-09 19:21:21


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * Write MapReduce results into MySQL (new mapreduce API).
 *
 * @author asheng
 */
public class WriteDataToMysql {

    /**
     * TblsWritable implements both Writable (for Hadoop's binary
     * serialization) and DBWritable (for writing rows to MySQL).
     */
    public static class TblsWritable implements Writable, DBWritable {
        String tbl_name;
        String tbl_type;

        public TblsWritable() {
        }

        public TblsWritable(String tbl_name, String tbl_type) {
            this.tbl_name = tbl_name;
            this.tbl_type = tbl_type;
        }

        // DBWritable: bind the fields to the INSERT statement, in column order
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, this.tbl_name);
            statement.setString(2, this.tbl_type);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.tbl_name = resultSet.getString(1);
            this.tbl_type = resultSet.getString(2);
        }

        // Writable: binary serialization between map and reduce tasks
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.tbl_name);
            out.writeUTF(this.tbl_type);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.tbl_name = in.readUTF();
            this.tbl_type = in.readUTF();
        }

        @Override
        public String toString() {
            return this.tbl_name + " " + this.tbl_type;
        }
    }

    public static class ConnMysqlMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        enum Counter {
            LINESKIP, // lines skipped because of malformed input
        }

        private final static IntWritable one = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                String line = value.toString();
                String[] strings = line.split("\t"); // fields are tab-separated
                String initTime = strings[1];
                String devType = strings[4]; // device type (not used below)
                if (initTime.length() == 19) {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    Date date = sdf.parse(initTime); // validates the timestamp
                    // emit the day part (yyyy-MM-dd) as the reduce key
                    context.write(new Text(initTime.substring(0, 10)), one);
                } else {
                    context.getCounter(Counter.LINESKIP).increment(1);
                }
            } catch (ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1);
            } catch (ParseException e) {
                context.getCounter(Counter.LINESKIP).increment(1);
            }
        }
    }

    public static class ConnMysqlReducer
            extends Reducer<Text, IntWritable, TblsWritable, TblsWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable v : values) {
                count++;
            }
            // DBOutputFormat writes the key as one MySQL row; the value is unused
            context.write(new TblsWritable(key.toString(), String.valueOf(count)), null);
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://127.0.0.1:3306/XINGXUNTONG", "hadoop", "123456");
        Job job = new Job(conf, "test mysql connection");
        job.setJarByClass(WriteDataToMysql.class);
        job.setMapperClass(ConnMysqlMapper.class);
        job.setReducerClass(ConnMysqlReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // write to table "test", columns initTime and new_user_total
        DBOutputFormat.setOutput(job, "test", "initTime", "new_user_total");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

We write the results to MySQL because although the log files we process are terabyte-scale, the aggregated output is tiny; storing it in a relational database makes querying and downstream use far more convenient.
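The mapper's key derivation can be exercised outside Hadoop: split each log line on tabs, validate the 19-character timestamp in field 1, and keep only the yyyy-MM-dd prefix as the reduce key. A standalone sketch of that logic (the class name DayKeyDemo, the method dayKey, and the sample line layout are assumptions for illustration; malformed lines return null where the job would bump the LINESKIP counter):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class DayKeyDemo {
    // Returns the yyyy-MM-dd day key for a tab-separated log line,
    // or null if the timestamp field is missing or malformed.
    static String dayKey(String line) {
        try {
            String[] fields = line.split("\t");
            String initTime = fields[1];          // "yyyy-MM-dd HH:mm:ss", 19 chars
            if (initTime.length() != 19) {
                return null;
            }
            // Parse only to validate; the parsed Date itself is not used.
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(initTime);
            return initTime.substring(0, 10);     // keep the day part
        } catch (ArrayIndexOutOfBoundsException | ParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // prints "2012-05-01"
        System.out.println(dayKey("u1\t2012-05-01 08:30:00\tx\ty\tandroid"));
    }
}
```

Because every valid line for the same day maps to the same key, the reducer's count per key is exactly the per-day total that ends up in the new_user_total column.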
