My software versions
Spark 1.6.1, HBase 1.2.1, run on EMR 4.7.1
Spark and HBase installation:
http://dmitrypukhov.pro/install-apache-spark-on-ubuntu/,
http://dmitrypukhov.pro/install-hbase-on-linux-dev/
Configure Spark
Edit spark-defaults.conf and ensure spark.driver.extraClassPath and spark.executor.extraClassPath contain the path to the HBase libraries. For me it is /usr/lib/hbase/lib/*
My extra class paths:
spark.driver.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/lib/hbase/lib/*
spark.executor.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/lib/hbase/lib/*
Start spark-shell and do all the following there:
Import necessary classes:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
Configure access to HBase
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "myExistingTable")
// If your HBase is not the default one, point to its ZooKeeper quorum
conf.set("hbase.zookeeper.quorum", "192.168.0.10")
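The HBaseAdmin import above can also be used for a quick sanity check before building the RDD. A small optional sketch, using the same "myExistingTable" name assumed throughout this post:

```scala
// Optional: verify the table actually exists before creating the RDD.
// HBaseAdmin is deprecated in newer HBase releases, but works on 1.2.1.
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable("myExistingTable"))
  println("Table 'myExistingTable' not found - create it in the hbase shell first")
admin.close()
```

This fails fast with a readable message instead of an exception buried in the first RDD action.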
Create Spark RDD on HBase table
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
rdd.count()
rdd.map(t => (t._1.toString(), t._2.toString())).first()
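Note that toString() on ImmutableBytesWritable and Result only gives a debug dump. To read actual cell values, decode the bytes explicitly; a sketch using the same "pd:data" column family and qualifier as the rest of this post:

```scala
// Decode the row key and one cell ("pd" family, "data" qualifier)
// into plain strings instead of relying on toString().
rdd.map { case (key, result) =>
  val rowKey = Bytes.toString(key.get())
  val json   = Bytes.toString(result.getValue(Bytes.toBytes("pd"), Bytes.toBytes("data")))
  (rowKey, json)
}.first()
```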
Use Spark SQL to query HBase
// My data contain values in "pd:data"
// The values are in json format
val jsonRdd = sqlContext.jsonRDD(
  rdd.map(t => t._2)
     .map(r => Bytes.toString(r.getValue(Bytes.toBytes("pd"), Bytes.toBytes("data"))))
)
jsonRdd.printSchema()
// Use Spark SQL to query the data
jsonRdd.registerTempTable("pd")
// My table contains a column numberOfDevices
// I want to sum it up
sqlContext.sql("select sum(numberOfDevices) from pd").collect()
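The Put and HTable imports at the top are there for the write path, which the examples above do not show. A minimal hedged sketch, assuming the same table with a "pd" column family; the row key and JSON payload here are made up for illustration:

```scala
// Write one cell back: row "row1", column "pd:data".
// HTable(conf, name) is the classic HBase 1.x client API; on newer
// versions use the ConnectionFactory / Table interfaces instead.
val table = new HTable(conf, "myExistingTable")
val put = new Put(Bytes.toBytes("row1"))
put.addColumn(Bytes.toBytes("pd"), Bytes.toBytes("data"),
  Bytes.toBytes("""{"numberOfDevices": 3}"""))
table.put(put)
table.close()
```

After this, re-running the sum query above should pick up the new row, since TableInputFormat reads the live table on each job.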