
Importing MySQL data into HDFS with the Sqoop client Java API

This post uses the Sqoop 2 client Java API to import a MySQL table into HDFS on Hadoop. The program below creates a JDBC source link, an HDFS destination link, and a job connecting the two, then starts the job, polls its progress every three seconds, and prints the Hadoop counters when it finishes.

 package com.hadoop.recommend;

 import org.apache.sqoop.client.SqoopClient;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MFromConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MLinkConfig;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.model.MToConfig;
 import org.apache.sqoop.submission.counter.Counter;
 import org.apache.sqoop.submission.counter.CounterGroup;
 import org.apache.sqoop.submission.counter.Counters;
 import org.apache.sqoop.validation.Status;

 public class MysqlToHDFS {

     public static void main(String[] args) {
         sqoopTransfer();
     }

     public static void sqoopTransfer() {
         // Initialize the client against the Sqoop 2 server
         String url = "http://master:12000/sqoop/";
         SqoopClient client = new SqoopClient(url);

         // Create the source link (JDBC)
         long fromConnectorId = 2;
         MLink fromLink = client.createLink(fromConnectorId);
         fromLink.setName("JDBC connector");
         fromLink.setCreationUser("hadoop");
         MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
         fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://master:3306/hive");
         fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
         fromLinkConfig.getStringInput("linkConfig.username").setValue("root");
         fromLinkConfig.getStringInput("linkConfig.password").setValue("");
         Status fromStatus = client.saveLink(fromLink);
         if (fromStatus.canProceed()) {
             System.out.println("JDBC link created, ID: " + fromLink.getPersistenceId());
         } else {
             System.out.println("Failed to create the JDBC link");
         }

         // Create the destination link (HDFS)
         long toConnectorId = 1;
         MLink toLink = client.createLink(toConnectorId);
         toLink.setName("HDFS connector");
         toLink.setCreationUser("hadoop");
         MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
         toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://master:9000/");
         Status toStatus = client.saveLink(toLink);
         if (toStatus.canProceed()) {
             System.out.println("HDFS link created, ID: " + toLink.getPersistenceId());
         } else {
             System.out.println("Failed to create the HDFS link");
         }

         // Create a job wiring the source link to the destination link
         long fromLinkId = fromLink.getPersistenceId();
         long toLinkId = toLink.getPersistenceId();
         MJob job = client.createJob(fromLinkId, toLinkId);
         job.setName("MySQL to HDFS job");
         job.setCreationUser("hadoop");

         // Configure the source side of the job
         MFromConfig fromJobConfig = job.getFromJobConfig();
         fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
         fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
         fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");

         // Configure the destination side of the job
         MToConfig toJobConfig = job.getToJobConfig();
         toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/user/hdfs/recommend");

         // Throttling: number of parallel extractors
         MDriverConfig driverConfig = job.getDriverConfig();
         driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

         Status status = client.saveJob(job);
         if (status.canProceed()) {
             System.out.println("Job created, ID: " + job.getPersistenceId());
         } else {
             System.out.println("Failed to create the job.");
         }

         // Start the job and poll until it finishes
         long jobId = job.getPersistenceId();
         MSubmission submission = client.startJob(jobId);
         System.out.println("Job submission status: " + submission.getStatus());
         while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
             System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
             // Report progress every three seconds
             try {
                 Thread.sleep(3000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             // Refresh the status: the MSubmission returned by startJob() is a
             // one-off snapshot, so re-poll via the client each iteration
             submission = client.getJobStatus(jobId);
         }
         System.out.println("Job finished.");
         System.out.println("Hadoop job ID: " + submission.getExternalId());

         // Print the Hadoop counters, if any
         Counters counters = submission.getCounters();
         if (counters != null) {
             System.out.println("Counters:");
             for (CounterGroup group : counters) {
                 System.out.print("\t");
                 System.out.println(group.getName());
                 for (Counter counter : group) {
                     System.out.print("\t\t");
                     System.out.print(counter.getName());
                     System.out.print(": ");
                     System.out.println(counter.getValue());
                 }
             }
         }
         if (submission.getExceptionInfo() != null) {
             System.out.println("Job failed, exception info: " + submission.getExceptionInfo());
         }
         System.out.println("MySQL to HDFS transfer via Sqoop completed");
     }
 }

It reported this error when I ran it. What's going on??
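For what it's worth, the first things to check with this snippet are the server URL and the hardcoded connector IDs: fromConnectorId = 2 and toConnectorId = 1 are simply whatever a particular Sqoop 2 server happened to register, and they can differ between installations. Below is a minimal check, assuming the same Sqoop 1.99.x client API as above (the class name ListConnectors is just for illustration):

 import org.apache.sqoop.client.SqoopClient;
 import org.apache.sqoop.model.MConnector;

 public class ListConnectors {
     public static void main(String[] args) {
         SqoopClient client = new SqoopClient("http://master:12000/sqoop/");
         // Print every connector registered on the server with its persistent ID;
         // pass the matching IDs to createLink() instead of assuming 1 and 2
         for (MConnector connector : client.getConnectors()) {
             System.out.println(connector.getPersistenceId() + " : " + connector.getUniqueName());
         }
     }
 }

If the IDs printed here differ from the hardcoded ones, createLink() is handed the wrong connector, which typically surfaces as a failure when creating or saving the link.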
