在某些情况下,我们可能会在Spring中将一些WEB上的信息发送到Kafka中,这时候我们就需要在Spring中编写Producer相关的代码了;不过高兴的是,Spring本身提供了操作Kafka的相关类库,我们可以直接通过xml文件配置然后直接在后端的代码中使用Kafka,非常地方便。本文将介绍如果在Spring中将消息发送到Kafka。在这之前,请将下面的依赖加入到你的 pom.xml 文件中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>4.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>1.0.0.M2</version>
</dependency>
在Spring中将消息发送到Kafka需要我们定义一个配置文件,Spring为我们提供了Outbound Channel Adapter,其主要将消息从Spring框架发送到Kafka,我们需要在xml文件中配置 int-kafka:outbound-channel-adapter 标签,这样可以使得Spring可以抽取到消息的Key、目标主题、分区等信息。本文将介绍如何把用户注册的信息发送到Kafka。这里用到的配置文件如下:
<int:publish-subscribe-channel id="inputToKafka"/>
<int-kafka:outbound-channel-adapter
kafka-producer-context-ref="kafkaProducerContext" auto-startup="true"
channel="inputToKafka" order="1">
</int-kafka:outbound-channel-adapter>
<int-kafka:producer-context id="kafkaProducerContext"
producer-properties="producerProperties">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration
broker-list="www.iteblog.com:9092" key-class-type="java.lang.String"
value-class-type="com.iteblog.dao.User"
topic="user" compression-codec="none"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
<bean id="producerProperties"
class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="properties">
<props>
<prop key="topic.metadata.refresh.interval.ms">3600000</prop>
<prop key="message.send.max.retries">5</prop>
<prop key="send.buffer.bytes">5242880</prop>
</props>
</property>
</bean>
int:publish-subscribe-channel 标签定义了消息发送的通道;int-kafka:producer-context 标签里面可以定义producer 的context,在里面我们可以设置broker的地址,key和value的类型,需要发送消息的目标Topic等相关属性;我们可以在自定义的bean中定义Kafka Producer的相关属性,本文对应的是producerProperties,这里我们定义了 topic.metadata.refresh.interval.ms 等相关属性,更多的属性可以参见Kafka的官方文档。然后可以在 int-int-kafka:producer-context 标签通过 producer-properties 来引用。配置文件设置好之后,我们就可以编写Java代码了:
package com.iteblog.controller;
import com.iteblog.dao.User;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* Created by https://www.iteblog.com on 2016/10/21.
*/
@Controller
public class IteblogController {
@Autowired
@Qualifier("inputToKafka")
MessageChannel channel;
private static final Log logger = LogFactory.getLog(IteblogController.class);
@RequestMapping(method = RequestMethod.POST, value = "/register")
@ResponseBody
public User registerJson(User user) {
logger.warn("User: " + user);
channel.send(MessageBuilder.withPayload(user).build());
return user;
}
}
withPayload 接受的就是消息的内容;MessageChannel 就是发送消息的通道,所有的消息都是通过这个通道发送到Kafka;User 类的代码如下:
package com.iteblog.dao;
/**
* Created by https://www.iteblog.com on 2016/10/21.
*/
public class User {
private String userName;
private String email;
public User() {
}
public User(String userName, String email) {
this.userName = userName;
this.email = email;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
@Override
public String toString() {
return "User{" +
"userName='" + userName + '\'' +
", email='" + email + '\'' +
'}';
}
}
在发送消息的时候,我们还可以设置消息的Key,以及需要发送的Topic,如下:
channel.send(MessageBuilder.withPayload(user)
.setHeader("topic", "user")
.setHeader("messageKey", user.getUserName()).build());
这里通过设置topic、messageKey等Header信息,来分别制定目标主题的名称(当然,如果只有一个主题我们不需要手动指定,Spring会自定选择配置文件里面指定的Topic;如果有多个需要手动指定)和key的值。我们还可以设置消息的Key和Value编码格式,如下:
<bean id="userEncoder"
class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
<constructor-arg value="com.iteblog.dao.User"></constructor>
</bean>
<bean id="keyEncoder"
class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
<constructor-arg value="java.lang.String"></constructor>
</bean>
然后在配置文件里面配置:
<int-kafka:producer-context id="kafkaProducerContext"
producer-properties="producerProperties">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration
broker-list="www.iteblog.com:9092" key-class-type="java.lang.String"
value-class-type="com.iteblog.dao.User"
value-encoder="userEncoder"
key-encoder="keyEncoder"
topic="user" compression-codec="none"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
如果我们得Topic有多个分区,我们还可以指定每条消息的分区ID计算规则,如下:
<bean id="iteblogPartitioner" class="org.springframework.integration.kafka.support.DefaultPartitioner"/>
我们使用了Spring默认分区类,也就是计算Key的hashCode再对分区数求模(Utils.abs(key.hashCode()) % numPartitions),然后我们可以在 int-kafka:producer-configuration 里面加上以下配置
partitioner="iteblogPartitioner"
到这里,我们就可以在Tomcat中启动上面的Web工程,然后访问https://www.iteblog.com/register并传入 userName 和 email 参数即可将消息发送到Kafka,完整的工程目录结构如下:
.
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── iteblog
│ ├── controller
│ │ └── IteblogController.java
│ └── dao
│ └── User.java
├── resources
│ ├── applicationContext.xml
│ ├── kafka-outbound-context.xml
│ └── log4j.properties
└── webapp
├── index.jsp
├── register.html
└── WEB-INF
└── web.xml
10 directories, 9 files
我将在后面的文章介绍如何在Spring中接收Kafka的消息,敬请关注。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【在Spring中使用Kafka:Producer篇】(https://www.iteblog.com/archives/1860.html)


