在《在Kafka中使用Avro编码消息:Producter篇》 和 《在Kafka中使用Avro编码消息:Consumer篇》 两篇文章里面我介绍了直接使用原生的 Kafka API生成和消费 Avro 类型的编码消息,本文将继续介绍如何通过 Spark 从 Kafka 中读取这些 Avro 格式化的消息。
其实在 Spark 中读取 Avro 的消息和之前在 Consumer 篇介绍的过程很类似,也是定义一个模式解析器,然后再定义一个 Injection ,这个对象就可以将读出来的 Avro 消息反序列化成我们需要的数据,我这废话就不多说了,直接贴代码吧:
package com.iteblog.avro; import com.twitter.bijection.Injection; import com.twitter.bijection.avro.GenericAvroCodecs; import kafka.serializer.DefaultDecoder; import kafka.serializer.StringDecoder; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; public class SparkAvroConsumer { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("kafka-spark") .setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf); JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); Set<String> topics = Collections.singleton("iteblog"); Map<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", "www.iteblog.com:9092"); JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics); directKafkaStream.foreachRDD(rdd -> { rdd.foreach(avroRecord -> { Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(USER_SCHEMA); Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema); GenericRecord record = recordInjection.invert(avroRecord._2).get(); System.out.println("str1= " + record.get("str1") + ", str2= " + record.get("str2") + ", int1=" + record.get("int1")); }); }); ssc.start(); ssc.awaitTermination(); } }
可以看出,这个代码结构在前面我们都见过了。USER_SCHEMA
就是咱们在 Producter 里面定义好的模式。上面的代码能够正确地读取并解析 Avro 格式的消息,但是我们细心的同学肯定会发现,我们每读一个avroRecord 就会初始化一个 schema
以及 recordInjection
,这肯定会有开销的, 正确的做法是只初始化一次,所以我们会想到将schema
以及 recordInjection
的初始化工作移到循环外面去:
Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(USER_SCHEMA); Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema); directKafkaStream.foreachRDD(rdd -> { rdd.foreach(avroRecord -> { GenericRecord record = recordInjection.invert(avroRecord._2).get(); System.out.println("str1= " + record.get("str1") + ", str2= " + record.get("str2") + ", int1=" + record.get("int1")); }); });
不过遗憾的是,上面的代码无法运行,会出现以下的异常:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable ... Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericDatumReader ...
从上面可以看出,GenericDatumReader 类是无法序列化的。原因是,Spark 运行的时候会将 recordInjection
对象发送到所有的 workers 上面,这个过程需要序列化 recordInjection
对象,但是这个对象是无法序列化的,所以导致了上面的异常。不过我们可以通过下面的方法解决这个问题:
private static Injection<GenericRecord, byte[]> recordInjection; static { Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(USER_SCHEMA); recordInjection = GenericAvroCodecs.toBinary(schema); } public static void main(String[] args) { ...
我们把 recordInjection
对象的初始化工作放到 main 函数的最外面,并且使用 static
括起来。这段代码的作用会使得每个 Worker 上的每个 JVM 进程初始化一个recordInjection
对象。然后我们使用这个对象去解析读取到的 Avro 格式对象:
directKafkaStream .map(message -> recordInjection.invert(message._2).get()) .foreachRDD(rdd -> { rdd.foreach(record -> { System.out.println("str1= " + record.get("str1") + ", str2= " + record.get("str2") + ", int1=" + record.get("int1")); }); });
整个过程就完了。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【在Kafka中使用Avro编码消息:Spark篇】(https://www.iteblog.com/archives/2263.html)