
Using Kafka

Author: 小牛君 | Published: 2017-06-16



1.   Using Kafka

1.1.  kafka-topics.sh API

--alter

Alter the configuration for the topic.

--config <name=value>

A topic configuration override for the topic being created or altered.

--create

Create a new topic.

--delete

Delete a topic.

--delete-config <name>

A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).

--describe

List details for the given topics.

--help

Print usage information.

--list

List all available topics.

--partitions <Integer: # of partitions>

The number of partitions for the topic being created or altered (WARNING: if partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected).

--zookeeper <urls>

REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
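The warning under --partitions exists because the default keyed partitioner routes each message by hashing its key modulo the current partition count; adding partitions changes the modulus, so an existing key may start landing on a different partition. A minimal sketch of that routing (the key value is made up for illustration):

```java
// Sketch of Kafka's default keyed routing: partition = abs(hash(key)) % numPartitions.
// Growing the partition count changes the modulus, so the same key can move partitions.
public class KeyRouting {
    static int partitionFor(String key, int numPartitions) {
        // Math.abs(Integer.MIN_VALUE) is still negative, so mask the sign bit instead.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "user-42";  // hypothetical message key
        System.out.println(partitionFor(key, 3));  // target partition with 3 partitions
        System.out.println(partitionFor(key, 4));  // may differ after altering to 4 partitions
    }
}
```

This is why the ordering guarantee per key only holds as long as the partition count is left alone.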

 

1.1.1. Creating a topic

bin/kafka-topics.sh --create --zookeeper mini1:2181,mini2:2181,mini3:2181 --replication-factor 3 --partitions 3 --topic test

 

The replication-factor must not be larger than the number of brokers, otherwise the command fails with an error like:


Error while executing topic command replication factor: 4 larger than available brokers: 3
kafka.admin.AdminOperationException: replication factor: 4 larger than available brokers: 3
        at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
        at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
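The check behind this exception is a simple sanity test performed before any replicas are assigned; a sketch of the same validation (class and method names here are illustrative, not Kafka's own):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the sanity check behind the AdminOperationException above:
// a topic cannot have more replicas per partition than there are brokers.
public class ReplicationCheck {
    static void validate(int replicationFactor, List<Integer> brokers) {
        if (replicationFactor > brokers.size()) {
            throw new IllegalArgumentException(
                "replication factor: " + replicationFactor
                + " larger than available brokers: " + brokers.size());
        }
    }

    public static void main(String[] args) {
        validate(3, Arrays.asList(111, 112, 113));  // fine: 3 replicas across 3 brokers
        try {
            validate(4, Arrays.asList(111, 112, 113));
        } catch (IllegalArgumentException e) {
            // same message shape as the error above
            System.out.println(e.getMessage());
        }
    }
}
```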

 

 

1.1.2. Deleting a topic

bin/kafka-topics.sh --delete --zookeeper mini1:2181,mini2:2181,mini3:2181 --topic test

Topic test is marked for deletion.

Note: This will have no impact if   delete.topic.enable is not set to true.

By default this only marks the topic as deleted; the topic can still be used in the meantime.

To actually delete it, the broker parameter must be changed:

delete.topic.enable=true

 

1.1.3. Listing all topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

 

1.1.4. Describing a topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test      PartitionCount:3        ReplicationFactor:3     Configs:

Topic: test     Partition: 0    Leader: 111     Replicas: 111,112,113   Isr: 111,112,113

Topic: test     Partition: 1    Leader: 112     Replicas: 112,113,111   Isr: 112,113,111

Topic: test     Partition: 2    Leader: 113     Replicas: 113,111,112   Isr: 113,111,112

#Leader: the id of the broker hosting this partition's leader replica

#Replicas: the ids of the brokers that hold replicas of this partition (this list never changes)

#Isr: the ids of the brokers whose replicas are currently alive and in sync

 

Note: when a topic is created, the number of partitions is chosen and each partition is assigned to specific brokers, so there is a fixed partition-to-broker mapping. Once created, this mapping does not change.

Suppose Partition-0 has a replication factor of 2 and lives on broker 111 and broker 112. If broker 111 later goes down, Partition-0 survives only on broker 112; even if the cluster has other broker nodes, Partition-0 will not be reassigned to them.

The only way to restore the missing replica of Partition-0 is to bring broker 111 back up.
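The describe output above shows the round-robin layout Kafka's AdminUtils computes at creation time: each partition's replica list starts one broker further along, and the remaining replicas follow in order. A simplified sketch of that assignment (real Kafka randomizes the starting index and the replica shift; both are fixed at 0 here so the result matches the example):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified version of the assignReplicasToBrokers idea: partition p's
// replicas sit at broker positions p, p+1, ... (mod broker count).
// Real Kafka randomizes the start index and shift; both are 0 here.
public class ReplicaAssignment {
    static List<List<Integer>> assign(List<Integer> brokers, int partitions, int replicationFactor) {
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokers.get((p + r) % brokers.size()));
            }
            result.add(replicas);  // the first entry is the preferred leader
        }
        return result;
    }

    public static void main(String[] args) {
        // Brokers 111/112/113, 3 partitions, replication factor 3,
        // as in the describe output for topic "test".
        System.out.println(assign(List.of(111, 112, 113), 3, 3));
        // → [[111, 112, 113], [112, 113, 111], [113, 111, 112]]
    }
}
```

The first replica of each list is the preferred leader, which is why the leaders 111/112/113 spread evenly across the brokers in the example.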

 

1.1.5. Checking a consumer group's progress

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper mini1:2181  --topic topic1 -group group1

 

1.1.6. Rebalancing partition leaders

Manual trigger:

bin/kafka-preferred-replica-election.sh --zookeeper mini1:2181

 

Automatic balancing (enabled by default):

auto.leader.rebalance.enable=true

leader.imbalance.check.interval.seconds (default: 300)

 

 

 

1.1.7. Increasing a topic's partition count

bin/kafka-topics.sh --alter --zookeeper mini1:2181 --partitions 4 --topic topic1

 

1.1.8. Console producer

bin/kafka-console-producer.sh --broker-list mini1:9092,mini2:9092,mini3:9092 --topic test

 

1.1.9. Console consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

 

1.2.  Performance evaluation

1.2.1. Producer performance

The Kafka installation ships a producer benchmark, bin/kafka-producer-perf-test.sh.

Test run:

bin/kafka-producer-perf-test.sh --broker-list 192.168.1.171:9092,192.168.1.172:9092,192.168.1.173:9092 --topics hello --messages 1000000 --threads 3

 

The run reports four main metrics: total data sent (MB), send throughput (MB/second), total messages sent, and message rate (records/second):

start.time, end.time, compression,   message.size, batch.size, total.data.sent.in.MB, MB.sec,   total.data.sent.in.nMsg, nMsg.sec

2017-05-17 09:18:31:760, 2017-05-17   09:20:59:429, 0, 100, 200, 95.37, 0.6458, 999999, 6771.8953

start.time: start timestamp

end.time: end timestamp

compression: compression codec id (0 means no compression)

message.size: size of each message in bytes

batch.size: number of messages sent per batch

total.data.sent.in.MB: total amount of data sent (MB)

MB.sec: data sent per second (MB)

total.data.sent.in.nMsg: total number of messages sent

nMsg.sec: messages sent per second
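The two rate columns can be re-derived from the timestamps and totals in the sample line above; a minimal sketch of that arithmetic (the class name is illustrative, not part of the perf tool):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Re-derives the MB.sec and nMsg.sec columns of kafka-producer-perf-test.sh
// output from the start/end timestamps and the totals.
public class PerfMetrics {
    static final SimpleDateFormat FMT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

    // Returns {MB.sec, nMsg.sec} for the given run.
    static double[] rates(String start, String end, double totalMb, long totalMsgs) {
        try {
            double seconds = (FMT.parse(end).getTime() - FMT.parse(start).getTime()) / 1000.0;
            return new double[] { totalMb / seconds, totalMsgs / seconds };
        } catch (ParseException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // Values taken from the sample output line above.
        double[] r = rates("2017-05-17 09:18:31:760", "2017-05-17 09:20:59:429", 95.37, 999999L);
        // Matches the 0.6458 MB.sec and 6771.8953 nMsg.sec columns.
        System.out.printf("MB.sec=%.4f nMsg.sec=%.4f%n", r[0], r[1]);
    }
}
```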

 

1.2.2. Consumer performance

bin/kafka-consumer-perf-test.sh --zookeeper mini1:2181,mini2:2181,mini3:2181 --topic test --messages 1000000 --threads 3

start.time, end.time, fetch.size,   data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec

2017-05-17 09:48:40:374, 2017-05-17   09:48:52:157, 1048576, 95.3673, 14.0598, 999999, 147427.2446

 

1.3.  Kafka Java API

1.3.1. pom dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
</dependency>

 

1.3.2. Producer

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties originalProps = new Properties();
originalProps.put("metadata.broker.list", "mini1:9092");
originalProps.put("producer.type", "sync");
originalProps.put("compression.codec", "none");
originalProps.put("serializer.class", "kafka.serializer.StringEncoder");

// 1. Create the producer
Producer<String, String> producer =
        new Producer<String, String>(new ProducerConfig(originalProps));

// 2. Send messages
for (int i = 11; i < 20; i++) {
    KeyedMessage<String, String> message =
            new KeyedMessage<String, String>("test", i + "");
    producer.send(message);
}

// 3. Close the producer
producer.close();
 

 

 

1.3.3. Consumer

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

Properties originalProps = new Properties();
originalProps.put("zookeeper.connect", "mini1:2181,mini2:2181,mini3:2181");
originalProps.put("group.id", "group1");

// 1. Create the high-level consumer connector
ConsumerConnector consumer =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(originalProps));

// 2. Request one stream for the topic and read messages from it
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("topic1", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        consumer.createMessageStreams(topicCountMap);
ConsumerIterator<byte[], byte[]> it = streams.get("topic1").get(0).iterator();
while (it.hasNext()) {
    System.out.println(new String(it.next().message()));
}

// 3. Shut down
consumer.shutdown();
 

 

 

 

 

 



