欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

在Kafka中使用Avro编码消息:Spark篇

《在Kafka中使用Avro编码消息:Producter篇》《在Kafka中使用Avro编码消息:Consumer篇》 两篇文章里面我介绍了直接使用原生的 Kafka API生成和消费 Avro 类型的编码消息,本文将继续介绍如何通过 SparkKafka 中读取这些 Avro 格式化的消息。

在Kafka中使用Avro编码消息:Spark篇
如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

其实在 Spark 中读取 Avro 的消息和之前在 Consumer 篇介绍的过程很类似,也是定义一个模式解析器,然后再定义一个 Injection ,这个对象就可以将读出来的 Avro 消息反序列化成我们需要的数据,我这废话就不多说了,直接贴代码吧:

package com.iteblog.avro;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SparkAvroConsumer {


    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("kafka-spark")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Set<String> topics = Collections.singleton("iteblog");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "www.iteblog.com:9092");

        JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

        directKafkaStream.foreachRDD(rdd -> {
            rdd.foreach(avroRecord -> {
                Schema.Parser parser = new Schema.Parser();
                Schema schema = parser.parse(USER_SCHEMA);
                Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
                GenericRecord record = recordInjection.invert(avroRecord._2).get();

                System.out.println("str1= " + record.get("str1")
                        + ", str2= " + record.get("str2")
                        + ", int1=" + record.get("int1"));
            });
        });

        ssc.start();
        ssc.awaitTermination();
    }
}

可以看出,这个代码结构在前面我们都见过了。USER_SCHEMA 就是咱们在 Producter 里面定义好的模式。上面的代码能够正确地读取并解析 Avro 格式的消息,但是我们细心的同学肯定会发现,我们每读一个avroRecord 就会初始化一个 schema 以及 recordInjection ,这肯定会有开销的, 正确的做法是只初始化一次,所以我们会想到将schema 以及 recordInjection 的初始化工作移到循环外面去:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreach(avroRecord -> {
        GenericRecord record = recordInjection.invert(avroRecord._2).get();
        System.out.println("str1= " + record.get("str1")
                + ", str2= " + record.get("str2")
                + ", int1=" + record.get("int1"));
    });
});

不过遗憾的是,上面的代码无法运行,会出现以下的异常:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericDatumReader
...

从上面可以看出,GenericDatumReader 类是无法序列化的。原因是,Spark 运行的时候会将 recordInjection 对象发送到所有的 workers 上面,这个过程需要序列化 recordInjection 对象,但是这个对象是无法序列化的,所以导致了上面的异常。不过我们可以通过下面的方法解决这个问题:

private static Injection<GenericRecord, byte[]> recordInjection;

static {
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    recordInjection = GenericAvroCodecs.toBinary(schema);
}

public static void main(String[] args) {
    ...

我们把 recordInjection 对象的初始化工作放到 main 函数的最外面,并且使用 static 括起来。这段代码的作用会使得每个 Worker 上的每个 JVM 进程初始化一个recordInjection 对象。然后我们使用这个对象去解析读取到的 Avro 格式对象:

directKafkaStream
        .map(message -> recordInjection.invert(message._2).get())
        .foreachRDD(rdd -> {
            rdd.foreach(record -> {
                System.out.println("str1= " + record.get("str1")
                        + ", str2= " + record.get("str2")
                        + ", int1=" + record.get("int1"));
            });
        });

整个过程就完了。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【在Kafka中使用Avro编码消息:Spark篇】(https://www.iteblog.com/archives/2263.html)
喜欢 (19)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!