还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家

  • 小编 发布于 2019-12-07 23:46:57
  • 栏目:科技
  • 来源:程序猿茅哥
  • 9608 人围观

Kafka集群部署

概述

之前的大数据集群主要是离线处理的方式对集群的数据进行开发处理。当前的集群数据量已经达到了PB级别了,离线数据获取主要是从数仓侧进行全量或者增量的方式导入大数据平台,部分是通过SFTP的方式解析进入大数据平台,少量数据是通过接口的方式准实时接入到大数据平台。随着业务的发展,对于实时数据的接入和应用显得越来越重要了,接下来的时间会一直更新整个时间数据接入和应用的分享。

还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家

我们都是知道kafak是一种分布式的,基于发布/订阅的消息系统。以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能、高吞吐率。这个是非常符合当前接入实时数据进行处理的一个组件。

为什么说kafka是非常适合我当前的场景呢?当前市场kafka的应用场景常见有:消息系统、跟踪网站活动、运营指标、日志聚合、流处理、采集日志、提交日志。kafka具有以下的以下优点(特性):

· 高吞吐、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。

· 可扩展性:kafka 集群支持热扩展

· 可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

· 容错性:允许集群中节点失败

· 高并发:支持数千个客户端同时读写

还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家

即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。同时支持离线数据处理(hive、HBASE)和实时数据处理(spark、storm)。

话不多说,开启我实际安装过程。如下的安装步骤都是可以在虚拟机里面直接复制使用的。温馨提醒,如果是在实际的生产环境进行部署的话,请记得挂着磁盘。

希望对你们有帮助!!!其中标红的部分,新手要特别注意,大神们就忽略了。也欢迎大家留言指正!

一、集群规划

二、集群环境准备

以下步骤无特殊说明则各节点均需执行

1)关闭防火墙(三台)

# 需要用root用户执行

service iptables stop

chkconfig iptables off

2)关闭selinux(三台)

vim /etc/selinux/config


# 注释下面一行

#SELINUX=enforcing

# 添加下面一行

SELINUX=disabled

3)配置域名映射(三台)

vim /etc/hosts


# 在末尾添加以下几行,注意填写实际IP及hostname

IP01 hostname01 kafka01

IP02 hostname02 kafka02

IP03 hostname03 kafka03

重启机器

reboot -h now

4)配置免密登录

# 生成公钥与私钥, 执行以下命令后按三次回车键即可

ssh-keygen -t rsa

所有节点拷贝公钥到kafka01

ssh-copy-id kafka01

复制kafka01认证到其他机器

在kafka01执行

scp /root/.ssh/authorized_keys kafka02:/root/.ssh

scp /root/.ssh/authorized_keys kafka03:/root/.ssh

若中间出错,100%后面不为1212 (404X3,n个节点为404Xn),可以通过机器互ping验证

5)时间同步

在kafka01执行

确认是否安装ntpd服务

rpm -qa | grep ntp

若未安装,在线安装

yum install -y ntp

设置ntpd服务开机启动

chkconfig ntpd on

修改配置文件

vim /etc/ntp.conf


# 添加下面一行,注意修改子网IP,如192.168.11.0

restrict 子网IP mask 255.255.255.0 nomodify notrap

# 注释下面四行

#server 0.centos.pool.ntp.org

#server 1.centos.pool.ntp.org

#server 2.centos.pool.ntp.org

#server 3.centos.pool.ntp.org

# 取消注释或添加下面两行

server 127.127.1.0 # local clock

fudge 127.127.1.0 stratum 10

配置以下内容,保证BIOS与系统时间同步

vim /etc/sysconfig/ntpd


# 添加下面一行

SYNC_HWLOCK=yes

在其他节点执行

配置另外几台与kafka01时间同步

crontab -e


*/1 * * * * /usr/sbin/ntpdate kafka01

6)安装jdk(三台)

查看并卸载自带的openjdk

rpm -qa | grep java

rpm -e java-1.6.0-openjdk-1.6.0.41-1.13.13.1.el6_8.x86_64 tzdata-java-2016j-1.el6.noarch java-1.7.0-openjdk-1.7.0.131-2.6.9.0.el6_8.x86_64 --kafkaps

创建文件夹

# 用于存放安装包

mkdir -p /export/software/

# 用于安装软件

mkdir -p /export/servers/

上传到/export/software/并解压jdk

cd /export/software/

tar -zxvf jdk-8u141-linux-x64.tar.gz -C ../servers/


配置环境变量

vim /etc/profile


# JAVA_HOME

export JAVA_HOME=/export/servers/jdk1.8.0_141

export PATH=:$JAVA_HOME/bin:$PATH


source /etc/profile

验证

java -version

结果显示如下

java version "1.8.0_141"

Java(TM) SE Runtime Environment (build 1.8.0_141-b15)

Java HotSpot(TM) 64-Bit Server VM (build 25.141-b15, mixed mode)

三、Zookeeper集群安装

1)下载安装包


下载完成后上传到集群上

2)解压安装包

cd /export/software/

tar -zxvf zookeeper-3.4.14.tar.gz -C ../servers/

3)创建软链接

cd /export/servers/

ln -s zookeeper-3.4.14 zookeeper

4)修改配置文件

cd /export/servers/zookeeper/conf

cp zoo_sample.cfg zoo.cfg

mkdir -p /export/servers/zookeeper/zkdatas/


vim zoo.cfg


# 指定zk文件存储目录

dataDir=/export/servers/zookeeper/zkdatas

# 取消下面两行注释

