Kafka cluster setup and essential knowledge

Kafka cluster deployment and startup

In this article, we will first demonstrate how to build a Kafka cluster, and then briefly introduce some basic knowledge points about Kafka clusters. This article covers only the cluster itself and does not explain Kafka's basic concepts in depth; it assumes the reader already has some basic knowledge of Kafka.

First, we need to understand some mechanisms of Kafka cluster:

  • Kafka natively supports clustering; even a single node actually runs in cluster mode
  • A Kafka cluster relies on Zookeeper for coordination, and early versions of Kafka stored much of their data in Zookeeper
  • As long as Kafka nodes register with the same Zookeeper, they belong to the same cluster
  • Kafka uses a brokerId to distinguish the different nodes within a cluster

The cluster topology of Kafka is as follows:

Several roles in the Kafka cluster:

  • Broker: generally refers to a Kafka deployment node
  • Leader: handles requests such as receiving and consuming messages; that is, producers push messages to the Leader, and consumers also poll messages from the Leader
  • Follower: mainly used to back up message data; a Leader will have multiple Followers

In this example, to stay closer to an actual deployment, four virtual machines are used for the demonstration:

Machine IP      Purpose                   Role                   brokerId
192.168.99.1    Deploy a Kafka node       broker server          0
192.168.99.2    Deploy a Kafka node       broker server          1
192.168.99.3    Deploy a Kafka node       broker server          2
192.168.99.4    Deploy a Zookeeper node   cluster coordinator    -

Zookeeper installation

Kafka relies on Zookeeper for distributed coordination, so a Zookeeper node must be built before the Kafka nodes. Both Zookeeper and Kafka depend on the JDK; I have installed the JDK in advance:

[root@192.168.99.4 ~]# java --version
java 11.0.5 2019-10-15 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.5+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.5+10-LTS, mixed mode)
[root@192.168.99.4 ~]#

After preparing the JDK environment, go to the download page on Zookeeper's official website and copy the download link:

Then download it on Linux with the wget command, as follows:

[root@192.168.99.4 ~]# cd /usr/local/src
[root@192.168.99.4 /usr/local/src]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz

Extract the downloaded archive, then move and rename the extracted directory:

[root@192.168.99.4 /usr/local/src]# tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
[root@192.168.99.4 /usr/local/src]# mv apache-zookeeper-3.6.1-bin ../zookeeper

Go to Zookeeper's configuration file directory and copy the zoo_sample.cfg sample configuration file to zoo.cfg, which is Zookeeper's default configuration file name:

[root@192.168.99.4 /usr/local/src]# cd ../zookeeper/conf/
[root@192.168.99.4 /usr/local/zookeeper/conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@192.168.99.4 /usr/local/zookeeper/conf]# cp zoo_sample.cfg zoo.cfg

Modify the dataDir configuration item in the configuration file to point it at a directory with ample disk space:

[root@192.168.99.4 /usr/local/zookeeper/conf]# vim zoo.cfg
# Specify the data storage directory of Zookeeper, similar to the dataDir of MySQL
dataDir=/data/zookeeper
[root@192.168.99.4 /usr/local/zookeeper/conf]# mkdir -p /data/zookeeper
  • If you are only setting this up for learning purposes, this step can be skipped and the default configuration used

Next, you can enter the bin directory and use the startup script to start Zookeeper, as shown in the following example:

[root@192.168.99.4 /usr/local/zookeeper/conf]# cd ../bin/
[root@192.168.99.4 /usr/local/zookeeper/bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@192.168.99.4 /usr/local/zookeeper/bin]#

After startup completes, you can verify success by checking whether the port is being listened on. The following shows a successful startup:

[root@192.168.99.4 ~]# netstat -lntp |grep 2181
tcp6       0      0 :::2181       :::*         LISTEN      7825/java
[root@192.168.99.4 ~]#
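Besides checking the port, the Zookeeper startup script can also report its own state. Since we deployed only one Zookeeper node here, it runs in standalone mode:

```shell
[root@192.168.99.4 /usr/local/zookeeper/bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: standalone
```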

If your machine has a firewall enabled, you need to open the Zookeeper port; otherwise, other nodes will not be able to register:

[root@192.168.99.4 ~]# firewall-cmd --zone=public --add-port=2181/tcp --permanent
[root@192.168.99.4 ~]# firewall-cmd --reload

Kafka installation

After Zookeeper is installed, Kafka can be installed. Following the same routine, go to the download page on Kafka's official website and copy the download link:

Then download it on Linux with the wget command, as follows:

[root@192.168.99.1 ~]# cd /usr/local/src
[root@192.168.99.1 /usr/local/src]# wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz

Extract the downloaded archive, then move and rename the extracted directory:

[root@192.168.99.1 /usr/local/src]# tar -xvf kafka_2.13-2.5.0.tgz
[root@192.168.99.1 /usr/local/src]# mv kafka_2.13-2.5.0 ../kafka

Enter Kafka's configuration file directory and modify the configuration file:

[root@192.168.99.1 /usr/local/src]# cd ../kafka/config/
[root@192.168.99.1 /usr/local/kafka/config]# vim server.properties
# Specify the brokerId of this node. The brokerId in the same cluster needs to be unique
broker.id=0
# Specify the listening address and port; set the internal network IP here
listeners=PLAINTEXT://192.168.99.1:9092
# To allow access from the public Internet, set the public IP in this configuration item
advertised.listeners=PLAINTEXT://192.168.99.1:9092
# Specify the directory where Kafka stores its data (the message log segment files, not application logs)
log.dirs=/usr/local/kafka/kafka-logs
# Specifies the connection address of zookeeper, separated by commas if there are multiple addresses
zookeeper.connect=192.168.99.4:2181
[root@192.168.99.1 /usr/local/kafka/config]# mkdir /usr/local/kafka/kafka-logs

After modifying the configuration file, to make Kafka's command scripts easier to use, we can add Kafka's bin directory to the PATH environment variable:

[root@192.168.99.1 ~]# vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@192.168.99.1 ~]# source /etc/profile  # Let configuration take effect

In this way, Kafka can be started with the following command:

[root@192.168.99.1 ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &
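Starting with `&` ties the Kafka process to the current shell session. Kafka's startup script also provides a -daemon option that runs the Broker in the background as a daemon, which is usually more convenient on a server:

```shell
[root@192.168.99.1 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
```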

After executing the above command, the startup log is printed to the console. You can tell whether startup succeeded from the log, or by checking whether port 9092 is being listened on:

[root@192.168.99.1 ~]# netstat -lntp |grep 9092
tcp6    0     0 192.168.99.1:9092     :::*      LISTEN     31943/java
[root@192.168.99.1 ~]#

Similarly, if the firewall is enabled, the corresponding port number needs to be opened:

[root@192.168.99.1 ~]# firewall-cmd --zone=public --add-port=9092/tcp --permanent
[root@192.168.99.1 ~]# firewall-cmd --reload

So far, we have completed the installation of the first Kafka node. The installation steps for the other two nodes are the same; only the brokerId and the listener IP in the configuration file need to change. So here I simply copy the Kafka directory on this node to the other two machines:

[root@192.168.99.1 ~]# rsync -av /usr/local/kafka 192.168.99.2:/usr/local/
[root@192.168.99.1 ~]# rsync -av /usr/local/kafka 192.168.99.3:/usr/local/

Then modify the brokerId and listener IP on these two nodes:

[root@192.168.99.2 /usr/local/kafka/config]# vim server.properties
# Modify brokerId
broker.id=1
# Specify the listening address and port; set the internal network IP here
listeners=PLAINTEXT://192.168.99.2:9092
# To allow access from the public Internet, set the public IP in this configuration item
advertised.listeners=PLAINTEXT://192.168.99.2:9092
[root@192.168.99.2 /usr/local/kafka/config]# 
[root@192.168.99.3 /usr/local/kafka/config]# vim server.properties
# Modify brokerId
broker.id=2
# Specify the listening address and port; set the internal network IP here
listeners=PLAINTEXT://192.168.99.3:9092
# To allow access from the public Internet, set the public IP in this configuration item
advertised.listeners=PLAINTEXT://192.168.99.3:9092
[root@192.168.99.3 /usr/local/kafka/config]# 

When the configuration changes are complete, start these two nodes as described earlier. After they start successfully, connect to Zookeeper: if the corresponding brokerId entries exist under /brokers/ids, the cluster has been built successfully:

[root@192.168.99.4 ~]# /usr/local/zookeeper/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 5]
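As a further sanity check (assuming the PATH configuration from earlier), you can create a topic that spans all three Brokers and then describe it; the Leader, Replicas, and Isr columns in the describe output should reference all three brokerIds. The topic name cluster-test here is just an example:

```shell
[root@192.168.99.1 ~]# kafka-topics.sh --create --bootstrap-server 192.168.99.1:9092 \
    --replication-factor 3 --partitions 3 --topic cluster-test
[root@192.168.99.1 ~]# kafka-topics.sh --describe --bootstrap-server 192.168.99.1:9092 --topic cluster-test
```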

Kafka replicas

About Kafka replicas:

  • Kafka replication means keeping multiple copies of the log. Since Kafka stores its data in log files, replication amounts to data backup and redundancy
  • Kafka can set a default replication factor through configuration
  • Kafka can also set the replication factor per Topic, so replication is defined relative to a Topic

The replicas of a Topic can be distributed across multiple Brokers. When one Broker goes down, the data is still present on the other Brokers, which improves data reliability; this is the main purpose of replication.
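For example, the default replication factor for automatically created topics can be set in server.properties (the broker-side default is 1, i.e. no redundancy):

```shell
# server.properties: replication factor used when topics are created automatically
default.replication.factor=2
```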

We all know that a Topic in Kafka is just a logical concept; the data is actually stored in Partitions, so what is really replicated is the Partition. As shown below:

About the replication factor:

  • The replication factor determines the number of replicas of each Partition. For example, with a replication factor of 3, each Partition of the Topic is stored as three replicas distributed across the Brokers

The replica allocation algorithm is as follows:

  • Sort all n Brokers and the Partitions to be allocated
  • Assign the i-th Partition to the (i mod n)-th Broker
  • Assign the j-th replica of the i-th Partition to the ((i + j) mod n)-th Broker
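As a concrete illustration of the formulas above, the following small script walks through the assignment for our three Brokers (n = 3) and a Topic with three Partitions and one Follower replica each (j = 1):

```shell
#!/bin/sh
# Replica allocation example: n brokers, partitions i = 0..2, follower index j = 1
n=3
j=1
for i in 0 1 2; do
  leader=$(( i % n ))           # leader of partition i: (i mod n)
  follower=$(( (i + j) % n ))   # j-th follower of partition i: ((i + j) mod n)
  echo "partition $i -> leader on broker $leader, follower on broker $follower"
done
```

With this round-robin scheme, every Broker ends up carrying one leader and one follower, so the load is spread evenly across the cluster.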

Causes and handling of Kafka node failure

There are two cases in which a Kafka node (Broker) is considered failed:

  • If a Kafka node loses its heartbeat with Zookeeper, it is regarded as failed
  • If a Follower's messages lag too far behind the Leader, it is regarded as failed

How Kafka handles node failure:

  • Kafka removes the failed node from the cluster, and thanks to replication no data is lost because of the failure
  • Kafka's semantic guarantees also avoid data loss to a large extent
  • Kafka balances messages within the cluster to reduce hot spots on individual nodes

A brief introduction to Kafka's Leader election mechanism

Leader election in a Kafka cluster:

  • If you have worked with other distributed components, you will know that most of them elect a Leader among their nodes by voting; Kafka, however, does not use voting to elect its Leaders
  • Kafka dynamically maintains a set of in-sync replicas of the Leader's data, called the ISR
  • Kafka selects the new Leader from the ISR, since its replicas are closest to the Leader's data

"It's hard for a smart woman to cook without rice": Kafka has a helpless situation that all copies in ISR are down. In this case, Kafka will conduct unclean leader election by default. Kafka offers two different ways to deal with it:

  1. Wait for a Replica in the ISR to recover and select it as the Leader
    • If the wait takes long, availability is reduced; and if none of the replicas in the ISR can recover, or their data is lost, the Partition becomes permanently unavailable
  2. Select the first Replica that recovers as the new Leader, whether it is in the ISR or not (this is the unclean leader election)
    • Such a replica may not contain all the messages committed by the previous Leader, so data loss can occur, but availability is higher

Suggestions for Leader election configuration:

  • Disable unclean leader election
  • Set minimum ISR manually
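In Kafka these two suggestions correspond to the following Broker configuration items (the values shown are examples for our three-node cluster):

```shell
[root@192.168.99.1 /usr/local/kafka/config]# vim server.properties
# Keep unclean leader election disabled (the default since Kafka 0.11)
unclean.leader.election.enable=false
# With acks=all, a write is only acknowledged once at least 2 replicas are in sync
min.insync.replicas=2
```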


Tags: Big Data kafka Zookeeper Apache Java

Posted on Thu, 04 Jun 2020 18:21:26 -0700 by Stonewall