Kafka消息队列的搭建与基础使用
一、Kafka消息队列
1、为什么需要消息队列?
-
解耦
-
冗余
-
扩展性
-
灵活性 & 峰值处理能力
-
可恢复性
-
顺序保证
-
缓冲
-
异步通信
2、消息队列的模式
1、点对点模式
-
一对一,消费者主动拉取数据,消息收到后消息清除
2、发布/订阅模式
-
一对多,数据产生后,推送给所有订阅者
3、什么是Kafka
在流式计算中Kafka一般用力缓存数据,Storm通过消防Kafka的数据进行计算。
(1)Apache Kafka是一个开源消息系统,又Scala写成,是由Apache软件基金会开发的一个开源消息系统项目
(2)Kafka最初是由 LinkedIn 公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
(3)Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接收者为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(server)称为broker。
(4)无论是Kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
二、Kafka集群部署与启动
1、环境准备
1、集群规划
master | slave1 | slave2 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
2、Jar包下载
3、上传Jar包,并配置kafka bin目录的环境变量
4、修改配置文件
-
修改$KAFKA_HOME/conf中的server.properties
broker.id 每一台机器都要不同
添加一条
delete.topic.enable=true
修改数据存放的位置
log.dirs=/usr/local/src/kafka/logs
修改zk集群的连接地址和端口’
zookeeper.connect=master:2181,slave1:2181,slave2:2181
分发到slave1和slave2,并修改broker.id
-
在ZOOKEEPER_HOME/创建data文件夹
创建一个myid文件 设置id
-
将ZOOKEEPER_HOME/conf下的zoo_sample.cfg 改为zoo.cfg
分发到各个节点,并修改myid
5、启动zk集群和kafka集群
zk启动 三台机器
zkServer.sh start
kafka启动 三台机器
kafka-server-start.sh config/server.properties
三、操作Kafa集群
1、创建一个Topic
kafka-topics.sh --create --zookeeper master:2181 --partitions 2 --replication-factor 2 --topic first
--create 创建一个topic
--zookeeper 指定zookeeper的主机名和端口
--partitions 指定这个任务有几个分区
--replication-factor 指定这个任务有几个副本
--topic 指定这个任务的主题名字
2、查看Kafka集群中有多少个Topic
kafka-topics.sh --list --zookeeper master:2181
3、连接生产者控制台和消费消费者控制台
连接生产者,连接的是kafka集群
kafka-console-producer.sh --broker-list master:9092 --topic frist
连接消费者,连接的是zk集群
kafka-console-consumer.sh --bootstrap-server master:9092 --topic first --from-beginning
4、删除Topic
kafka-topics.sh --delete --zookeeper 主机名和端口 --topic 主题名字