Sample Code for Integrating Kafka with Spring Boot
Category: Java Programming   Author: 码皇   Source: Internet

This article walks through sample code for integrating Kafka with Spring Boot. It is shared here as a reference for others and as a note to myself.

System environment

The examples use a Kafka service running on a remote server:

  1. Ubuntu 16.04 LTS
  2. kafka_2.12-0.11.0.0.tgz
  3. zookeeper-3.5.2-alpha.tar.gz

Integration steps

1. Create a Spring Boot project and add the required dependencies:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.laravelshao.springboot</groupId>
        <artifactId>spring-boot-integration-kafka</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>

        <name>spring-boot-integration-kafka</name>
        <description>Demo project for Spring Boot</description>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <!--kafka-->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-json</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

2. Add the configuration. A YAML file is used here:

    spring:
      kafka:
        bootstrap-servers: X.X.X.X:9092
        producer:
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        consumer:
          group-id: test
          auto-offset-reset: earliest
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          properties:
            spring:
              json:
                trusted:
                  packages: com.laravelshao.springboot.kafka
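The `properties` block nests `spring`, `json`, `trusted`, and `packages` as separate YAML levels, yet the consumer ends up with a single dotted key, `spring.json.trusted.packages`. The sketch below illustrates that flattening in plain Java. `YamlKeyFlattener` is a hypothetical helper written for this note, not a Spring Boot or spring-kafka class, and it only assumes that nested map keys are joined with dots.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Spring Boot or spring-kafka.
class YamlKeyFlattener {

    // Joins nested map keys with '.', the way nested YAML levels collapse
    // into one dotted property name.
    static Map<String, String> flatten(Map<String, ?> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        collect("", nested, flat);
        return flat;
    }

    private static void collect(String prefix, Map<String, ?> node, Map<String, String> out) {
        for (Map.Entry<String, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                collect(key, child, out); // descend one YAML level
            } else {
                out.put(key, String.valueOf(value)); // leaf: record the dotted key
            }
        }
    }

    public static void main(String[] args) {
        // Mirrors the levels under spring.kafka.consumer.properties in the YAML.
        Map<String, Object> trusted =
                Collections.singletonMap("packages", "com.laravelshao.springboot.kafka");
        Map<String, Object> json = Collections.singletonMap("trusted", trusted);
        Map<String, Object> spring = Collections.singletonMap("json", json);
        Map<String, Object> properties = Collections.singletonMap("spring", spring);

        System.out.println(flatten(properties));
    }
}
```

Running `main` prints `{spring.json.trusted.packages=com.laravelshao.springboot.kafka}`, which is the property name the JSON deserializer uses for its trusted-package list.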

3. Create the message object:

    package com.laravelshao.springboot.kafka;

    // Payload that is serialized to JSON by the producer and
    // deserialized from JSON by the consumer.
    public class Message {

        private Integer id;
        private String msg;

        // The no-argument constructor is needed for JSON deserialization.
        public Message() {
        }

        public Message(Integer id, String msg) {
            this.id = id;
            this.msg = msg;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }

        @Override
        public String toString() {
            return "Message{" + "id=" + id + ", msg='" + msg + '\'' + '}';
        }
    }

4. Create the producer:

    package com.laravelshao.springboot.kafka;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    /**
     * Created by shaoqinghua on 2018/3/23.
     */
    @Component
    public class Producer {

        private static final Logger log = LoggerFactory.getLogger(Producer.class);

        // Auto-configured by Spring Boot from the spring.kafka.* settings.
        @Autowired
        private KafkaTemplate<String, Message> kafkaTemplate;

        // Sends the message to the given topic and logs what was sent.
        public void send(String topic, Message message) {
            kafkaTemplate.send(topic, message);
            log.info("Producer->topic:{}, message:{}", topic, message);
        }
    }

5. Create the consumer, using the @KafkaListener annotation to listen on the topic:

    package com.laravelshao.springboot.kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    /**
     * Created by shaoqinghua on 2018/3/23.
     */
    @Component
    public class Consumer {

        private static final Logger log = LoggerFactory.getLogger(Consumer.class);

        // Invoked for every record that arrives on test_topic.
        @KafkaListener(topics = "test_topic")
        public void receive(ConsumerRecord<String, Message> consumerRecord) {
            log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());
        }
    }

6. Test sending and consuming:

    package com.laravelshao.springboot;

    import com.laravelshao.springboot.kafka.Message;
    import com.laravelshao.springboot.kafka.Producer;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationContext;

    @SpringBootApplication
    public class IntegrationKafkaApplication {

        public static void main(String[] args) throws InterruptedException {
            ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);
            Producer producer = context.getBean(Producer.class);

            // Send messages 1 through 9, pausing two seconds between sends.
            for (int i = 1; i < 10; i++) {
                producer.send("test_topic", new Message(i, "test topic message " + i));
                Thread.sleep(2000);
            }
        }
    }

The log shows each message being sent and then consumed, one after another.
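To see why sends and receives appear in the same order in the log, it can help to picture the hand-off without a broker. The sketch below is only an analogy: a `LinkedBlockingQueue` stands in for the topic and a plain thread stands in for the listener container. `InMemoryTopicDemo` and `sendAndReceive` are hypothetical names, and nothing here uses Kafka or spring-kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Analogy only: an in-memory queue replaces the Kafka topic.
class InMemoryTopicDemo {

    // Pushes `count` payloads through the queue to a listener thread and
    // returns them in the order the listener received them.
    static List<String> sendAndReceive(int count) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // Stand-in for the listener: blocks until a payload is available.
        Thread listener = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(topic.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.start();

        // Stand-in for the producer loop in main.
        for (int i = 1; i <= count; i++) {
            topic.add("test topic message " + i);
        }

        try {
            listener.join(); // wait until every payload has been consumed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the listener", e);
        }
        return received;
    }

    public static void main(String[] args) {
        // Nine payloads, matching the loop bounds (1 through 9) in the test.
        for (String payload : sendAndReceive(9)) {
            System.out.println("received: " + payload);
        }
    }
}
```

The queue is first-in, first-out with a single sender and a single receiver, so the payloads come out in the order they went in. A real topic with several partitions or several consumers does not give that guarantee across partitions.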

Troubleshooting

Deserialization exception (the custom message class is not in a package trusted by the JSON deserializer):

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class "com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
 at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
 at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
 at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.lang.Thread.run(Thread.java:745)

Solution: add the message class's package to the deserializer's trusted packages:

    spring:
      kafka:
        consumer:
          properties:
            spring:
              json:
                trusted:
                  packages: com.laravelshao.springboot.kafka
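The exception message lists `java.util` and `java.lang` as the only trusted packages, which is why `Message` is rejected until its package is added. The sketch below shows the idea behind such an allow-list check. `TrustedPackageCheck` is a hypothetical, simplified stand-in, not spring-kafka's actual implementation, and its exact-match rule is an assumption; the library's real matching rules may differ.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified, hypothetical allow-list check; NOT spring-kafka's real code.
class TrustedPackageCheck {

    private final Set<String> trustedPackages = new LinkedHashSet<>();

    TrustedPackageCheck() {
        // The defaults named in the exception message.
        trustedPackages.add("java.util");
        trustedPackages.add("java.lang");
    }

    void addTrustedPackage(String packageName) {
        trustedPackages.add(packageName);
    }

    // Accepts a class if "*" is trusted or its package is on the list.
    boolean isTrusted(String className) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        TrustedPackageCheck check = new TrustedPackageCheck();
        String messageClass = "com.laravelshao.springboot.kafka.Message";

        System.out.println(check.isTrusted(messageClass)); // false: package not listed
        check.addTrustedPackage("com.laravelshao.springboot.kafka");
        System.out.println(check.isTrusted(messageClass)); // true: package now listed
    }
}
```

The exception text also mentions trusting everything with `*`. That is only reasonable when every producer writing to the topic is itself trusted, so listing the specific package is the narrower choice.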
