亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java使用Kafka的方法

發布時間:2021-07-05 17:46:34 來源:億速云 閱讀:517 作者:chen 欄目:大數據

本篇內容主要講解“Java使用Kafka的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java使用Kafka的方法”吧!

1、maven依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2、Producer

2.1、producer發送消息

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * @author Thomas
 * @Description:最簡單的kafka producer
 * @date 22:18 2019-7-5
 */
public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties =new Properties();
        //zookeeper服務器集群地址,用逗號隔開
        properties.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //自定義producer攔截器
        properties.put("interceptor.classes", "com.lt.kafka.producer.MyProducerInterceptor");
        //自定義消息路由規則(消息發送到哪一個Partition中)
        //properties.put("partitioner.class", "com.lt.kafka.producer.MyPartition");

        Producer<string, string=""> producer = null;
        try {
            producer = new KafkaProducer<string, string="">(properties);
            for (int i = 20; i < 40; i++) {
                String msg = "This is Message:" + i;

                /**
                 * kafkaproducer中會同時調用自己的callback的onCompletion方法和producerIntercepter的onAcknowledgement方法。
                 * 關鍵源碼:Callback interceptCallback = this.interceptors == null
                 * callback : new InterceptorCallback<>(callback,
                 * this.interceptors, tp);
                 */
                producer.send(new ProducerRecord<string, string="">("leixiang", msg),new MyCallback());
            }
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            if(producer!=null)
                producer.close();
        }
    }
}

2.2、自定義producer攔截器

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * @author Thomas
 * @Description:自定義producer攔截器
 * @date 22:21 2019-7-5
 */
public class MyProducerInterceptor implements ProducerInterceptor<string,string>{
    /**
     * 打印配置相關信息
     */
    public void configure(Map<string,> configs) {
        // TODO Auto-generated method stub
        System.out.println(configs.toString());
    }

    /**
     * producer發送信息攔截方法
     */
    public ProducerRecord<string,string> onSend(ProducerRecord<string, string=""> record) {
        System.out.println("攔截處理前=============");
        String topic=record.topic();
        String value=record.value();
        System.out.println("攔截處理前的消息====:"+value);
        ProducerRecord<string,string> record2=new ProducerRecord<string, string="">(topic, value+" (intercepted)");
        System.out.println("攔截處理后的消息:"+record2.value());
        System.out.println("攔截處理后===============");
        return record2;
    }

    /**
     * 消息確認回調函數,和callback的onCompletion方法相似。
     * 在kafkaProducer中,如果都設置,兩者都會調用。
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (metadata != null)
            System.out.println("MyProducerInterceptor onAcknowledgement:RecordMetadata=" + metadata.toString());
        if (exception != null)
            exception.printStackTrace();
    }
    /**
     * interceptor關閉回調
     */
    public void close() {
        System.out.println("MyProducerInterceptor is closed!");
    }
}

2.3、自定義消息路由規則

自定義路由規則,可以根據自己的需要定義消息發送到哪個分區。自定義路由規則需要實現Partitioner。

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
/**
 * @author Thomas
 * @Description:
 * @date 22:24 2019-7-5
 */
public class MyPartition implements Partitioner {
    public void configure(Map<string,> arg0) {
        // TODO Auto-generated method stub
    }

    public void close() {
        // TODO Auto-generated method stub
    }

    public int partition(String arg0, Object arg1, byte[] arg2, Object arg3, byte[] arg4, Cluster arg5) {
        // TODO Auto-generated method stub
        return 0;
    }
}

3、Consumer

3.1、自動提交

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
 * @author Thomas
 * @Description:
 * @date 22:26 2019-7-5
 */
