
Using Avro-Encoded Messages in Kafka: The Consumer Side

In "Using Avro-Encoded Messages in Kafka: The Producer Side" I briefly showed how to send Avro-encoded messages to Kafka. This article picks up where that one left off and shows how to read Avro-format messages back out of Kafka. I will not introduce Avro itself again here.


Reading Avro-format messages from Kafka

Reading Avro-format messages from Kafka works the same way as reading any other kind of message: create the message streams, then iterate over them:

ConsumerConnector consumer = ...;
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
    ....
}

The key question is how to turn the byte arrays we read back into the data we want. Here we again use the schema parser introduced earlier:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

The USER_SCHEMA above is the message schema described in "Using Avro-Encoded Messages in Kafka: The Producer Side". We create a recordInjection object, which uses the freshly parsed schema to deserialize the byte arrays we read back into the data we originally wrote:

GenericRecord record = recordInjection.invert(message).get();
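
One detail worth noting (not shown in the original snippet): in Bijection, invert() returns a scala.util.Try, so get() throws if a message cannot be decoded against the schema. If your topic may contain malformed messages, a more defensive sketch (illustrative only, not part of the original example) looks like this:

// Defensive decoding: check the Try returned by invert() before calling get().
scala.util.Try<GenericRecord> attempt = recordInjection.invert(message);
if (attempt.isSuccess()) {
    GenericRecord record = attempt.get();
    // ... use the record ...
} else {
    logger.warn("Skipping a message that could not be decoded with the Avro schema");
}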

We can then retrieve the written fields with:

record.get("str1")
record.get("str2")
record.get("int1")

At this point we have everything we need to write a complete consumer for Avro data. The Kafka version used below is 0.8.2.2:

package com.iteblog.avro;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by https://www.iteblog.com on 2017-09-20.
 */
public class AvroKafkaConsumer {
    Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");
    private final ConsumerConnector consumer;
    private final String topic;

    public AvroKafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        // Parse the schema once and build the Injection used to decode every Avro message.
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroKafkaProducter.USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
                String key = new String(messageAndMetadata.key());
                byte[] message = messageAndMetadata.message();

                // Deserialize the Avro byte array back into a GenericRecord.
                GenericRecord record = recordInjection.invert(message).get();
                logger.info("key=" + key + ", str1= " + record.get("str1")
                        + ", str2= " + record.get("str2")
                        + ", int1=" + record.get("int1"));
            }
        }
        consumer.shutdown();
    }

    public static void main(String[] args) {
        AvroKafkaConsumer simpleConsumer = 
                new AvroKafkaConsumer("www.iteblog.com:2181", "testgroup", "iteblog");
        simpleConsumer.testConsumer();
    }
}

Running it

Running it again requires avro-1.8.1.jar, slf4j-api-1.7.21.jar, log4j-1.2.17.jar, slf4j-log4j12-1.7.7.jar, scala-library.jar and the other related jars. A concrete way to run it:

CLASSPATH=$CLASSPATH:

for i in /home/iteblog/lib/*.jar ; do
    CLASSPATH=$CLASSPATH:$i
done

java -cp $CLASSPATH:flink-kafka-1.0-SNAPSHOT.jar com.iteblog.avro.AvroKafkaConsumer

When we run this program, it prints the following data:

09:33:11,917 INFO  AvroKafkaConsumer           - key=0, str1= Str 1-0, str2= Str 2-0, int1=0
09:33:11,918 INFO  AvroKafkaConsumer           - key=5, str1= Str 1-5, str2= Str 2-5, int1=5
09:33:11,919 INFO  AvroKafkaConsumer           - key=1, str1= Str 1-1, str2= Str 2-1, int1=1
09:33:11,920 INFO  AvroKafkaConsumer           - key=4, str1= Str 1-4, str2= Str 2-4, int1=4
09:33:11,921 INFO  AvroKafkaConsumer           - key=2, str1= Str 1-2, str2= Str 2-2, int1=2
09:33:11,922 INFO  AvroKafkaConsumer           - key=6, str1= Str 1-6, str2= Str 2-6, int1=6
09:33:11,923 INFO  AvroKafkaConsumer           - key=3, str1= Str 1-3, str2= Str 2-3, int1=3
09:33:11,923 INFO  AvroKafkaConsumer           - key=7, str1= Str 1-7, str2= Str 2-7, int1=7
09:33:11,924 INFO  AvroKafkaConsumer           - key=8, str1= Str 1-8, str2= Str 2-8, int1=8
09:33:11,926 INFO  AvroKafkaConsumer           - key=9, str1= Str 1-9, str2= Str 2-9, int1=9

This is exactly the data generated by the Producer program in the previous article.

Implementation for Kafka 0.9.x

If you want to read the data with the Kafka 0.9.x client, you can do it as follows:

package com.iteblog.avro;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by https://www.iteblog.com on 2017-09-20.
 */
public class AvroKafkaConsumer09 {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");
        Properties props = new Properties();
        props.put("bootstrap.servers", "www.iteblog.com:2181");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        String topic = "iteblog";

        consumer.subscribe(Collections.singletonList(topic));
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroKafkaProducter.USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    logger.info("topic = %s, partition = %s, offset = %d, customer = %s,country = %s\n", 
                            record.topic(), record.partition(), record.offset(), " + 
                            record.key(), genericRecord.get("str1"));
                }
            }
        } finally {
            consumer.close();
        }
    }
}
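
Note that the while (true) loop above never exits on its own, so the finally block is only reached if poll() (or the decoding) throws. A common pattern with the new consumer (not part of the original example, just a sketch) is to call wakeup() from a shutdown hook; poll() then throws a WakeupException that the loop can use to exit and close the consumer cleanly:

// Sketch: shutting down the 0.9.x consumer cleanly; wakeup() is the thread-safe way to interrupt poll().
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();               // makes the blocked poll() throw WakeupException
    try {
        mainThread.join();           // wait for the main loop to finish closing the consumer
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(1000);
        // ... decode and log the records as above ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown; fall through to close()
} finally {
    consumer.close();
}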

In the next article I will show how to read Avro data from Kafka with Spark. Stay tuned.

Unless otherwise stated, all articles on this blog are original.
Original articles are copyright 过往记忆大数据 (过往记忆) and may not be reproduced without permission.
Permalink: Using Avro-Encoded Messages in Kafka: The Consumer Side (https://www.iteblog.com/archives/2210.html)