亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么在Java中利用kafka發送消息

發布時間:2021-04-08 17:21:56 來源:億速云 閱讀:398 作者:Leah 欄目:編程語言

這期內容當中小編將會給大家帶來有關怎么在Java中利用kafka發送消息,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

1. maven依賴包

<dependency> 
 <groupId>org.apache.kafka</groupId> 
 <artifactId>kafka-clients</artifactId> 
 <version>0.9.0.1</version> 
</dependency>

2. 生產者代碼

package com.lnho.example.kafka;  
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.Producer; 
import org.apache.kafka.clients.producer.ProducerRecord;   
import java.util.Properties;   
public class KafkaProducerExample { 
 public static void main(String[] args) { 
  Properties props = new Properties(); 
  props.put("bootstrap.servers", "master:9092"); 
  props.put("acks", "all"); 
  props.put("retries", 0); 
  props.put("batch.size", 16384); 
  props.put("linger.ms", 1); 
  props.put("buffer.memory", 33554432); 
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");   
  Producer<String, String> producer = new KafkaProducer<>(props); 
  for(int i = 0; i < 100; i++) 
   producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));   
  producer.close(); 
 } 
}

3. 消費者代碼

package com.lnho.example.kafka;   
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import java.util.Arrays; 
import java.util.Properties;   
public class KafkaConsumerExample { 
 public static void main(String[] args) { 
  Properties props = new Properties(); 
  props.put("bootstrap.servers", "master:9092"); 
  props.put("group.id", "test"); 
  props.put("enable.auto.commit", "true"); 
  props.put("auto.commit.interval.ms", "1000"); 
  props.put("session.timeout.ms", "30000"); 
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
  consumer.subscribe(Arrays.asList("topic1")); 
  while (true) { 
   ConsumerRecords<String, String> records = consumer.poll(100); 
   for (ConsumerRecord<String, String> record : records) 
    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); 
  } 
 } 
}

上述就是小編為大家分享的怎么在Java中利用kafka發送消息了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

山阴县| 民县| 龙海市| 德阳市| 吉隆县| 保康县| 枣阳市| 晋城| 巩留县| 长沙县| 肃宁县| 盐边县| 婺源县| 松桃| 准格尔旗| 沙坪坝区| 怀来县| 安达市| 巨野县| 乐陵市| 南川市| 唐河县| 阳江市| 澄迈县| 榆林市| 宝鸡市| 福贡县| 越西县| 东宁县| 宁武县| 商洛市| 天峨县| 锡林郭勒盟| 离岛区| 安化县| 阿城市| 子长县| 庄河市| 常德市| 资讯| 黑山县|