亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么進行Pulsar Kafka Client的簡單分析

發布時間:2021-12-15 09:19:02 來源:億速云 閱讀:143 作者:柒染 欄目:大數據

本篇文章給大家分享的是有關怎么進行Pulsar Kafka Client的簡單分析,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

為了方便 Kafka 用戶使用 Pulsar,Pulsar 對 Kafka Client 做了一些封裝,讓 Kafka 用戶更方便的使用 Pulsar。

下面主要介紹 Kafka Client 如何將消息發送到 Pulsar, 并從 Pulsar 消費消息,以及如何使用 Pulsar Schema。    

?? 引入依賴

<dependency>  <groupId>org.apache.pulsar</groupId>  <artifactId>pulsar-client-kafka</artifactId>  <version>{project.version}</version></dependency>
依賴引入了 Kafka 的 0.10.2.1 版本的客戶端,還有 Pulsar 對 Kafka Client 封裝后的客戶端。  

?? 使用 Kafka Schema

>>> 添加生產者代碼

String topic = "persistent://public/default/test";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {    producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));}
producer.close();
在上述配置中 topic 是指 Pulsar 中的 Topic,接著使用 Kafka 的配置方式來初始化各種配置,包括 Server 地址、key 的序列化與 value 的序列化類,然后構造一個 ProducerRecord 的類將其發送出去。  

>>> 添加消費者代碼

String topic = "persistent://public/default/test";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());
@SuppressWarnings("resource")Consumer<Integer, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));
while (true) {    ConsumerRecords<Integer, String> records = consumer.poll(100);    records.forEach(record -> {        log.info("Received record: {}", record);    });
   // Commit last offset    consumer.commitSync();}
有些配置同生產者代碼的配置是類似的,例如 topic,Server 等。另外使用 Kafka 的 group.id 作為配置 Pulsar 中的訂閱名稱,關閉自動提交,在消費者端為 key 和 value 配置的是反序列化的類。然后同常規的消費者類似,開始消費消息。  

?? 使用 Pulsar Schema

在上述情況中使用的是 Kafka 的 Schema 來進行序列化與反序列化,當然也支持使用 Pulsar 的 Schema 來進行此過程。下面使用 AVRO 進行簡單的介紹。
首先定義 Schema 所需要使用的 pojo 類。  
@Data@ToString@EqualsAndHashCodepublic class Foo {    @Nullable    private String field1;    @Nullable    private String field2;    private int field3;}
@Data@ToString@EqualsAndHashCodepublic class Bar {    private boolean field1;}

>>> 生產者端代碼

String topic = "persistent://public/default/test-avro";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Bar bar = new Bar();bar.setField1(true);
Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);

Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
for (int i = 0; i < 10; i++) {    producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));    log.info("Message {} sent successfully", i);}
producer.close();
可以看到大部分配置同上面使用 Kafka Client 的配置是類似的,但是中間加入了一些 Pulsar 的 Schema,使用 Foo 作為 key,使用 Bar 類作為 value。  

>>> 消費者端代碼

String topic = "persistent://public/default/test-avro";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Bar bar = new Bar();bar.setField1(true);
Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);
@SuppressWarnings("resource")Consumer<Foo, Bar> consumer = new PulsarKafkaConsumer<>(props, fooSchema, barSchema);consumer.subscribe(Arrays.asList(topic));
while (true) {    ConsumerRecords<Foo, Bar> records = consumer.poll(100);    records.forEach(record -> {        log.info("Received record: {}", record);    });
   // Commit last offset    consumer.commitSync();}
消費者端同樣是類似的配置,使用與生產者端相同的 Schema 進行數據的反序列化。      

以上就是怎么進行Pulsar Kafka Client的簡單分析,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

霍林郭勒市| 金乡县| 甘洛县| 黑水县| 曲靖市| 贺州市| 奉化市| 呼和浩特市| 松阳县| 科技| 乌什县| 保康县| 丰县| 剑河县| 平和县| 天全县| 南皮县| 都江堰市| 满洲里市| 大新县| 武乡县| 临泉县| 泽库县| 台东市| 大宁县| 霍州市| 长岭县| 虹口区| 新巴尔虎左旗| 临邑县| 玉林市| 屏南县| 无为县| 乌兰察布市| 平遥县| 民勤县| 西平县| 天水市| 璧山县| 泽州县| 建始县|