[TOC]
The earlier post, Kafka Notes (Part 1), described how consumers consume messages:
1. Each consumer belongs to a consumer group, identified by group.id.
2. Consumption semantics:
Within a group: consumers in the same group share a single copy of the data; at any moment, each partition of a topic is consumed by at most one consumer in the group, while one consumer may consume several partitions. For a given topic, a group should therefore not contain more consumers than the topic has partitions; otherwise some consumers will receive no messages.
Across groups: each consumer group consumes the same data independently, without affecting the others.
3. When a single consumer process runs multiple threads, each thread acts as one consumer.
For example, with 3 partitions, if one consumer starts 3 threads, a consumer that joins the group later has nothing to consume.
Below we verify these consumption semantics. In consumer code, the group.id can be set explicitly (we will set it in a configuration file below). The Kafka shell tools can likewise take a configuration file that sets the consumer's group.id; if none is given, Kafka generates a random one (the kafka.tools.ConsoleConsumer class behind kafka-console-consumer.sh generates a random group.id when none is specified).
The program below starts 4 consumer threads under the same group.id (the topic we use has 3 partitions), and we then start one more consumer through the Kafka shell, which together demonstrate both consumption modes.
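These group semantics can be illustrated without a broker. The sketch below is illustrative only (it is not Kafka's actual assignor implementation): it distributes 3 partitions round-robin over 4 same-group consumers, and the fourth consumer ends up with no partitions, which is exactly the "extra consumers go idle" behavior described above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: distribute partitions over same-group consumers,
// round-robin style, so each partition goes to exactly one consumer.
public class AssignmentSketch {
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            // one consumer per partition; a consumer may hold several partitions
            result.get(p % consumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> assignment = assign(3, 4);
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("consumer-" + c + " -> partitions " + assignment.get(c));
        }
        // consumer-3 receives no partitions: with more consumers than
        // partitions, the extras sit idle.
    }
}
```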
The topic used in the tests is described below:
$ kafka-topics.sh --describe --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Topic:hadoop PartitionCount:3 ReplicationFactor:3 Configs:
Topic: hadoop Partition: 0 Leader: 103 Replicas: 103,101,102 Isr: 103,101,102
Topic: hadoop Partition: 1 Leader: 101 Replicas: 101,102,103 Isr: 101,102,103
Topic: hadoop Partition: 2 Leader: 102 Replicas: 102,103,101 Isr: 102,103,101
That is, the topic has 3 partitions and a replication factor of 3.
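For reference, a topic with this layout can be created as follows (flags as in Kafka 0.10; the host names are those of this cluster):

```shell
kafka-topics.sh --create --topic hadoop \
  --partitions 3 --replication-factor 3 \
  --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
```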
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

/**
 * KafkaProducerOps produces records into a Kafka topic.
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /*
         * Load the configuration file (key=value format).
         * Avoid hard-coding values; keep them configurable.
         */
        Properties properties = new Properties();
        InputStream in = KafkaProducerOps.class.getClassLoader()
                .getResourceAsStream("producer.properties");
        properties.load(in);

        /*
         * The two type parameters are the types of a record's key and value.
         */
        String[] girls = new String[]{"姚慧瑩", "劉向前", "周 新", "楊柳"};
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        Random random = new Random();
        int start = 1;
        for (int i = start; i <= start + 20; i++) {
            String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
            String key = i + "";
            String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~";
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>(topic, key, value);
            producer.send(producerRecord);
        }
        producer.close();
    }
}
package com.uplooking.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.*;

/**
 * Consumes data from a Kafka topic.
 */
public class KafkaConsumerOps {
    public static void main(String[] args) throws IOException {
        // Thread pool with 4 threads: one consumer per thread, against a 3-partition topic
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        System.out.println("Main start time: " + System.currentTimeMillis());
        for (int i = 0; i < 4; i++) {
            service.schedule(new ConsumerThread(), 5L, TimeUnit.SECONDS);
        }
    }
}
class ConsumerThread implements Runnable {
    public void run() {
        System.out.println("Thread ID: " + Thread.currentThread().getId()
                + ", thread start time: " + System.currentTimeMillis());
        /*
         * The two type parameters are the types of a record's key and value.
         */
        Properties properties = new Properties();
        try {
            properties.load(KafkaConsumerOps.class.getClassLoader()
                    .getResourceAsStream("consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        Collection<String> topics = Arrays.asList("hadoop");
        // Subscribe to the topic
        consumer.subscribe(topics);
        ConsumerRecords<String, String> consumerRecords = null;
        while (true) {
            // Pull data from the topic
            consumerRecords = consumer.poll(1000);
            // Iterate over each record
            for (ConsumerRecord consumerRecord : consumerRecords) {
                long offset = consumerRecord.offset();
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                int partition = consumerRecord.partition();
                System.out.println("CurrentThreadID: " + Thread.currentThread().getId()
                        + "\toffset: " + offset + "\tpartition: " + partition
                        + "\tkey: " + key + "\tvalue: " + value);
            }
        }
    }
}
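As an aside, the scheduling pattern used in KafkaConsumerOps can be checked in isolation: with a pool of 4 threads and 4 tasks submitted with the same delay, every task gets its own thread and they all start at (almost) the same time, matching the near-identical thread start timestamps in the consumer output later in this post. A minimal standalone sketch (shorter delay than the 5 seconds above):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Standalone sketch of the scheduling pattern: 4 tasks, one pool of 4
// threads, same delay, so all 4 tasks start together.
public class ScheduleDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        CountDownLatch started = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            service.schedule(() -> {
                System.out.println("thread " + Thread.currentThread().getId()
                        + " start time: " + System.currentTimeMillis());
                started.countDown();
            }, 100L, TimeUnit.MILLISECONDS); // 100 ms instead of the 5 s above
        }
        // Wait until every task has started, then shut the pool down cleanly
        // (the real consumer threads loop forever, so the original never does this).
        started.await();
        service.shutdown();
    }
}
```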
package com.uplooking.bigdata.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * A custom partitioner that assigns partitions based on the record key.
 *
 * Partitioning can use the hashCode of the key or the value, or any
 * business-specific rule that spreads data across partitions.
 * Here: partition = hashCode(key) % number of partitions.
 */
public class MyKafkaPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Choose a partition for the given record.
     *
     * @param topic      topic name
     * @param key        record key
     * @param keyBytes   serialized key
     * @param value      record value
     * @param valueBytes serialized value
     * @param cluster    current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        int targetPartition;
        if (key == null || keyBytes == null) {
            targetPartition = new Random().nextInt(10000) % partitionNums;
        } else {
            // Mask the sign bit so a negative hashCode cannot yield a negative partition
            int hashCode = key.hashCode() & Integer.MAX_VALUE;
            targetPartition = hashCode % partitionNums;
            System.out.println("key: " + key + ", value: " + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
        }
        return targetPartition;
    }

    public void close() {
    }
}
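The key-to-partition mapping can be verified with plain Java. For a single-character key, String.hashCode() equals the character's code point, so "1".hashCode() is 49 and 49 % 3 = 1 on a 3-partition topic, which matches the producer output further below. The sketch masks the sign bit, since hashCode() can be negative for some strings:

```java
public class PartitionDemo {
    // Same rule as MyKafkaPartitioner: hashCode of the key modulo the number
    // of partitions, with the sign bit masked so the result is never negative.
    static int partitionFor(String key, int partitionCount) {
        return (key.hashCode() & Integer.MAX_VALUE) % partitionCount;
    }

    public static void main(String[] args) {
        for (String key : new String[]{"1", "2", "3", "10"}) {
            System.out.println("key " + key + " (hashCode " + key.hashCode()
                    + ") -> partition " + partitionFor(key, 3));
        }
        // keys 1, 2, 3, 10 map to partitions 1, 2, 0, 1 — matching the
        // producer output later in this post.
    }
}
```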
package com.uplooking.bigdata.kafka.constants;

public interface Constants {
    /**
     * Property key for the producer topic
     */
    String KAFKA_PRODUCER_TOPIC = "producer.topic";
}
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
##### custom property: the topic to produce to
producer.topic=hadoop
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect= uplooking01:2181,uplooking02:2181,uplooking03:2181
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=test-consumer-group
#consumer timeout
#consumer.timeout.ms=5000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
The only notable dependency is kafka-clients:
<dependencies>
    <!-- kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.1</version>
    </dependency>
</dependencies>
First, start a consumer in a terminal. Since no configuration file is specified, its group.id is randomly generated:
$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Next, run the consumer code and then the producer code, and watch the output in each terminal.
The producer program's terminal output:
key: 1, value: 今天的<--劉向前-->很美很美哦~, hashCode: 49, partition: 1
key: 2, value: 今天的<--劉向前-->很美很美哦~, hashCode: 50, partition: 2
key: 3, value: 今天的<--劉向前-->很美很美哦~, hashCode: 51, partition: 0
key: 4, value: 今天的<--楊柳-->很美很美哦~, hashCode: 52, partition: 1
key: 5, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 53, partition: 2
key: 6, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 54, partition: 0
key: 7, value: 今天的<--楊柳-->很美很美哦~, hashCode: 55, partition: 1
key: 8, value: 今天的<--劉向前-->很美很美哦~, hashCode: 56, partition: 2
key: 9, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 57, partition: 0
key: 10, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1567, partition: 1
key: 11, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1568, partition: 2
key: 12, value: 今天的<--周 新-->很美很美哦~, hashCode: 1569, partition: 0
key: 13, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1570, partition: 1
key: 14, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1571, partition: 2
key: 15, value: 今天的<--劉向前-->很美很美哦~, hashCode: 1572, partition: 0
key: 16, value: 今天的<--劉向前-->很美很美哦~, hashCode: 1573, partition: 1
key: 17, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1574, partition: 2
key: 18, value: 今天的<--劉向前-->很美很美哦~, hashCode: 1575, partition: 0
key: 19, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1576, partition: 1
key: 20, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1598, partition: 2
key: 21, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1599, partition: 0
The consumer program's terminal output:
Main start time: 1521991118178
Thread ID: 20, thread start time: 1521991123182
Thread ID: 21, thread start time: 1521991123182
Thread ID: 23, thread start time: 1521991123182
Thread ID: 22, thread start time: 1521991123182
CurrentThreadID: 22 offset: 78 partition: 1 key: 1 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 22 offset: 79 partition: 1 key: 4 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 22 offset: 80 partition: 1 key: 7 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 22 offset: 81 partition: 1 key: 10 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 22 offset: 82 partition: 1 key: 13 value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 23 offset: 81 partition: 0 key: 3 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 23 offset: 82 partition: 0 key: 6 value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 23 offset: 83 partition: 0 key: 9 value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 23 offset: 84 partition: 0 key: 12 value: 今天的<--周 新-->很美很美哦~
CurrentThreadID: 23 offset: 85 partition: 0 key: 15 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 23 offset: 86 partition: 0 key: 18 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 22 offset: 83 partition: 1 key: 16 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 23 offset: 87 partition: 0 key: 21 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 21 offset: 78 partition: 2 key: 2 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 22 offset: 84 partition: 1 key: 19 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 21 offset: 79 partition: 2 key: 5 value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 21 offset: 80 partition: 2 key: 8 value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 21 offset: 81 partition: 2 key: 11 value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 21 offset: 82 partition: 2 key: 14 value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 21 offset: 83 partition: 2 key: 17 value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 21 offset: 84 partition: 2 key: 20 value: 今天的<--姚慧瑩-->很美很美哦~
The shell consumer's terminal output:
$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
今天的<--劉向前-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--周 新-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--楊柳-->很美很美哦~
Because the shell consumer's group.id is randomly generated, it belongs to a different group and therefore receives the messages from every partition of the topic: this is cross-group consumption.
In the consumer program, all 4 threads share one group.id (they all load consumer.properties). Since the topic hadoop has only 3 partitions, at most 3 of the threads, i.e. 3 consumers, can receive messages. The thread IDs in the output confirm that only three threads consumed messages, which verifies the within-group consumption semantics.
Reference: https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing
Kafka ships a producer benchmark tool, bin/kafka-producer-perf-test.sh, in its installation directory. It reports four main metrics: total data sent (MB), send rate (MB/second), total number of records sent, and send rate (records/second).
A test run:
[uplooking@uplooking01 ~]$ kafka-producer-perf-test.sh --topic flume-kafka --num-records 1000000 --producer-props bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092 --throughput 10000 --record-size 100
49972 records sent, 9994.4 records/sec (0.95 MB/sec), 3.1 ms avg latency, 258.0 max latency.
50200 records sent, 10040.0 records/sec (0.96 MB/sec), 2.4 ms avg latency, 141.0 max latency.
50020 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
50010 records sent, 10000.0 records/sec (0.95 MB/sec), 2.3 ms avg latency, 127.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 24.0 max latency.
50020 records sent, 10004.0 records/sec (0.95 MB/sec), 2.4 ms avg latency, 186.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 15.1 ms avg latency, 466.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 11.1 ms avg latency, 405.0 max latency.
50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
50030 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 20.0 max latency.
50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 30.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
49990 records sent, 9998.0 records/sec (0.95 MB/sec), 1.4 ms avg latency, 49.0 max latency.
50033 records sent, 10006.6 records/sec (0.95 MB/sec), 37.9 ms avg latency, 617.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.5 ms avg latency, 74.0 max latency.
50007 records sent, 10001.4 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.8 ms avg latency, 132.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 15.0 max latency.
50020 records sent, 10000.0 records/sec (0.95 MB/sec), 1.9 ms avg latency, 121.0 max latency.
1000000 records sent, 9999.200064 records/sec (0.95 MB/sec), 4.96 ms avg latency, 617.00 ms max latency, 1 ms 50th, 3 ms 95th, 105 ms 99th, 541 ms 99.9th.
The parameters mean:
--num-records 1000000: total number of records to produce
--throughput 10000: target number of records to produce per second
--record-size 100: size of each record, in bytes
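The ~0.95 MB/sec figure in the output follows directly from these parameters: 10000 records/sec × 100 bytes = 1,000,000 bytes/sec, which is about 0.95 MiB/sec. A quick sanity check:

```java
public class ThroughputCheck {
    public static void main(String[] args) {
        int recordsPerSec = 10000; // --throughput
        int recordSizeBytes = 100; // --record-size
        double mbPerSec = (double) recordsPerSec * recordSizeBytes / (1024 * 1024);
        // 1,000,000 bytes/sec is about 0.95 MiB/sec, matching the tool's report
        System.out.printf("expected throughput: %.2f MB/sec%n", mbPerSec);
    }
}
```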
[uplooking@uplooking01 ~]$ kafka-consumer-perf-test.sh --topic flume-kafka --messages 1000000 --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092 --threads 3 --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760
The test above consumes one million messages. The output columns are, in order: start time (start.time), end time (end.time), total data consumed in MB (data.consumed.in.MB), consumption rate in MB/sec (MB.sec), total records consumed (data.consumed.in.nMsg), and records consumed per second (nMsg.sec).