Kafka消息队列的搭建与基础使用

一、Kafka消息队列

1、为什么需要消息队列?

  • 解耦

  • 冗余

  • 扩展性

  • 灵活性 & 峰值处理能力

  • 可恢复性

  • 顺序保证

  • 缓冲

  • 异步通信

2、消息队列的模式

1、点对点模式

  • 一对一,消费者主动拉取数据,消息收到后消息清除

2、发布/订阅模式

  • 一对多,数据产生后,推送给所有订阅者

3、什么是Kafka

在流式计算中Kafka一般用力缓存数据,Storm通过消防Kafka的数据进行计算。

(1)Apache Kafka是一个开源消息系统,又Scala写成,是由Apache软件基金会开发的一个开源消息系统项目

(2)Kafka最初是由 LinkedIn 公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

(3)Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接收者为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(server)称为broker。

(4)无论是Kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

二、Kafka集群部署与启动

1、环境准备

1、集群规划

master slave1 slave2
zk zk zk
kafka kafka kafka

2、Jar包下载

3、上传Jar包,并配置kafka bin目录的环境变量

4、修改配置文件

  • 修改$KAFKA_HOME/conf中的server.properties

    broker.id 每一台机器都要不同

    添加一条

    delete.topic.enable=true

    修改数据存放的位置

    log.dirs=/usr/local/src/kafka/logs

    修改zk集群的连接地址和端口’

    zookeeper.connect=master:2181,slave1:2181,slave2:2181

分发到slave1和slave2,并修改broker.id

  • 在ZOOKEEPER_HOME/创建data文件夹

    创建一个myid文件 设置id

  • 将ZOOKEEPER_HOME/conf下的zoo_sample.cfg 改为zoo.cfg

    分发到各个节点,并修改myid

5、启动zk集群和kafka集群

zk启动 三台机器

zkServer.sh start

kafka启动 三台机器

kafka-server-start.sh config/server.properties

三、操作Kafa集群

1、创建一个Topic

kafka-topics.sh --create --zookeeper master:2181 --partitions 2 --replication-factor 2 --topic first

--create 创建一个topic

--zookeeper 指定zookeeper的主机名和端口

--partitions 指定这个任务有几个分区

--replication-factor 指定这个任务有几个副本

--topic 指定这个任务的主题名字

2、查看Kafka集群中有多少个Topic

kafka-topics.sh --list --zookeeper master:2181

3、连接生产者控制台和消费消费者控制台

连接生产者,连接的是kafka集群

kafka-console-producer.sh --broker-list master:9092 --topic frist

连接消费者,连接的是zk集群

kafka-console-consumer.sh --bootstrap-server master:9092 --topic first --from-beginning

4、删除Topic

kafka-topics.sh --delete --zookeeper 主机名和端口 --topic 主题名字


本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>