虚位以待(AD)
虚位以待(AD)
首页 > 软件编程 > Java编程 > Java使用kafka发送和生产消息的示例

Java使用kafka发送和生产消息的示例
类别:Java编程   作者:码皇   来源:互联网   点击:

本篇文章主要介绍了Java使用kafka发送和生产消息的示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

1. maven依赖包

    <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>

2. 生产者代码

    package com.lnho.example.kafka;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    public class KafkaProducerExample {
    public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "master:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    for(int i = 0;
    i < 100;
    i++) producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));
    producer.close();
    }
    }

3. 消费者代码

    package com.lnho.example.kafka;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.util.Arrays;
    import java.util.Properties;
    public class KafkaConsumerExample {
    public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "master:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic1"));
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %sn", record.offset(), record.key(), record.value());
    }
    }
    }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

您可能感兴趣的文章:

  • Kafka利用Java实现数据的生产和消费实例教程
  • Kafka使用Java客户端进行访问的示例代码
  • Java API方式调用Kafka各种协议的方法
相关热词搜索: kafka 发送消息 java kafka发送消息 Java