Sparkstreaming读取Kafka消息再结合SparkSQL,将结果保存到HBase

简介: 环境为CDH5.8,开发工具为IDEA,大数据目前最新的API,送给大家避免踩坑!!

亲自摸索,送给大家,原创文章,转载注明哦。



import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.OutputFormat
/**
  * Created by sunyulong on 16/9/19.
  */
object OBDSQL extends App{
  //kafka topic
  val topics = List(("aaa",1)).toMap
  //zookeeper
  val zk = "10.1.11.71,10.1.11.72,10.1.11.73"
  val conf = new SparkConf() setMaster "yarn-cluster" setAppName "SparkStreamingETL"
  //create streaming context
  val ssc = new StreamingContext(conf , Seconds(1))
  //get every lines from kafka
  val lines = KafkaUtils.createStream(ssc,zk,"sparkStreaming",topics).map(_._2)
  //get spark context
  val sc = ssc.sparkContext
  //get sql context
  val sqlContext = new SQLContext(sc)
  //process every rdd AND save as HTable
  lines.foreachRDD(rdd => {
    //case class implicits
    import sqlContext.implicits._
    //filter empty rdd
    if (!rdd.isEmpty) {
      //register a temp table
      rdd.map(_.split(",")).map(p => Persion(p(0), p(1).trim.toDouble, p(2).trim.toInt, p(3).trim.toDouble)).toDF.registerTempTable("oldDriver")
      //use spark SQL
      val rs = sqlContext.sql("select count(1) from oldDriver")
      //create hbase conf
      val hconf = HBaseConfiguration.create()
      hconf.set("hbase.zookeeper.quorum",zk)
      hconf.set("hbase.zookeeper.property.clientPort", "2181")
      hconf.set("hbase.defaults.for.version.skip", "true")
      hconf.set(TableOutputFormat.OUTPUT_TABLE, "obd_pv")
      hconf.setClass("mapreduce.job.outputformat.class", classOf[TableOutputFormat[String]], classOf[OutputFormat[String, Mutation]])
      val jobConf = new JobConf(hconf)
      //convert every line to hbase lines
      rs.rdd.map(line => (System.currentTimeMillis(),line(0))).map(line =>{
        //create hbase put
        val put = new Put(Bytes.toBytes(line._1))
        //add column
        put.addColumn(Bytes.toBytes("pv"),Bytes.toBytes("pv"),Bytes.toBytes(line._2.toString))
        //retuen type
        (new ImmutableBytesWritable,put)
      }).saveAsNewAPIHadoopDataset(jobConf)     //save as HTable
    }
  })
  //streaming start
  ssc start()
  ssc awaitTermination()
}

//the entity of persion for SparkSQL
case class Persion(gender: String, tall: Double, age: Int, driverAge: Double)


相关文章
|
5月前
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
39 0
|
4月前
|
消息中间件 分布式计算 大数据
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
66 0
|
1月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
47 5
|
3月前
|
消息中间件 分布式计算 Kafka
Spark与Kafka的集成与流数据处理
Spark与Kafka的集成与流数据处理
|
3月前
|
消息中间件 分布式计算 Kafka
使用Kafka与Spark Streaming进行流数据集成
使用Kafka与Spark Streaming进行流数据集成
|
4月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
43 0
|
4月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
71 0
|
4月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
60 0
|
4月前
|
消息中间件 存储 数据采集
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
51 0
|
4月前
|
存储 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(一)案例需求
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(一)案例需求
56 0