HBase和HDFS数据互导程序-创新互联

下面说说JAVA API提供的这些类的功能和他们之间有什么样的联系。

创新互联是一家集网站建设,千阳企业网站建设,千阳品牌网站建设,网站定制,千阳网站建设报价,网络营销,网络优化,千阳网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。

1.HBaseConfiguration

关系:org.apache.hadoop.hbase.HBaseConfiguration

作用:通过此类可以对HBase进行配置

用法实例: Configuration config = HBaseConfiguration.create();

说明: HBaseConfiguration.create()默认会从classpath中查找 hbase-site.xml中的配置信息,初始化 Configuration。

2.HBaseAdmin 类

关系:org.apache.hadoop.hbase.client.HBaseAdmin

作用:提供接口关系HBase数据库中的表信息

用法:HBaseAdmin admin = new HBaseAdmin(config);

3.Descriptor类

关系:org.apache.hadoop.hbase.HTableDescriptor

作用:HTableDescriptor类包含了表的名字以及表的列族信息

用法:HTableDescriptor htd =new HTableDescriptor(tablename);

            构造一个表描述符指定TableName对象。

           Htd.addFamily(new HColumnDescriptor(“myFamily”));

            将列家族给定的描述符

4.HTable

关系:org.apache.hadoop.hbase.client.HTable

作用:HTable和 HBase的表通信

用法:HTable tab = new HTable(config,Bytes.toBytes(tablename));

         ResultScanner sc = tab.getScanner(Bytes.toBytes(“familyName”));

说明:获取表内列族 familyNme的所有数据。

5.Put

关系:org.apache.hadoop.hbase.client.Put

作用:获取单个行的数据

用法:HTable table = new HTable(config,Bytes.toBytes(tablename));

         Put put = new Put(row);

         p.add(family,qualifier,value);

说明:向表 tablename添加 “family,qualifier,value”指定的值。

6.Get

关系:org.apache.hadoop.hbase.client.Get

作用:获取单个行的数据

用法:HTable table = new HTable(config,Bytes.toBytes(tablename));

         Get get = new Get(Bytes.toBytes(row));

         Result result = table.get(get);

说明:获取 tablename表中 row行的对应数据

7.ResultScanner

关系:Interface

作用:获取值的接口

用法:ResultScanner scanner = table.getScanner(Bytes.toBytes(family));

         For(Result rowResult : scanner){

                  Bytes[] str = rowResult.getValue(family,column);

}

说明:循环获取行中列值。

例1 HBase之读取HDFS数据写入HBase

package org.hadoop.hbase; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCountHbaseWriter {  public static class WordCountHbaseMapper extends    Mapper {   private final static IntWritable one = new IntWritable(1);   private Text word = new Text();   public void map(Object key, Text value, Context context)     throws IOException, InterruptedException {    StringTokenizer itr = new StringTokenizer(value.toString());    while (itr.hasMoreTokens()) {     word.set(itr.nextToken());     context.write(word, one);// 输出    }   }  }  public static class WordCountHbaseReducer extends    TableReducer {   public void reduce(Text key, Iterable values,     Context context) throws IOException, InterruptedException {    int sum = 0;    for (IntWritable val : values) {// 遍历求和     sum += val.get();    }    Put put = new Put(key.getBytes());//put实例化,每一个词存一行    //列族为content,列修饰符为count,列值为数目    put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));    context.write(new ImmutableBytesWritable(key.getBytes()), put);// 输出求和后的   }  }    public static void main(String[] args){   String tablename = "wordcount";   Configuration conf = HBaseConfiguration.create();     conf.set("hbase.zookeeper.quorum", "192.168.1.139");     conf.set("hbase.zookeeper.property.clientPort", "2191");   HBaseAdmin admin = null;   try {    admin = new HBaseAdmin(conf);    if(admin.tableExists(tablename)){     System.out.println("table exists!recreating.......");     admin.disableTable(tablename);     admin.deleteTable(tablename);    }    HTableDescriptor htd = new HTableDescriptor(tablename);    HColumnDescriptor tcd = new HColumnDescriptor("content");    htd.addFamily(tcd);//创建列族    admin.createTable(htd);//创建表    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();       if (otherArgs.length != 1) {         System.err.println("Usage: WordCountHbaseWriter ");         System.exit(2);       }       Job job = new Job(conf, "WordCountHbaseWriter");   job.setNumReduceTasks(2);       job.setJarByClass(WordCountHbaseWriter.class);    //使用WordCountHbaseMapper类完成Map过程;       job.setMapperClass(WordCountHbaseMapper.class);       TableMapReduceUtil.initTableReducerJob(tablename, WordCountHbaseReducer.class, job);       //设置任务数据的输入路径;       FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    //设置了Map过程的输出类型,其中设置key的输出类型为Text;       job.setOutputKeyClass(Text.class);    //设置了Map过程的输出类型,其中设置value的输出类型为IntWritable;       job.setOutputValueClass(IntWritable.class);    //调用job.waitForCompletion(true) 执行任务,执行成功后退出;       System.exit(job.waitForCompletion(true) ? 0 : 1);   } catch (Exception e) {    e.printStackTrace();   } finally{    if(admin!=null)     try {      admin.close();     } catch (IOException e) {      e.printStackTrace();     }   }     } }

例2 HBase之读取HBase数据写入HDFS

package org.hadoop.hbase; import java.io.IOException; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCountHbaseReader {    public static class WordCountHbaseReaderMapper extends      TableMapper{     @Override     protected void map(ImmutableBytesWritable key,Result value,Context context)             throws IOException, InterruptedException {         StringBuffer sb = new StringBuffer("");         for(Entry entry:value.getFamilyMap("content".getBytes()).entrySet()){             String str =  new String(entry.getValue());             //将字节数组转换为String类型             if(str != null){                 sb.append(new String(entry.getKey()));                 sb.append(":");                 sb.append(str);             }             context.write(new Text(key.get()), new Text(new String(sb)));         }     } }  public static class WordCountHbaseReaderReduce extends Reducer{      private Text result = new Text();      @Override      protected void reduce(Text key, Iterable values,Context context)              throws IOException, InterruptedException {          for(Text val:values){              result.set(val);              context.write(key, result);          }      }  }    public static void main(String[] args) throws Exception {      String tablename = "wordcount";      Configuration conf = HBaseConfiguration.create();      conf.set("hbase.zookeeper.quorum", "192.168.1.139");      conf.set("hbase.zookeeper.property.clientPort", "2191");            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();      if (otherArgs.length != 1) {        System.err.println("Usage: WordCountHbaseReader ");        System.exit(2);      }      Job job = new Job(conf, "WordCountHbaseReader");      job.setJarByClass(WordCountHbaseReader.class);      //设置任务数据的输出路径;      FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));      job.setReducerClass(WordCountHbaseReaderReduce.class);      Scan scan = new Scan();      TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);      //调用job.waitForCompletion(true) 执行任务,执行成功后退出;      System.exit(job.waitForCompletion(true) ? 0 : 1);  } }

程序中用到hadoop的相关JAR包(如下图)及hbase所有jar包

HBase和HDFS数据互导程序

如果上面的API还不能满足你的要求,可以到下面这个网站里面Hbase全部API介绍

http://www.yiibai.com/hbase/

 

另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


网站名称:HBase和HDFS数据互导程序-创新互联
链接地址:http://csdahua.cn/article/cegiii.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流