亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java中的Kafka 怎么利用API進行調用

發布時間:2020-12-04 15:41:27 來源:億速云 閱讀:146 作者:Leah 欄目:編程語言

今天就跟大家聊聊有關Java中的Kafka 怎么利用API進行調用,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

1.客戶端創建對應協議的請求

2.客戶端發送請求給對應的broker

3.broker處理請求,并發送response給客戶端

雖然Kafka提供的大量的腳本工具用于各種功能的實現,但很多時候我們還是希望可以把某些功能以編程的方式嵌入到另一個系統中。這時使用Java API的方式就顯得異常地靈活了。本文我將嘗試給出Java API底層框架的一個范例,同時也會針對“創建topic”和“查看位移”這兩個主要功能給出對應的例子。 需要提前說明的是,本文給出的范例并沒有考慮Kafka集群開啟安全的情況。另外Kafka的KIP4應該一直在優化命令行工具以及各種管理操作,有興趣的讀者可以關注這個KIP。

本文中用到的API依賴于kafka-clients,所以如果你使用Maven構建的話,請加上:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.0</version>
</dependency>

如果是gradle,請加上:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底層框架

/**
   * 發送請求主方法
   * @param host     目標broker的主機名
   * @param port     目標broker的端口
   * @param request    請求對象
   * @param apiKey    請求類型
   * @return       序列化后的response
   * @throws IOException
   */
  public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
    Socket socket = connect(host, port);
    try {
      return send(request, apiKey, socket);
    } finally {
      socket.close();
    }
  }

  /**
   * 發送序列化請求并等待response返回
   * @param socket      連向目標broker的socket
   * @param request      序列化后的請求
   * @return         序列化后的response
   * @throws IOException
   */
  private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
    sendRequest(socket, request);
    return getResponse(socket);
  }

  /**
   * 發送序列化請求給socket
   * @param socket      連向目標broker的socket
   * @param request      序列化后的請求
   * @throws IOException
   */
  private void sendRequest(Socket socket, byte[] request) throws IOException {
    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
    dos.writeInt(request.length);
    dos.write(request);
    dos.flush();
  }

  /**
   * 從給定socket處獲取response
   * @param socket      連向目標broker的socket
   * @return         獲取到的序列化后的response
   * @throws IOException
   */
  private byte[] getResponse(Socket socket) throws IOException {
    DataInputStream dis = null;
    try {
      dis = new DataInputStream(socket.getInputStream());
      byte[] response = new byte[dis.readInt()];
      dis.readFully(response);
      return response;
    } finally {
      if (dis != null) {
        dis.close();
      }
    }
  }

  /**
   * 創建Socket連接
   * @param hostName     目標broker主機名
   * @param port       目標broker服務端口, 比如9092
   * @return         創建的Socket連接
   * @throws IOException
   */
  private Socket connect(String hostName, int port) throws IOException {
    return new Socket(hostName, port);
  }

  /**
   * 向給定socket發送請求
   * @param request    請求對象
   * @param apiKey    請求類型, 即屬于哪種請求
   * @param socket    連向目標broker的socket
   * @return       序列化后的response
   * @throws IOException
   */
  private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
    RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);
    byte[] serializedRequest = buffer.array();
    byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
    ByteBuffer responseBuffer = ByteBuffer.wrap(response);
    ResponseHeader.parse(responseBuffer);
    return responseBuffer;
  }

有了這些方法的鋪墊,我們就可以創建具體的請求了。

創建topic

/**
   * 創建topic
   * 由于只是樣例代碼,有些東西就硬編碼寫到程序里面了(比如主機名和端口),各位看官自行修改即可
   * @param topicName       topic名
   * @param partitions      分區數
   * @param replicationFactor   副本數
   * @throws IOException
   */
  public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
    Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
    // 插入多個元素便可同時創建多個topic
    topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
    int creationTimeoutMs = 60000;
    CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
    CreateTopicsResponse.parse(response, request.version());
  }

查看位移

/**
   * 獲取某個consumer group下的某個topic分區的位移
   * @param groupID      group id
   * @param topic       topic名
   * @param parititon     分區號
   * @throws IOException
   */
  public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
    TopicPartition tp = new TopicPartition(topic, parititon);
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp))
        .setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
    System.out.println(partitionData.offset);
  }
/**
   * 獲取某個consumer group下所有topic分區的位移信息
   * @param groupID      group id
   * @return         (topic分區 --> 分區信息)的map
   * @throws IOException
   */
  public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    return resp.responseData();
  }

okay, 上面就是“創建topic”和“查看位移”的樣例代碼,各位看官可以參考著這兩個例子構建其他類型的請求。

看完上述內容,你們對Java中的Kafka 怎么利用API進行調用有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

景泰县| 阿图什市| 赣州市| 紫金县| 崇明县| 凉城县| 巴马| 黄冈市| 林口县| 泸溪县| 义乌市| 宜州市| 新绛县| 土默特右旗| 连城县| 开平市| 泰来县| 柘城县| 鄂州市| 蒲城县| 阿克陶县| 巴林右旗| 建德市| 张家港市| 武城县| 固镇县| 枣阳市| 民县| 明水县| 弥渡县| 佛山市| 阳原县| 都匀市| 鸡西市| 聂荣县| 穆棱市| 金阳县| 专栏| 太仓市| 潼关县| 万安县|