亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Pulsar IO 中怎么調用Schema

發布時間:2021-06-24 15:57:57 來源:億速云 閱讀:186 作者:Leah 欄目:大數據

這篇文章給大家介紹Pulsar IO 中怎么調用Schema ,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

Schema 是一種描述數據的數據   。例如,數據庫中表的信息和字段類型等都是 Schema。Pulsar 對 Schema 也有比較好的支持。    


>>> Schema 簡單應用 <<<  

 
在使用 pub/sub 生產和消費消息時,可以通過以下代碼使用 Schema:  
     
   
   
   
public class SensorReading {              public float temperature;          
             public SensorReading(float temperature) {                  this.temperature = temperature;              }          
             // A no-arg constructor is required              public SensorReading() {              }          
             public float getTemperature() {                  return temperature;              }          
             public void setTemperature(float temperature) {                  this.temperature = temperature;              }          }          Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))              .topic("my-topic")              .create();          Consumer consumer = client.newConsumer(JSONSchema.of(SensorReading.class))                  .topic("my-topic")                  .subscriptionName("my-subscription")                  .subscribe();      


通過以上操作,生產者和消費者可以識別出關于 SensorReading 這個類的含義。這是 Schema 在客戶端的應用,也是比較普遍的使用方法。  

 
前文已經提到,Source 和 Sink 是對 pub/sub 的封裝,因此,Schema 的應用也是基于以上原理。以下為詳細說明。  

 
>>> Source 中的 Schema <<<  

 
在內建的 Sink 中,實現了一個 Consumer,用于接收從 Pulsar 發來的數據。  
     
   
   
   if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {  
    
        schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true);  
    
    } else {  
    
        schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true);  
    
    }


getSerdeClassName    會獲取用戶指定的用于序列化與反序列化的類,通過指定 --   custom-serde-inputs    參數,從而構建真正的 Schema。  
     
   
   
   
case NONE:            return (Schema<T>) Schema.BYTES;          
         case AUTO_CONSUME:          case AUTO:            return (Schema<T>) Schema.AUTO_CONSUME();          
         case STRING:            return (Schema<T>) Schema.STRING;          
         case AVRO:            return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());          
         case JSON:            return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());          
         case KEY_VALUE:            return (Schema<T>)Schema.KV_BYTES();          
         case PROTOBUF:            return ProtobufSchema.ofGenericClass(clazz, Collections.emptyMap());          }      


關于Pulsar IO 中怎么調用Schema 就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

鸡东县| 乌恰县| 崇礼县| 泌阳县| 林甸县| 旺苍县| 扶绥县| 临泽县| 三明市| 师宗县| 商洛市| 稷山县| 定南县| 安康市| 甘肃省| 靖宇县| 聊城市| 余干县| 怀远县| 邯郸市| 灵台县| 三亚市| 曲阜市| 永康市| 武邑县| 杂多县| 嵊泗县| 平昌县| 江山市| 尼木县| 松滋市| 新平| 金山区| 吴旗县| 诏安县| 黄陵县| 厦门市| 揭西县| 巨鹿县| 霍山县| 芜湖县|