1 reply

Bob - Tongcheng Travel Big Data + BI Architect, answered 2016-02-25:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class SparkHBaseTest {
    public static void main(String[] args) throws Exception {
        // JARS must include the HBase jars (this Spark 1.x-era constructor takes
        // the master URL, app name, Spark home, and a jar path)
        JavaSparkContext sc = new JavaSparkContext("spark://hostname:7077", "HBaseTest",
                System.getenv("SPARK_HOME"), System.getenv("JARS"));

        // Put the Hadoop and HBase configuration files on the project classpath
        Configuration conf = HBaseConfiguration.create();
        String tableName = "TestTableName";
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        // Other options for configuring scan behavior can also be set on the
        // Configuration; see the TableInputFormat javadocs for the available keys.
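        // A minimal sketch of one such option (assumed usage, not part of the
        // original answer): a Scan can be serialized into the Configuration under
        // TableInputFormat.SCAN to restrict the read, e.g. to one column family "cf":
        //   Scan scan = new Scan();
        //   scan.addFamily(Bytes.toBytes("cf"));
        //   conf.set(TableInputFormat.SCAN,
        //           TableMapReduceUtil.convertScanToString(scan));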
        // Create the table if it is not available yet; HBase requires at least
        // one column family, so one ("cf" here) is added before creating it
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.isTableAvailable(tableName)) {
            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
            tableDesc.addFamily(new HColumnDescriptor("cf"));
            admin.createTable(tableDesc);
        }
        admin.close();
        // Fetch the HBase query results as an RDD of (row key, Result) pairs
        JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(
                conf,
                TableInputFormat.class,
                ImmutableBytesWritable.class,
                Result.class);

        // Operate on hBaseRDD here

        sc.stop();
        System.exit(0);
    }
}
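The pairs in hBaseRDD can then be processed like any other RDD. Below is a minimal sketch of such processing, not part of the original answer: it counts the rows and reads one cell per row, where the column family "cf" and qualifier "q1" are assumed names to be adapted to your schema. It additionally needs scala.Tuple2, org.apache.spark.api.java.JavaRDD, org.apache.spark.api.java.function.Function, and org.apache.hadoop.hbase.util.Bytes on the import list.

        // Count the rows returned by the scan
        long rowCount = hBaseRDD.count();
        System.out.println("row count: " + rowCount);

        // Read one cell per row; "cf" and "q1" are assumed names
        JavaRDD<String> cellValues = hBaseRDD.map(
                new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
                    @Override
                    public String call(Tuple2<ImmutableBytesWritable, Result> row) {
                        byte[] value = row._2().getValue(
                                Bytes.toBytes("cf"), Bytes.toBytes("q1"));
                        return value == null ? null : Bytes.toString(value);
                    }
                });
        System.out.println(cellValues.take(10));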