
HBase: Basic BulkLoad



To keep the MapReduce architecture clear, both the Map and Reduce stages are retained so the job can be extended later. Note: when writing HFiles, the qualifiers of a row must be written in sorted order.

Mapper:

import com.google.common.base.Strings;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import yeepay.util.HBaseUtil;

public class LoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        try {
            String line = value.toString();
            if (Strings.isNullOrEmpty(line)) {
                return;
            }

            // Each input line must carry exactly 9 tab-separated fields.
            String[] arr = line.split("\t", 9);
            if (arr.length != 9) {
                throw new RuntimeException("line.split() did not yield 9 fields");
            }

            // The first field is turned into the HBase row key; the whole line is passed on as the value.
            String k1 = arr[0];
            ImmutableBytesWritable keyH = new ImmutableBytesWritable(HBaseUtil.getRowKey(k1));
            context.write(keyH, new Text(line));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
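
HBaseUtil.getRowKey is a project-specific helper that the post does not show. A minimal, hypothetical sketch of what it might do, assuming the row key is the business key prefixed with a short hash so rows spread evenly across regions:

package yeepay.util;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

// Hypothetical stand-in for the real yeepay.util.HBaseUtil, which is not included in the post.
public class HBaseUtil {

    // Prefix the business key with the first 4 hex chars of its MD5 so rows
    // are distributed across regions instead of hot-spotting a single one.
    public static byte[] getRowKey(String businessKey) {
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(businessKey)).substring(0, 4);
        return Bytes.toBytes(prefix + "_" + businessKey);
    }
}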

Reducer:

import com.google.common.base.Splitter;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class LoadReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {

    // Column qualifiers, in the same order as the tab-separated input fields.
    final static String[] fields = new String[]{
            "ID",
            "A_ACCOUNT_ID",
            "A_TRX_ID",
            "P_ID",
            "P_TRXORDER_ID",
            "P_FRP_ID",
            "O_PRODUCTCAT",
            "O_RECEIVER_ID",
            "O_REQUESTID"
    };

    @Override
    public void reduce(ImmutableBytesWritable rowkey, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
        try {
            // Only the first value per row key is loaded; duplicates are ignored.
            Text vv = values.iterator().next();
            String vs = vv.toString();

            Splitter splitter = Splitter.on("\t").limit(9);
            Iterator<String> iterator = splitter.split(vs).iterator();

            /**
             * Values must be written in sorted qualifier order;
             * a TreeMap keeps the qualifiers sorted lexicographically.
             */
            Map<String, String> map = new TreeMap<String, String>();
            int i = 0;
            while (iterator.hasNext()) {
                map.put(fields[i++], iterator.next());
            }

            for (Map.Entry<String, String> entry : map.entrySet()) {
                KeyValue kv = new KeyValue(rowkey.copyBytes(), Bytes.toBytes("f"),
                        Bytes.toBytes(entry.getKey()), 0L, Bytes.toBytes(entry.getValue()));
                context.write(rowkey, kv);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
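
As a quick standalone illustration (not part of the original job) of the ordering the reducer relies on, iterating a TreeMap keyed by the qualifier names yields them in lexicographic order, which is the order HFileOutputFormat expects within a row:

import java.util.Map;
import java.util.TreeMap;

// Small demo of the qualifier ordering the reducer depends on; for illustration only.
public class QualifierOrderDemo {
    public static void main(String[] args) {
        Map<String, String> map = new TreeMap<String, String>();
        for (String q : new String[]{"ID", "A_ACCOUNT_ID", "A_TRX_ID", "P_ID", "P_TRXORDER_ID",
                "P_FRP_ID", "O_PRODUCTCAT", "O_RECEIVER_ID", "O_REQUESTID"}) {
            map.put(q, "value-of-" + q);
        }
        // Prints A_ACCOUNT_ID, A_TRX_ID, ID, O_PRODUCTCAT, O_RECEIVER_ID,
        // O_REQUESTID, P_FRP_ID, P_ID, P_TRXORDER_ID, i.e. the sorted qualifier order.
        for (String qualifier : map.keySet()) {
            System.out.println(qualifier);
        }
    }
}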
Job & BulkLoad:

package yeepay.load;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import yeepay.util.HdfsUtil;
import yeepay.util.YeepayConstant;

import java.util.Date;

public abstract class AbstractJobBulkLoad {

    public static Configuration conf = HBaseConfiguration.create();

    public void run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("usage: <input dir> <table name>");
            System.exit(-1);
        }
        String txtPath = args[0];
        String tableName = args[1];
        Job job = new Job(conf, "txt2HBase");
        HTable htable = null;
        try {
            htable = new HTable(conf, tableName); // the target table
            // The number of reducers and the row-key range each reducer covers
            // are derived from the table's current region boundaries.
            HFileOutputFormat.configureIncrementalLoad(job, htable);
            htable.close();
            job.setJarByClass(AbstractJobBulkLoad.class);
            FileSystem fs = FileSystem.get(conf);

            System.out.println("input file: " + txtPath);
            Path inputFile = new Path(txtPath);
            if (!fs.exists(inputFile)) {
                System.err.println("inputFile " + txtPath + " not exist.");
                throw new RuntimeException("inputFile " + txtPath + " not exist.");
            }
            FileInputFormat.addInputPath(job, inputFile);

            job.setMapperClass(getMapperClass());
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);

            job.setReducerClass(getReducerClass());
            Date now = new Date();
            Path output = new Path("/output/" + tableName + "/" + now.getTime());
            System.out.println("output dir: " + output);
            FileOutputFormat.setOutputPath(job, output);
            job.waitForCompletion(true);

            // Run the BulkLoad: hand the generated HFiles over to the region servers.
            HdfsUtil.chmod(conf, output.toString());
            HdfsUtil.chmod(conf, output + "/" + YeepayConstant.COMMON_FAMILY);
            htable = new HTable(conf, tableName);
            new LoadIncrementalHFiles(conf).doBulkLoad(output, htable);
            htable.close();
            System.out.println("HFile data load success!");
            System.out.println(getJobName() + " end!");
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    protected abstract Class getMapperClass();

    protected abstract Class getReducerClass();

    protected abstract String getTableName();

    protected abstract String getJobName();
}
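
The post does not include a concrete subclass; as a hedged usage sketch, a table-specific job could be wired up as follows (the class name, table name, and main method are assumptions for illustration):

package yeepay.load;

// Hypothetical example subclass; not part of the original post.
public class TrxJobBulkLoad extends AbstractJobBulkLoad {

    @Override
    protected Class getMapperClass() {
        return LoadMapper.class;
    }

    @Override
    protected Class getReducerClass() {
        return LoadReducer.class;
    }

    @Override
    protected String getTableName() {
        return "TRX"; // assumed table name
    }

    @Override
    protected String getJobName() {
        return "trx-bulkload";
    }

    public static void main(String[] args) throws Exception {
        // Example invocation: hadoop jar bulkload.jar yeepay.load.TrxJobBulkLoad /input/trx.txt TRX
        new TrxJobBulkLoad().run(args);
    }
}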
