SparkStreaming消费kafka数据

概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取、过滤、转换,然后存储到HDFS中。

实例代码

package com.fwmagic.test

import com.alibaba.fastjson.{JSON, JSONException}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
  * created by fwmagic
  */
object RealtimeEtl {

  private val logger = LoggerFactory.getLogger(PVUV.getClass)

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop")

    val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]")

    val spark = SparkSession.builder().config(conf).getOrCreate()

    val streamContext = new StreamingContext(spark.sparkContext, Seconds(5))

    //直连方式相当于跟kafka的Topic至直接连接
    //"auto.offset.reset:earliest(每次重启重新开始消费),latest(重启时会从最新的offset开始读取)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fwmagic",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("access")

    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      streamContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    //如果使用SparkStream和Kafka直连方式整合,生成的kafkaDStream必须调用foreachRDD
    kafkaDStream.foreachRDD(kafkaRDD => {
      if (!kafkaRDD.isEmpty()) {
        //获取当前批次的RDD的偏移量
        val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

        //拿出kafka中的数据
        val lines = kafkaRDD.map(_.value())
        //将lines字符串转换成json对象
        val logBeanRDD = lines.map(line => {
          var logBean: LogBean = null
          try {
            logBean = JSON.parseObject(line, classOf[LogBean])
          } catch {
            case e: JSONException => {
              //logger记录
              logger.error("json解析错误!line:" + line, e)
            }
          }
          logBean
        })

        //过滤
        val filteredRDD = logBeanRDD.filter(_ != null)

        //将RDD转化成DataFrame,因为RDD中装的是case class
        import spark.implicits._

        val df = filteredRDD.toDF()

        df.show()
        //将数据写到hdfs中:hdfs://hd1:9000/360
        df.repartition(1).write.mode(SaveMode.Append).parquet(args(0))

        //提交当前批次的偏移量,偏移量最后写入kafka
        kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    //启动
    streamContext.start()
    streamContext.awaitTermination()
    streamContext.stop()

  }

}

case class LogBean(time:String,
                   longitude:Double,
                   latitude:Double,
                   openid:String,
                   page:String,
                   evnet_type:Int)

依赖环境(pom.xml)



    4.0.0

    com.fwmagic.360
    fwmagic-360
    1.0

    
        1.8
        1.8
        2.11.7
        2.2.2
        2.7.7
        UTF-8
    

    
        
        
            org.scala-lang
            scala-library
            ${scala.version}
        

        
        
            org.apache.spark
            spark-core_2.11
            ${spark.version}
        

        
        
            org.apache.spark
            spark-sql_2.11
            ${spark.version}
        

        
        
            org.apache.spark
            spark-streaming_2.11
            ${spark.version}
        

        
            org.apache.spark
            spark-streaming-kafka-0-10_2.11
            ${spark.version}
        

        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        

        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        

        
            com.alibaba
            fastjson
            1.2.39
        

    

    
        
            
                
                
                    net.alchim31.maven
                    scala-maven-plugin
                    3.2.2
                
                
                
                    org.apache.maven.plugins
                    maven-compiler-plugin
                    3.5.1
                
            
        
        
            
                net.alchim31.maven
                scala-maven-plugin
                
                    
                        scala-compile-first
                        process-resources
                        
                            add-source
                            compile
                        
                    
                    
                        scala-test-compile
                        process-test-resources
                        
                            testCompile
                        
                    
                
            

            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    
                        compile
                        
                            compile
                        
                    
                
            

            
            
                org.apache.maven.plugins
                maven-shade-plugin
                2.4.3
                
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    *:*
                                    
                                        META-INF/*.SF
                                        META-INF/*.DSA
                                        META-INF/*.RSA
                                    
                                
                            
                        
                    
                
            
        
    


当前名称:SparkStreaming消费kafka数据
分享地址:http://csdahua.cn/article/ggicpc.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流