目录
频道首页
📚 消息队列之kafka
收藏
0
xy20118 最近修改于 2024-07-18 17:42:49

前言

ZooKeeper集群介绍 image

集群角色 image

  • Leader(领导者)处理“读写”操作。事务请求“写操作”唯一调度者和处理者,保证集群事务处理顺序性。
  • Follower(跟随者)处理“读”操作,接到写请求转发给Leader节点处理。参与集群Leader选举
  • Observer(观察者) 非事务请求可以独立处理“读”操作,对于事务性请求会转发给leader处理。不参与提交和选举投票

第一次启动选举机制

(1)服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;

(2)服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2为主。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING

(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;

(4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;

(5)服务器5启动,同4一样当小弟。

非第一次启动选举机制

- - 当ZooKeeper 集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
        - 服务器初始化启动
        - 服务器运行期间无法和Leader保持连接

而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
- 集群中本来就已经存在一个Leader。
    - 对于已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和 Leader机器建立连接,并进行状态同步即可。
- 集群中确实不存在Leader(假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。)
- 选举Leader规则
    - EPOCH大的直接胜出
    - EPOCH相同,事务id大的胜出
    - 事务id相同,服务器id大的胜出
SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。
ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑速度有关。
Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加

特性: 整个集群中只要有超过集群数量一半的 zookeeper 工作只正常的,那么整个集群对外就是可用的,假如有 2 台服务器做了一个 zookeeper集群,只要有任何一台故障或宕机,那么这个 zookeeper 集群就不可用了,因为剩下的一台没有超过集群一半的数量,但是假如有三台 zookeeper 组成一个集群,那么损坏一台就还剩两台,大于 3 台的一半,所以损坏一台还是可以正常运行的,但是再损坏一台就只剩一台集群就不可用了。那么要是 4 台组成一个 zookeeper集群,损坏一台集群肯定是正常的,那么损坏两台就还剩两台,那么 2 台不大于集群数量的一半,所以 3 台的 zookeeper 集群和 4 台的 zookeeper 集群损坏两台的结果都是集群不可用,以此类推 5 台和 6 台以及 7 台和 8 台都是同理,所以这也就是为什么集群一般都是奇数的原因

部署zookeeper集群

最低需要三台服务器

64.16   1
64.11   2 
64.17   3

全局操作

#首先安装java
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version

systemctl stop firewalld 
setenforce 0
#上传安装包
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /usr/local/ 
[root@xyy1 opt]# cd /usr/local/
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper 
cd /usr/local/zookeeper/conf/

cp zoo_sample.cfg zoo.cfg
#修改配置文件
vim zoo.cfg
tickTime=2000   #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=10    #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
syncLimit=5     #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
dataDir=/usr/local/zookeeper/data      ●修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/usr/local/zookeeper/log   ●添加,指定存放日志的目录,目录需要单独创建
clientPort=2181   #客户端连:接端口
#添加集群信息
server.1=192.168.64.17:3188:3288
server.2=192.168.64.16:3188:3288
server.3=192.168.64.11:3188:3288

---------------------------------------------------------------------

#A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
server.A=B:C:D 
#B是这个服务器的地址。

#C是这个服务器Follower与集群中的Leader服务器交换信息的端口。

#D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

image

配置好的文件 scp传输到另两台服务器

[root@xyy1 zookeeper]# mkdir log  data 
scp -r /usr/local/zookeeper   192.168.64.11:/usr/local/
scp -r /usr/local/zookeeper   192.168.64.171:/usr/local/
#根据序号按照对应服务器执行
64.17  echo 1 > /usr/local/zookeeper/data/myid
64.16  echo 2 > /usr/local/zookeeper/data/myid
64.11  echo 3 > /usr/local/zookeeper/data/myid

#启动 建议相对路径进行启动
[root@xyy1 zookeeper]# cd bin/
[root@xyy1 bin]#  ./zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@xyy7 conf]# ss -natp |grep 2181
LISTEN     0      50          :::2181                    :::*                   users:(("java",pid=76095,fd=46))
ESTAB      0      0         ::ffff:127.0.0.1:2181                  ::ffff:127.0.0.1:39700               users:(("java",pid=76095,fd=66))
#查看到对应的端口号 既是启动成功 

kafka

Kafka 被称为下一代分布式消息系统,由 scala 和 Java 编写,是非营利性组织ASF(Apache Software Foundation,简称为 ASF)基金会中的一个开源项目,比如HTTP Server、Hadoop、ActiveMQ、Tomcat 等开源软件都属于 Apache 基金会的开源软件,类似的消息系统还有 RbbitMQ、ActiveMQ、ZeroMQ

官网的解释是:Apache Kafka 是一个开源的分布式流处理平台 kafka最初的应用场景: 1、运营活动场景:记录用户的浏览、搜索、点击、活跃度等行为。 2、系统运维场景:监控服务器的 CPU、内存、请求耗时等性能指标

Kafka 从 0.8 版本开始,就已经在提供一些和数据处理有关的组件 1、Kafka Streams:一个轻量化的流计算库,性质类似于 Spark、Flink 2、Kafka Connect:一个数据同步工具,能将 Kafka 中的数据导入到关系数据库、Hadoop、搜索引擎中。

现在的kafka: 可以理解为一个存储系统 因为需要处理海量的数据 所以对数据进行分片存储, kafka采用了水平拆分方案 拆分后的数据子集叫做 Partition(分区),各个分区的数据合集即全量数据。

其中分区路由可以简单理解成一个 Hash 函数,生产者在发送消息时,完全可以自定义这个函数来决定分区规则。如果分区规则设定合理,所有消息将均匀地分配到不同的分区中。 通过这样两层关系,最终在 Topic 之下,就有了一个新的划分单位:Partition。先通过 Topic 对消息进行逻辑分类,然后通过 Partition 进一步做物理分片,最终多个 Partition 又会均匀地分布在集群中的每台机器上,从而很好地解决了存储的扩展性问题。

名词解释:

  • Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。
  • Topic :每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 topic,(物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 topic 即可生产或消费数据而不必关心数据存于何处),topic 在逻辑上对 record(记录、日志)进行分组保存,消费者需要订阅相应的 topic 才能消费 topic 中的消息。
  • Partition :是物理上的概念,每个 topic 包含一个或多个 partition,创建 topic 时可指定 parition 数量,每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件,为了实现实现数据的高可用,比如将分区 0的数据分散到不同的kafka节点,每一个分区都有一个broker作为leader 和一个 broker作为Follower。
  • Producer:负责发布消息到 Kafka broker
  • Consumer:消费消息,每个 consumer 属于一个特定的 consuer group(可为每个consumer 指定 group name,若不指定 group name 则属于默认的 group),使用consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息。
1. To **publish** (write) and **subscribe to** (read) streams of events, including continuous import/export of your data from other systems. (数据的发布和订阅能力(消息队列))
2. To **store** streams of events durably and reliably for as long as you w (数据的分布式存储能力(存储系统))
3. To **process** streams of events as they occur or retrospectively. (数据的实时处理能力(流处理引擎))

kafka集群

image 1、Producer:生产者,负责创建消息,然后投递到 Kafka 集群中,投递时需要指定消息所属的 Topic,同时确定好发往哪个 Partition。 2、Consumer:消费者,会根据它所订阅的 Topic 以及所属的消费组,决定从哪些 Partition 中拉取消息。 3、Broker:消息服务器,可水平扩展,负责分区管理、消息的持久化、故障自动转移等。 4、Zookeeper:负责集群的元数据管理等功能,比如集群中有哪些 broker 节点以及 Topic,每个 Topic 又有哪些 Partition 等。

部署kafka

同样的三台机器 64.16 64.11 64.17

[root@xyy1 opt]# tar xf kafka_2.13-2.7.1.tgz   -C /usr/local/
[root@xyy1 opt]# cd  /usr/local
[root@xyy1 local]# ln -s kafka_2.13-2.7.1/  kafka
[root@xyy1 local]# vim server.properties

image

#解释
broker.id=0
#21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2


listeners=PLAINTEXT://192.168.64.16:9092
#31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改


num.network.threads=3    #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改   不用修改
num.io.threads=8         #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数           不用修改
socket.send.buffer.bytes=102400       #48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400    #51行,接收套接字的缓冲区大小
socket.request.max.bytes=104857600    #54行,请求套接字的缓冲区大小



log.dirs=/usr/local/kafka/logs        
#60行,kafka运行日志存放的路径,也是数据存放的路径




zookeeper.connect=192.168.64.16:2181,192.168.64.11:2181,192.168.64.17:2181

文件传输至额外两台机器

#注意文件路径
[root@xyy1 local]# scp -r kafka/ 192.168.64.17:/usr/local/ 
scp -r kafka/ 192.168.64.11:/usr/local/ 

###########
注意修改
broker.id=2
broker.id=3

监听地址:
listeners=PLAINTEXT://IP:9092

启动以及验证kafka

[root@xyy1 config]# cd  /usr/local/kafka/bin/
[root@xyy1 bin]# ./kafka-server-start.sh  -daemon /usr/local/kafka/config/server.properties
[root@xyy1 bin]# ss -natp |grep 9092

image 创建topic

[root@xyy1 bin]# ./kafka-topics.sh --create --zookeeper 192.168.64.16:2181,192.168.64.11:2181,192.168.64.17:2181 --replication-factor 2 --partitions 3 --topic test
Created topic test.
#查看当前服务器的topic
[root@xyy1 bin]# ./kafka-topics.sh --list --zookeeper 192.168.64.16:2181,192.168.64.11:2181,192.168.64.17:2181
test
#查看topic详情
[root@xyy1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.64.16:2181,192.168.64.11:2181,192.168.64.17:2181

image image

发布消息

./kafka-console-producer.sh  --broker-list 192.168.64.16:9092,192.168.64.11:9092,192.168.64.17:9092 --topic  test 

image

查看消费消息

./kafka-console-consumer.sh --bootstrap-server 192.168.64.16:9092,192.168.64.11:9092,192.168.64.17:9092 --topic  test --from-beginning

image

删除topic

[root@xyy1 bin]# ./kafka-topics.sh --delete --zookeeper  192.168.64.16:2181,192.168.64.11:2181,192.168.64.17:2181    --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
内容大纲
批注笔记
📚 消息队列之kafka
ArticleBot
z
z
z
z
主页
会议室
Git管理
文章
云文档
看板