autopurge.snapRetainCount=3

autopurge.purgeInterval=1

# 添加下面三行

server.1=kafka01:2888:3888

server.2=kafka02:2888:3888

server.3=kafka03:2888:3888

5)添加myid配置

echo 1 > /export/servers/zookeeper/zkdatas/myid

6)分发安装包

scp -r /export/servers/zookeeper-3.4.14/ kafka02:/export/servers/

scp -r /export/servers/zookeeper-3.4.14/ kafka03:/export/servers/

7)其他机器创建软链接及修改myid值

ln -s /export/servers/zookeeper-3.4.14/ /export/servers/zookeeper

kafka02

echo 2 > /export/servers/zookeeper/zkdatas/myid

kafka03

echo 3 > /export/servers/zookeeper/zkdatas/myid

8)启动zookeeper集群

启动脚本如下

zookeeper-start.sh

#!/bin/bash


echo "1.启动节点kafka01、kafka02和kafka03......"

for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh start"

done


#休眠1秒

sleep 1


echo "2.查看集群各节点状态......"

for n in kafa01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh status"

done

停止脚本如下

zookeeper-stop.sh

#!/bin/bash


echo "1.停止节点kafka01、kafka02和kafka03......"

for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh stop"

done

#休眠1秒

sleep 1


echo "4. 查看集群各节点状态......"

for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh status"

done


四、kafka集群安装

1)下载安装包


还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家

下载完成后上传到集群上

2)解压安装包

cd /export/software/

tar -zxvf kafka_2.11-2.2.1.tgz -C ../servers/

3)创建软链接

ln -s kafka_2.11-2.2.1/ kafka

4)创建logs文件夹

cd /export/servers/kafka

mkdir logs

5)备份并修改配置文件

cd /export/servers/kafka/config

cp server.properties server.properties.bak

修改内容

vim server.properties


############################# Server Basics #############################


#broker的全局唯一编号,不能重复

broker.id=0


############################# Socket Server Settings #############################


# 监听列表

listeners=PLAINTEXT://kafka01:9092


# 处理网络请求的线程数, 一般等于核心数

num.network.threads=3


# 用来处理磁盘IO的线程数

num.io.threads=8


# socket 发送缓冲区

socket.send.buffer.bytes=102400


# socket 接收缓冲区

socket.receive.buffer.bytes=102400


# socket请求最大数值,防止serverOOM

socket.request.max.bytes=104857600


############################# Log Basics #############################


# kafka运行日志存放的路径

log.dirs=/export/servers/kafka/logs


# 默认分区数 会被命令行参数覆盖

num.partitions=3


# 用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1


############################# Internal Topic Settings #############################

# 内置主题 "__consumer_offsets" and "__transaction_state" 的副本数

offsets.topic.replication.factor=3

transaction.state.log.replication.factor=3

transaction.state.log.min.isr=3


############################# Log Flush Policy #############################


# The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000


# The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000


############################## Log Retention Policy #############################


# topic数据默认保留时长, 10天

log.retention.hours=240


# A size-based retention policy for logs. Segments are pruned from the log unless the remaining

# segments drop below log.retention.bytes. Functions independently of log.retention.hours.

#log.retention.bytes=1073741824


# 一个消息长度, 超过再创建一个

log.segment.bytes=1073741824


# 文件大小检查周期

log.retention.check.interval.ms=300000


############################# Zookeeper #############################


# 配置连接Zookeeper集群地址

zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181


# zookeeper连接超时时间

zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################


group.initial.rebalance.delay.ms=0


############################# Other #############################


# 数据副本个数

default.replication.factor=3


# 单条消息最大长度4Mb

message.max.bytes=4000000


# broker可复制的消息的最大字节数。值应该比message.max.bytes大

replica.fetch.max.bytes=4194304


# 消费者能读取的最大消息,值应该大于或等于message.max.bytes

max.partition.fetch.bytes=4194304


# 允许删除主题

delete.topic.enable=true

6)各节点分别添加环境变量

vim /etc/profile


#KAFKA_HOME

export KAFKA_HOME=/export/servers/kafka/

export PATH=$PATH:$KAFKA_HOME/bin


source /etc/profile

7)分发安装包

注意:kafka02和kafka03

scp -r /export/servers/kafka_2.11-2.2.1/ kafka02:/export/servers/

scp -r /export/servers/kafka_2.11-2.2.1/ kafka03:/export/servers/

8)其他机器创建软链接及修改配置文件

ln -s /export/servers/kafka_2.11-2.2.1 /export/servers/kafka

kafka02

broker.id=1


listeners=PLAINTEXT://kafka02:9092


kafka03

broker.id=2


listeners=PLAINTEXT://kafka03:9092

9)启动kafka集群

启动脚本如下

kafka-start.sh

#!/bin/bash


echo "正在启动kafka集群......"


for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties"

done


echo "启动kafka集群完成......"


停止脚本如下

kafka-stop.sh

#!/bin/bash


echo "正在关闭kafka集群......"


for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/kafka/bin/kafka-server-stop.sh"

done


echo "关闭kafka集群完成......"

10)kafka测试

# 创建topic

kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 3 --partitions 3 --topic topic-test

# 查看topic详情

kafka-topics.sh --describe --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 localhost:2181 --topic topic-test

还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家

# 开启消息生产者

kafka-console-producer.sh --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic topic-test

# 开启消息消费者

kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --from-beginning --topic topic-test

在消息生产者发送消息消费者能收到即可

还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家

转载请说明出处:866热点网 ©