
Writing Objects to Kafka from Spark: Design and Implementation

  Spark and Kafka are two of the most commonly used big data frameworks, and Spark provides built-in support for reading from and writing to Kafka. By default Kafka only writes byte arrays to a topic; if we want to read and write String messages, we can use Kafka's built-in StringEncoder encoder class and StringDecoder decoder class. But what if we want to write arbitrary objects to Kafka?
  Don't worry: the kafka.serializer package defines two traits, Decoder and Encoder, which describe how topic messages are decoded and encoded. The built-in StringDecoder and StringEncoder classes extend these two traits and simply convert a String to and from a byte array using a given character encoding. Let's look at the two traits:

/**
 * A decoder is a method of turning byte arrays into objects.
 * An implementation is required to provide a constructor that
 * takes a VerifiableProperties instance.
 */
trait Decoder[T] {
  def fromBytes(bytes: Array[Byte]): T
}

/**
 * An encoder is a method of turning objects into byte arrays.
 * An implementation is required to provide a constructor that
 * takes a VerifiableProperties instance.
 */
trait Encoder[T] {
  def toBytes(t: T): Array[Byte]
}

  In other words, our custom encoder and decoder classes only need to implement toBytes and fromBytes respectively. So how do we turn an object into a byte array, and how do we turn the byte array back into an object? Remember the Java classes for writing objects? We can use a ByteArrayOutputStream together with an ObjectOutputStream to turn an object into a byte array, and a ByteArrayInputStream together with an ObjectInputStream to turn the byte array back into an object. That is really all we need.
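  Before wiring this into Kafka, the round trip can be tried on its own. The following is a minimal sketch of my own (the object name SerializationRoundTrip and the sample String are only illustrative), using nothing but the standard java.io classes:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializationRoundTrip extends App {
  // Object -> Array[Byte]
  val bo = new ByteArrayOutputStream()
  val oo = new ObjectOutputStream(bo)
  oo.writeObject("hello iteblog")
  oo.close()
  val bytes: Array[Byte] = bo.toByteArray

  // Array[Byte] -> Object
  val oi = new ObjectInputStream(new ByteArrayInputStream(bytes))
  println(oi.readObject()) // prints: hello iteblog
  oi.close()
}

  Without further ado, here is how the encoder and decoder are implemented: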

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import kafka.serializer.{Decoder, Encoder}
import kafka.utils.VerifiableProperties

class IteblogDecoder[T](props: VerifiableProperties = null) extends Decoder[T] {

  // Turn the byte array back into an object using standard Java serialization.
  override def fromBytes(bytes: Array[Byte]): T = {
    var t: T = null.asInstanceOf[T]
    var bi: ByteArrayInputStream = null
    var oi: ObjectInputStream = null
    try {
      bi = new ByteArrayInputStream(bytes)
      oi = new ObjectInputStream(bi)
      t = oi.readObject().asInstanceOf[T]
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // Only close the streams that were actually created.
      if (oi != null) oi.close()
      if (bi != null) bi.close()
    }
    t
  }
}

class IteblogEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {

  // Turn the object into a byte array using standard Java serialization.
  override def toBytes(t: T): Array[Byte] = {
    if (t == null)
      null
    else {
      var bo: ByteArrayOutputStream = null
      var oo: ObjectOutputStream = null
      var bytes: Array[Byte] = null
      try {
        bo = new ByteArrayOutputStream()
        oo = new ObjectOutputStream(bo)
        oo.writeObject(t)
        oo.flush()
        bytes = bo.toByteArray
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        // Only close the streams that were actually created.
        if (oo != null) oo.close()
        if (bo != null) bo.close()
      }
      bytes
    }
  }
}

  With that we have defined our own encoder and decoder. So how do we use them? Suppose we have a Person class, as follows:

case class Person(var name: String, var age: Int)
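
  Because both classes rely on plain Java serialization, the payload class has to be serializable; a Scala case class such as Person already extends Serializable, so nothing extra is needed here. As a quick sanity check, the object can be round-tripped through the two classes. This is a small sketch of my own (the object name PersonRoundTripCheck is illustrative), not part of the producer/consumer code below:

object PersonRoundTripCheck extends App {
  val encoder = new IteblogEncoder[Person]()
  val decoder = new IteblogDecoder[Person]()

  // Serialize a Person to bytes and read it back.
  val bytes = encoder.toBytes(Person("iteblog", 23))
  println(decoder.fromBytes(bytes)) // expected output: Person(iteblog,23)
}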

  On the sending side, we can use them like this:

def getProducerConfig(brokerAddr: String): Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", brokerAddr)
    // message values are encoded with our custom object encoder
    props.put("serializer.class", classOf[IteblogEncoder[Person]].getName)
    // message keys are plain strings, so the built-in StringEncoder is enough
    props.put("key.serializer.class", classOf[StringEncoder].getName)
    props
  }

def sendMessages(topic: String, messages: List[Person], brokerAddr: String) {
    val producer = new Producer[String, Person](
      new ProducerConfig(getProducerConfig(brokerAddr)))
    // ": _*" expands the mapped List into the varargs parameter of producer.send
    producer.send(messages.map {
      new KeyedMessage[String, Person](topic, "Iteblog", _)
    }: _*)
    producer.close()
}

def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName)
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))
    val topic = args(0)
    val brokerAddr = ":9092"

    val data = List(Person("wyp", 23), Person("spark", 34), Person("kafka", 23),
         Person("iteblog", 23))
    sendMessages(topic, data, brokerAddr)
}

  On the receiving side they can be used like this. Note that the decoder classes are passed to KafkaUtils.createStream as type parameters and are instantiated reflectively via a constructor that takes a VerifiableProperties instance, which is exactly why our classes declare that constructor (as the trait comments above require):


val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName)
val ssc = new StreamingContext(sparkConf, Milliseconds(500))
val (topic, groupId) = (args(0), args(1))

val kafkaParams = Map("zookeeper.connect" -> ":2181",
  "group.id" -> groupId,
  "auto.offset.reset" -> "smallest")

// StringDecoder decodes the String keys; IteblogDecoder[Person] decodes the Person values
val stream = KafkaUtils.createStream[String, Person, StringDecoder,
  IteblogDecoder[Person]](ssc, kafkaParams, Map(topic -> 1),
  StorageLevel.MEMORY_ONLY)
stream.foreachRDD(rdd => {
  if (rdd.count() != 0) {
    rdd.foreach(item => if (item != null) println(item))
  } else {
    println("Empty rdd!!")
  }
})
ssc.start()
ssc.awaitTermination()
ssc.stop()

  With this in place we can send any serializable object to Kafka. Here is what the output looks like:

Empty rdd!!
(Iteblog,Person(wyp,23))
(Iteblog,Person(spark,34))
(Iteblog,Person(kafka,23))
(Iteblog,Person(iteblog,23))
Empty rdd!!
Empty rdd!!
Empty rdd!!
Empty rdd!!

  In this example we simply print the received messages to the console; if no messages are received in a batch, "Empty rdd!!" is printed.

Comments (11)
  1. producer.send(messages.map {
    }: _*)
    I don't quite follow this piece of code: how should the ": _*" after {new KeyedMessage[String, Person](topic, "Iteblog", _)} be understood?

    可惜不 2015-11-04 10:53
    • This is Scala syntax: ": _*" as a whole tells the compiler that you want the argument to be treated as a sequence of arguments (varargs). For example, val s = sum(1 to 5: _*) passes 1 to 5 as a sequence of individual arguments. Take a look at the example below:

      object Test1 extends App {
      
        def printAll(strings: String*) {
          println("------")
          strings.foreach(println)
        }
      
        printAll()
        printAll("foo")
        printAll("foo", "bar")
        printAll("foo", "bar", "baz")
      
        // the special ":_*" syntax
        val fruits = List("apple", "banana", "cherry")
        printAll("numbers", fruits : _*)
        
      }
      

      w397090770 2015-11-04 11:08
  2. Hi, I implemented something similar in Java, but Spark Streaming cannot receive the custom objects from Kafka. Could you help me take a look at what the problem might be? I described the details in this post: http://www....com/thread-13405-1-1.html. Thanks.

    逸云丫丫 2015-05-29 15:46
    • When you say it cannot receive them, do you mean the received content cannot be parsed, or that no data is read at all? If it is the former, it is very likely that your Encoder is not taking effect; if it is the latter, you should check whether there really is data in Kafka.
      Apart from that, are there any exception messages when you run the program?

      w397090770 2015-05-29 17:22
      • The following error shows up in the logs:

        15/05/29 18:06:24 INFO ConsumerFetcherThread: [ConsumerFetcherThread-0_cdhdatanode2-1432893983783-f24fb853-0-0], Starting 
        15/05/29 18:06:24 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1432893983849] Added fetcher for partitions ArrayBuffer([[kafka,0], initOffset 118 to broker id:0,host:cdhdatanode2,port:9092] )
        15/05/29 18:06:24 ERROR KafkaReceiver: Error handling message; exiting
        org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 6B696C6C
        	at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:168)
        	at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
        	at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:62)
        	at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:1)
        	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
        	at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:134)
        	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        	at java.lang.Thread.run(Thread.java:745)
        


        I am using SerializationUtils.serialize and SerializationUtils.deserialize for serialization and deserialization. Why does the above error occur?

        逸云丫丫 2015-05-29 18:09
        • From your error, the failure happens during deserialization: the invalid stream header means the bytes being read are not valid serialized data. I suggest checking whether the message data itself is the problem, for example whether there is a character-set issue.

          w397090770 2015-05-29 18:15
          • OK, thanks, I will check again.

            逸云丫丫 2015-05-29 18:23
          • Hi, after I changed the character set the data is received correctly, but a new problem shows up when decoding:
            15/05/30 11:09:03 ERROR KafkaReceiver: Error handling message; exiting
            org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.chinatime.spark.streaming.kafka.simpleTest.Student
            at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
            at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
            at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:66)
            at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:1)
            at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
            at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:134)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
            at java.util.concurrent.FutureTask.run(FutureTask.java:262)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)
            I submitted the job with the following command, and the Student class mentioned above is inside KafkaTest.jar:
            bin/spark-submit --jars /opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar --class com.chinatime.spark.streaming.kafka.simpleTest.KafkaTest /tmp/KafkaTest.jar
            When constructing the SparkContext I also added the jars with sparkConf.setJars(new String[] { "/tmp/KafkaTest.jar", "/opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar" });
            In the Application UI's Classpath Entries I can see the following:
            http://192.168.0.53:60922/jars/KafkaTest.jar Added By User
            http://192.168.0.53:60922/jars/spark-streaming-kafka-assembly_2.10-1.3.1.jar Added By User
            Do you have any suggestions for this problem? Thanks ^_^

            逸云丫丫 2015-05-30 11:21
          • Hi, I replaced SerializationUtils.serialize and SerializationUtils.deserialize with the approach mentioned in your article (ByteArrayOutputStream combined with ObjectOutputStream to turn the object into a byte array, and ByteArrayInputStream combined with ObjectInputStream to turn the byte array back into an object), and now decoding works fine. Why would that be?

            逸云丫丫 2015-05-30 11:53
  3. Thanks for sharing!
    By the way, the twitter link at the top right of the article actually goes to google+.
    It would be handier if the correct twitter link were provided. TKS!

    allenchen 2015-03-30 17:00
    • Hi, thanks a lot for the suggestion. Those two sites 'do not exist' over here, so I just added the links casually at the time.

      w397090770 2015-03-30 21:29