在《在Kafka中使用Avro编码消息:Producter篇》 和 《在Kafka中使用Avro编码消息:Consumer篇》 两篇文章里面我介绍了直接使用原生的 Kafka API生成和消费 Avro 类型的编码消息,本文将继续介绍如何通过 Spark 从 Kafka 中读取这些 Avro 格式化的消息。
其实在 Spark 中读取 Avro 的消息和之前在 Consumer 篇介绍的过程很类似,也是定义一个模式解析器,然后再定义一个 Injection ,这个对象就可以将读出来的 Avro 消息反序列化成我们需要的数据,我这废话就不多说了,直接贴代码吧:
package com.iteblog.avro;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class SparkAvroConsumer {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("kafka-spark")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
Set<String> topics = Collections.singleton("iteblog");
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "www.iteblog.com:9092");
JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> {
rdd.foreach(avroRecord -> {
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
GenericRecord record = recordInjection.invert(avroRecord._2).get();
System.out.println("str1= " + record.get("str1")
+ ", str2= " + record.get("str2")
+ ", int1=" + record.get("int1"));
});
});
ssc.start();
ssc.awaitTermination();
}
}
可以看出,这个代码结构在前面我们都见过了。USER_SCHEMA 就是咱们在 Producter 里面定义好的模式。上面的代码能够正确地读取并解析 Avro 格式的消息,但是我们细心的同学肯定会发现,我们每读一个avroRecord 就会初始化一个 schema 以及 recordInjection ,这肯定会有开销的, 正确的做法是只初始化一次,所以我们会想到将schema 以及 recordInjection 的初始化工作移到循环外面去:
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
directKafkaStream.foreachRDD(rdd -> {
rdd.foreach(avroRecord -> {
GenericRecord record = recordInjection.invert(avroRecord._2).get();
System.out.println("str1= " + record.get("str1")
+ ", str2= " + record.get("str2")
+ ", int1=" + record.get("int1"));
});
});
不过遗憾的是,上面的代码无法运行,会出现以下的异常:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable ... Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericDatumReader ...
从上面可以看出,GenericDatumReader 类是无法序列化的。原因是,Spark 运行的时候会将 recordInjection 对象发送到所有的 workers 上面,这个过程需要序列化 recordInjection 对象,但是这个对象是无法序列化的,所以导致了上面的异常。不过我们可以通过下面的方法解决这个问题:
private static Injection<GenericRecord, byte[]> recordInjection;
static {
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
recordInjection = GenericAvroCodecs.toBinary(schema);
}
public static void main(String[] args) {
...
我们把 recordInjection 对象的初始化工作放到 main 函数的最外面,并且使用 static 括起来。这段代码的作用会使得每个 Worker 上的每个 JVM 进程初始化一个recordInjection 对象。然后我们使用这个对象去解析读取到的 Avro 格式对象:
directKafkaStream
.map(message -> recordInjection.invert(message._2).get())
.foreachRDD(rdd -> {
rdd.foreach(record -> {
System.out.println("str1= " + record.get("str1")
+ ", str2= " + record.get("str2")
+ ", int1=" + record.get("int1"));
});
});
整个过程就完了。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【在Kafka中使用Avro编码消息:Spark篇】(https://www.iteblog.com/archives/2263.html)


