位置:首页 > 【小牛学堂原创】kafka入门教程 二 >

【小牛学堂原创】kafka入门教程 二

作者:小牛君|发布时间:2017-06-07

小牛学堂的课程大纲最近进行了再一次升级,并且同时推出Java大数据平台开发班、Python爬虫与数据挖掘班、Spark项目班、Spark大神班、机器学习算法实战班、BI数据分析实战班, 目前这类人群凤毛麟角,导致这个行业的平均薪资极高,为此小牛学堂集合了行业的诸多大牛开设对应班级,为想学习的同学提供机会!
如果想了解详细情况,请联系 今日值班讲师 或者直接加入千人QQ群进行咨询:Spark大数据交流学习群613807316

以下是本文正文:


2.  安装kafka集群

2.1.  上传kafka安装包并解压

将安装包解压到指定目录

tar -zxvf kafka_2.11-0.8.2.2.tgz -C $app/

 

2.2.  修改配置文件

先进入kafka安装目录

cd /usr/local/app/kafka_2.11-0.8.2.2/

修改配置文件

vi config/server.properties

 

修改broker.id(唯一的):

broker.id=0

修改kafka数据存放目录:

log.dirs=/usr/local/app/kafka_2.11-0.8.2.2/data

修改zk地址:

zookeeper.connect=mini1:2181,mini2:2181,mini3:2181

 

将安装包分发到其他机器上

scp kafka_2.11-0.8.2.2/ mini2:`pwd`

scp kafka_2.11-0.8.2.2/ mini3:`pwd`

修改两台机器的broker.id分别为1和2(每台机器的broker.id必须不一样)

 

2.3.  启动kafka

三台机器都运行

bin/kafka-server-start.sh -daemon config/server.properties

 

由于我把三台机器的broker.id分别设置为111,112,113所以在zookeeper生成了如下节点

2.4.  kafka-topics.sh API

--alter

Alter the configuration for the topic.

--config <name=value>

A topic configuration override for the topic being created or altered.The  following is a list of valid   configurations: 

        unclean.leader.election.enable

        delete.retention.ms           

        segment.jitter.ms             

        retention.ms                  

        flush.ms                      

        segment.bytes                 

        flush.messages                

        segment.ms                    

        retention.bytes               

        cleanup.policy                

        segment.index.bytes           

        min.cleanable.dirty.ratio     

        max.message.bytes             

        file.delete.delay.ms           

        min.insync.replicas           

        index.interval.bytes          

--create

Create a new topic.

--delete

Delete a topic

--delete-config <name>

A topic configuration override to be

removed for an existing topic (seethe list of configurations under the--config option).

--describe

 List details for the given topics.

--help

Print usage information.

--list

 

 

2.4.1.   创建topic

bin/kafka-topics.sh --create --zookeeper mini1:2181,mini2:2181,min3:2181 --replication-factor 3 --partitions 3 --topic test

 

2.4.2.   删除topic

bin/kafka-topics.sh --delete --zookeeper mini1:2181,mini2:2181,min3:2181 --topic test

Topic test is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

 

 

2.4.3.   列出所有topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

 

2.4.4.   查看指定topic的详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test      PartitionCount:3        ReplicationFactor:3     Configs:

Topic: test     Partition: 0    Leader: 111     Replicas: 111,112,113   Isr: 111,112,113

Topic: test     Partition: 1    Leader: 112     Replicas: 112,113,111   Isr: 112,113,111

Topic: test     Partition: 2    Leader: 113     Replicas: 113,111,112   Isr: 113,111,112

#Leader:表示当前Partition的主Partition在哪个brokerid上

#Replicas:表示当前分区的存在哪些brokerid上(这里面的值不会变)

#Isr:表示当前分区存活在哪些brokerid上

 

注意:当创建主题的时候会指定创建几个partition,并且还会指定这个partition存在于哪个broker上面

也就是Partition和broker之间会有一个对应关系,这个对应关系一经创建,不会发生变化,

假设Partition-0 的副本因子为2,分别存在于broker111 和broker112

假设后期broker111挂了,Partition-0就只在broker112中存在,就算集群中还有其他broker节点

                     这个Partition-0也不会再被分配到其他broker上面

如果想恢复之前确实的Partition-0,只有重启broker111。

 

2.4.5.   kafka生产者

bin/kafka-console-producer.sh --broker-list mini1:9092,mini2:9092,mini3:9092 --topic test

 

2.4.6.   kafka消费者

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

 


了解更多详情请联系 今日值班讲师 或者直接加入千人QQ群进行咨询:Spark大数据交流学习群613807316

分享到: