
Writing Objects to Kafka with Spark: Design and Implementation

  Spark and Kafka are two widely used big data frameworks, and Spark ships with support for reading from and writing to Kafka. By default, a Kafka topic only accepts byte arrays; if we want to read and write String messages, we can use Kafka's built-in StringEncoder and StringDecoder classes. But what if we want to write arbitrary objects to a topic?
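  For reference, here is a minimal sketch of the plain-String case using the old Scala producer API (Kafka 0.8.x, which the code in this article targets). The broker address and topic name are placeholders made up for illustration:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Minimal sketch: send plain String messages with the built-in StringEncoder.
// "localhost:9092" and the topic name "iteblog" are placeholder values.
val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("key.serializer.class", "kafka.serializer.StringEncoder")

val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("iteblog", "key", "hello iteblog"))
producer.close()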
  Don't worry: the kafka.serializer package defines two traits, Decoder and Encoder, which are the decoding and encoding abstractions for Kafka topic messages. The built-in StringDecoder and StringEncoder classes simply extend these traits and convert a String to and from a byte array using a given character encoding. Let's look at how the Decoder and Encoder traits are defined:

/**
 * A decoder is a method of turning byte arrays into objects.
 * An implementation is required to provide a constructor that
 * takes a VerifiableProperties instance.
 */
trait Decoder[T] {
  def fromBytes(bytes: Array[Byte]): T
}

/**
 * An encoder is a method of turning objects into byte arrays.
 * An implementation is required to provide a constructor that
 * takes a VerifiableProperties instance.
 */
trait Encoder[T] {
  def toBytes(t: T): Array[Byte]
}

  In other words, our custom encoder and decoder classes only need to implement toBytes and fromBytes respectively. So how do we turn an object into a byte array, and a byte array back into an object? Remember Java object serialization? We can use ByteArrayOutputStream together with ObjectOutputStream to serialize an object into a byte array, and ByteArrayInputStream together with ObjectInputStream to deserialize a byte array back into an object. That is all we need. Here is the implementation:

import java.io.{ByteArrayInputStream, ObjectInputStream}

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

class IteblogDecoder[T](props: VerifiableProperties = null) extends Decoder[T] {

  override def fromBytes(bytes: Array[Byte]): T = {
    var t: T = null.asInstanceOf[T]
    var bi: ByteArrayInputStream = null
    var oi: ObjectInputStream = null
    try {
      bi = new ByteArrayInputStream(bytes)
      oi = new ObjectInputStream(bi)
      t = oi.readObject().asInstanceOf[T]
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // guard against NPE when a stream failed to open
      if (oi != null) oi.close()
      if (bi != null) bi.close()
    }
    t
  }
}

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties

class IteblogEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {

  override def toBytes(t: T): Array[Byte] = {
    if (t == null) {
      null
    } else {
      var bo: ByteArrayOutputStream = null
      var oo: ObjectOutputStream = null
      var bytes: Array[Byte] = null
      try {
        bo = new ByteArrayOutputStream()
        oo = new ObjectOutputStream(bo)
        oo.writeObject(t)
        bytes = bo.toByteArray
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        // guard against NPE when a stream failed to open
        if (oo != null) oo.close()
        if (bo != null) bo.close()
      }
      bytes
    }
  }
}

  With that we have our own encoder and decoder. How do we use them? Suppose we have a Person class:

case class Person(var name: String, var age: Int)
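
  Note that Java serialization requires the message class to be serializable. A Scala case class such as Person already mixes in Serializable, but a plain class has to declare it explicitly; a one-line sketch (the Student class here is only an illustration):

// A plain (non-case) class must extend Serializable explicitly, otherwise
// ObjectOutputStream.writeObject throws NotSerializableException.
class Student(var name: String, var age: Int) extends Serializable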

  On the sending side we can use them like this:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

def getProducerConfig(brokerAddr: String): Properties = {
  val props = new Properties()
  props.put("metadata.broker.list", brokerAddr)
  props.put("serializer.class", classOf[IteblogEncoder[Person]].getName)
  props.put("key.serializer.class", classOf[StringEncoder].getName)
  props
}

def sendMessages(topic: String, messages: List[Person], brokerAddr: String) {
  val producer = new Producer[String, Person](
    new ProducerConfig(getProducerConfig(brokerAddr)))
  producer.send(messages.map {
    new KeyedMessage[String, Person](topic, "Iteblog", _)
  }: _*)
  producer.close()
}

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName)
  val ssc = new StreamingContext(sparkConf, Milliseconds(500))
  val topic = args(0)
  val brokerAddr = "https://www.iteblog.com:9092"

  val data = List(Person("wyp", 23), Person("spark", 34), Person("kafka", 23),
    Person("iteblog", 23))
  sendMessages(topic, data, brokerAddr)
}

  On the receiving side we can use them like this:


import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName)
val ssc = new StreamingContext(sparkConf, Milliseconds(500))
val (topic, groupId) = (args(0), args(1))

val kafkaParams = Map("zookeeper.connect" -> "https://www.iteblog.com:2181",
  "group.id" -> groupId,
  "auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createStream[String, Person, StringDecoder,
  IteblogDecoder[Person]](ssc, kafkaParams, Map(topic -> 1),
  StorageLevel.MEMORY_ONLY)

stream.foreachRDD(rdd => {
  if (rdd.count() != 0) {
    rdd.foreach(item => if (item != null) println(item))
  } else {
    println("Empty rdd!!")
  }
})
ssc.start()
ssc.awaitTermination()
ssc.stop()

  With this we can send any serializable object. Here is the output:

Empty rdd!!
(Iteblog,Person(wyp,23))
(Iteblog,Person(spark,34))
(Iteblog,Person(kafka,23))
(Iteblog,Person(iteblog,23))
Empty rdd!!
Empty rdd!!
Empty rdd!!
Empty rdd!!

  In this example we simply print the received messages to the console; if a batch receives no messages, we print Empty rdd!! instead.
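
  Printing is of course only a placeholder. Because the DStream elements are (key, Person) pairs, they can be transformed like any other DStream before being written to a real sink; a small sketch (the age filter is purely illustrative):

// Sketch: drop the message key, keep only Person values with age >= 18,
// and print each batch (replace println with a real sink as needed).
stream.map(_._2)
  .filter(_.age >= 18)
  .foreachRDD(rdd => rdd.foreach(println))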

11 comments
  1. I don't quite follow this code: producer.send(messages.map { new KeyedMessage[String, Person](topic, "Iteblog", _) }: _*). How should the trailing : _* be understood?
    可惜不  2015-11-04 10:53
    • This is Scala syntax: : _* as a whole tells the compiler to treat the argument as a parameter sequence (varargs expansion). For example, val s = sum(1 to 5: _*) passes 1 to 5 as a sequence of arguments. Have a look at the example below:

      object Test1 extends App {
        def printAll(strings: String*) {
          println("------")
          strings.foreach(println)
        }

        printAll()
        printAll("foo")
        printAll("foo", "bar")
        printAll("foo", "bar", "baz")

        // the special ":_*" syntax
        val fruits = List("apple", "banana", "cherry")
        printAll("numbers", fruits: _*)
      }

      w397090770  2015-11-04 11:08
  2. Hello, I implemented something similar in Java, but Spark Streaming cannot receive the custom objects from Kafka. Could you help me figure out what the problem is? I described the details in this post: http://www....com/thread-13405-1-1.html. Thanks!
    逸云丫丫  2015-05-29 15:46
    • When you say it cannot receive, do you mean it cannot parse what it receives, or that it reads no data at all? If the former, your Encoder is most likely not taking effect; if the latter, check whether there is actually data in Kafka. Apart from that, are there any exceptions when you run the program?
      w397090770  2015-05-29 17:22
      • The log shows the following error:

        15/05/29 18:06:24 INFO ConsumerFetcherThread: [ConsumerFetcherThread-0_cdhdatanode2-1432893983783-f24fb853-0-0], Starting
        15/05/29 18:06:24 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1432893983849] Added fetcher for partitions ArrayBuffer([[kafka,0], initOffset 118 to broker id:0,host:cdhdatanode2,port:9092])
        15/05/29 18:06:24 ERROR KafkaReceiver: Error handling message; exiting
        org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 6B696C6C
            at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:168)
            at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
            at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:62)
            at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:1)
            at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
            at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:134)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
            at java.util.concurrent.FutureTask.run(FutureTask.java:262)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)

        I use SerializationUtils.serialize and SerializationUtils.deserialize for serialization and deserialization. Why does this error occur?
        逸云丫丫  2015-05-29 18:09
        • Judging from your error, the failure happens during deserialization; it looks like it runs into trouble while parsing a ';' character. I suggest checking whether the log data itself is problematic, for example whether there is a character-set issue.
          w397090770  2015-05-29 18:15
          • OK, thanks, I will check again.
            逸云丫丫  2015-05-29 18:23
          • Hello, after fixing the character set I can receive data normally, but a new problem shows up when decoding:

            15/05/30 11:09:03 ERROR KafkaReceiver: Error handling message; exiting
            org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.chinatime.spark.streaming.kafka.simpleTest.Student
                at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
                at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
                at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:66)
                at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:1)
                at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
                at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:134)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
                at java.util.concurrent.FutureTask.run(FutureTask.java:262)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
                at java.lang.Thread.run(Thread.java:745)

            I submit the job with the following command, and the Student.class mentioned above is inside KafkaTest.jar:

            bin/spark-submit --jars /opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar --class com.chinatime.spark.streaming.kafka.simpleTest.KafkaTest /tmp/KafkaTest.jar

            When building the SparkContext I also add the jars with:

            sparkConf.setJars(new String[] { "/tmp/KafkaTest.jar", "/opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar" });

            and in the Application UI's Classpath Entries I can see:

            http://192.168.0.53:60922/jars/KafkaTest.jar  Added By User
            http://192.168.0.53:60922/jars/spark-streaming-kafka-assembly_2.10-1.3.1.jar  Added By User

            Do you have any suggestions about this? Thanks ^_^
            逸云丫丫  2015-05-30 11:21
          • Hello, I replaced SerializationUtils.serialize and SerializationUtils.deserialize with the approach described in your article (ByteArrayOutputStream plus ObjectOutputStream to turn the object into a byte array, and ByteArrayInputStream plus ObjectInputStream to turn the byte array back into an object), and now decoding works. Why is that?
            逸云丫丫  2015-05-30 11:53
  3. Thanks for sharing! Also, the Twitter link at the top right of the article actually goes to Google+; it would be more convenient if the correct Twitter link were provided. TKS!
    allenchen  2015-03-30 17:00
    • Hi, thanks a lot for the suggestion. Those two sites are 'unavailable' here, so I added the links rather carelessly at the time.
      w397090770  2015-03-30 21:29