public class AutoCommitConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
        props.put("group.id", "leixiang");
        props.put("enable.auto.commit", "true");
        //想要讀取之前的數據,必須加上
        //props.put("auto.offset.reset", "earliest");
        /* 自動確認offset的時間間隔 */
        props.put("auto.commit.interval.ms", "1000");
        /*
         * 一旦consumer和kakfa集群建立連接,
         * consumer會以心跳的方式來高速集群自己還活著,
         * 如果session.timeout.ms 內心跳未到達服務器,服務器認為心跳丟失,會做rebalence
         */
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //配置自定義的攔截器,可以在攔截器中引入第三方插件實現日志記錄等功能。
        //props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");

        @SuppressWarnings("resource")
        KafkaConsumer<string, string=""> consumer = new KafkaConsumer<string, string="">(props);
        try {
            /* 消費者訂閱的topic, 可同時訂閱多個 ,用逗號隔開*/
            consumer.subscribe(Arrays.asList("leixiang"));
            while (true) {
                //輪詢數據。如果緩沖區中沒有數據,輪詢等待的時間為毫秒。如果0,立即返回緩沖區中可用的任何記錄,則返回空
                ConsumerRecords<string, string=""> records = consumer.poll(100);
                for (ConsumerRecord<string, string=""> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
                        record.value());
            }
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
}

3.2、手動提交

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
 * @author Thomas
 * @Description:
 * @date 22:28 2019-7-5
 */
public class ManualCommitConsumerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
        props.put("group.id", "leixiang");
        props.put("enable.auto.commit", "false");//手動確認
        /* 自動確認offset的時間間隔 */
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");//想要讀取之前的數據,必須加上
        /*
         * 一旦consumer和kakfa集群建立連接,
         * consumer會以心跳的方式來高速集群自己還活著,
         * 如果session.timeout.ms 內心跳未到達服務器,服務器認為心跳丟失,會做rebalence
         */
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //配置自定義的攔截器,可以在攔截器中引入第三方插件實現日志記錄等功能。
        props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");

        KafkaConsumer<string, string=""> consumer = new KafkaConsumer<string, string="">(props);
        /* 消費者訂閱的topic, 可同時訂閱多個 ,用逗號隔開*/
        consumer.subscribe(Arrays.asList("leixiang"));
        while (true) {
            ConsumerRecords<string, string=""> records = consumer.poll(100);
            for (ConsumerRecord<string, string=""> record : records) {
                //處理消息
                saveMessage(record);
                //手動提交,并且設置Offset提交回調方法
                //consumer.commitAsync(new MyOffsetCommitCallback());
                consumer.commitAsync();
            }
        }
    }

    public static void saveMessage(ConsumerRecord<string, string=""> record){
        System.out.printf("處理消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
                record.value());
    }
}

自定義Consumer攔截器

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
/**
 * @author Thomas
 * @Description:
 * @date 22:29 2019-7-5
 */
public class MyConsumerInterceptor implements ConsumerInterceptor<string, string="">{
public void configure(Map<string,> configs) {
        System.out.println("MyConsumerInterceptor configs>>>"+configs.toString());
        }

public ConsumerRecords<string, string=""> onConsume(ConsumerRecords<string, string=""> records) {
        System.out.println("onConsume");
        return records;
        }

public void onCommit(Map<topicpartition, offsetandmetadata=""> offsets) {
        System.out.println("onCommit");
        }

public void close() {
        System.out.println("MyConsumerInterceptor is closed!");
        }
}

自定義Offset提交回調方法

import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
/**
 * @author Thomas
 * @Description:
 * @date 22:31 2019-7-5
 */
public class MyOffsetCommitCallback implements OffsetCommitCallback {

    public void onComplete(Map<topicpartition, offsetandmetadata=""> offsets, Exception exception) {
        if (offsets != null)
            System.out.println("offsets>>>" + offsets.toString());
        if (exception != null)
            exception.printStackTrace();
    }
}

到此,相信大家對“Java使用Kafka的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

台山市| 哈尔滨市| 牟定县| 增城市| 太和县| 镇原县| 长治市| 桃园县| 英超| 许昌县| 邮箱| 鱼台县| 海门市| 河源市| 丰台区| 沈阳市| 岳西县| 潜江市| 卓尼县| 呼伦贝尔市| 赤峰市| 原阳县| 韶山市| 阿合奇县| 襄垣县| 开原市| 安西县| 阿拉尔市| 铜鼓县| 丰镇市| 措勤县| 乌鲁木齐县| 怀柔区| 洪雅县| 老河口市| 尤溪县| 运城市| 黄平县| 平谷区| 神池县| 义乌市|