Kafka Distributed Message Queue: Study Notes


Kafka Overview

Kafka describes itself as "a distributed streaming platform".

Kafka Architecture and Core Concepts

producer: the producer, who makes the steamed buns.

consumer: the consumer, who eats the steamed buns.

broker: the basket that holds the buns.

topic: a label on each bun; buns tagged topica are for you to eat, and buns tagged topicb are for your younger brother.

Deploying the ZooKeeper Cluster

Extract the installation package

tar -xzvf zookeeper-3.4.5.tar.gz -C /export/servers

Edit the ZooKeeper configuration file

cp zoo_sample.cfg zoo.cfg
vi zoo.cfg

#Data directory. Can be any directory; both dataDir and dataLogDir must be created in advance.
#Note: choose the log location carefully. A dedicated log device greatly improves performance; putting the logs on a busy device will noticeably hurt it.
dataDir=/export/servers/data/zookeeper

#Log directory. Can also be any directory; if unset, it defaults to the same value as dataDir.
dataLogDir=/export/servers/logs/zookeeper

# hostname:quorum (follower-to-leader) port:leader-election port
server.1=zk01:2888:3888
server.2=zk02:2888:3888
server.3=zk03:2888:3888

Write myid into the data directory

# on zk01
mkdir -p /export/servers/data/zookeeper
echo 1 > /export/servers/data/zookeeper/myid
cat /export/servers/data/zookeeper/myid

Distribute ZooKeeper to the other nodes

sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk02:/export/servers/
# on zk02
mkdir -p /export/servers/data/zookeeper
echo 2 > /export/servers/data/zookeeper/myid

sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk03:/export/servers/
# on zk03
mkdir -p /export/servers/data/zookeeper
echo 3 > /export/servers/data/zookeeper/myid
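The per-node copy-and-myid steps above can be collapsed into one loop run from zk01. A minimal sketch, assuming passwordless ssh as user hadoop and the hostnames and paths used in this post:

```shell
#!/bin/sh
# distribute-zk.sh -- copy the ZooKeeper install to each remaining node and
# write that node's myid (hostnames/user/paths are the ones from this post)
id=2
for host in zk02 zk03
do
    scp -r /export/servers/zookeeper-3.4.5 hadoop@"$host":/export/servers/
    ssh hadoop@"$host" "mkdir -p /export/servers/data/zookeeper && echo $id > /export/servers/data/zookeeper/myid"
    id=$((id + 1))
done
```

Each node's myid must match its `server.N` entry in zoo.cfg, which is why the counter starts at 2 for zk02.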

Configure environment variables

vi /etc/profile

export ZK_HOME=/export/servers/zookeeper-3.4.5
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin

source /etc/profile

Startup

# start
zkServer.sh start
# check cluster status and the leader/follower role of this node
zkServer.sh status
# list running Java processes
jps -m
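To check every node's role in one pass, a small loop can run zkServer.sh status over ssh. A minimal sketch, assuming passwordless ssh and that /etc/profile sets up ZK_HOME on each node:

```shell
#!/bin/sh
# check-zk-cluster.sh -- print each node's mode (leader or follower)
for host in zk01 zk02 zk03
do
    echo "--- $host ---"
    ssh "$host" "source /etc/profile; zkServer.sh status" 2>/dev/null
done
```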

How export affects variable scope

export A=1 defines a variable that is visible to the current shell process and to its child processes.

B=1 defines a variable that is visible only to the current shell process.

When you source script.sh in your login shell, the variables defined in the script are loaded into the current shell process.

So to make variables defined in a script visible to the parent shell, source the script that exports them instead of executing it.
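The scope rules above can be demonstrated directly in a shell; the variable names DEMO_A and DEMO_B are just for illustration:

```shell
# a plain assignment stays in the current shell; an exported one is inherited
DEMO_A=1           # visible in the current shell only
export DEMO_B=2    # visible in the current shell and all child processes

# a child shell sees DEMO_B but not DEMO_A
sh -c 'echo "DEMO_A=${DEMO_A:-unset} DEMO_B=${DEMO_B:-unset}"'
# prints: DEMO_A=unset DEMO_B=2
```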

ZooKeeper cluster start and stop scripts; set up passwordless ssh between the cluster machines first.

#!/bin/sh
# start-zkServer-cluster.sh

zkServers="zk01 zk02 zk03"
echo "start zkServer..."
for i in $zkServers
do
    echo "start zkServer on ${i} ..."
    ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh start > /dev/null 2>&1 &"
done

#!/bin/sh
# stop-zkServer-cluster.sh

zkServers="zk01 zk02 zk03"
echo "stop zkServer..."
for i in $zkServers
do
    echo "stop zkServer on ${i} ..."
    ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh stop > /dev/null 2>&1 &"
done

Kafka Cluster Deployment and Usage

Extract the installation package

tar -xzvf kafka_2.11-0.9.0.1.tar.gz -C /export/servers

Edit $KAFKA_HOME/config/server.properties. On every node of the cluster, broker.id and host.name must be changed to that node's own values.

#Globally unique broker id; must not repeat across the cluster
broker.id=0
#Port the broker listens on; producers and consumers connect here
port=9092
#Directory where Kafka stores its log segments
log.dirs=/export/servers/logs/kafka
#ZooKeeper ensemble the broker uses to store metadata
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
#host.name must be this machine's hostname/IP (important); otherwise clients fail with: Producer connection to localhost:9092 unsuccessful
host.name=kafka01
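Rather than hand-editing broker.id and host.name on every machine, the per-broker files can be stamped out from one shared copy. A minimal sketch, where server.properties.template is a hypothetical template holding the settings above:

```shell
#!/bin/sh
# gen-broker-configs.sh -- substitute broker.id and host.name per node
template=server.properties.template   # hypothetical shared template file
i=0
for host in kafka01 kafka02 kafka03
do
    sed -e "s/^broker.id=.*/broker.id=$i/" \
        -e "s/^host.name=.*/host.name=$host/" \
        "$template" > "server.properties.$host"
    i=$((i + 1))
done
```

Each generated file can then be scp'd to its node as $KAFKA_HOME/config/server.properties.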

Kafka cluster start and stop scripts

#!/bin/bash
#start-kafka-cluster.sh
brokers="kafka01 kafka02 kafka03"
kafka_home="/export/servers/kafka_2.11-0.9.0.1"
for i in $brokers
do
    echo "Starting kafka on ${i} ... "
    ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &"
    if [[ $? -eq 0 ]];  then
        echo "Start kafka on ${i} is OK !"
    fi
done
echo "All Kafka brokers are started!"
exit 0

#!/bin/bash
#stop-kafka-cluster.sh
brokers="kafka01 kafka02 kafka03"
kafka_home="/export/servers/kafka_2.11-0.9.0.1"
for i in $brokers
do
    echo "Stopping kafka on ${i} ... "
    ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-stop.sh > /dev/null 2>&1 &"
    if [[ $? -eq 0 ]];  then
        echo "Stop kafka on ${i} is OK !"
    fi
done
echo "All Kafka brokers are stopped!"
exit 0

Start Kafka with start-kafka-cluster.sh.

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic hello_topic

List all topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

kafka-topics.sh --list --zookeeper localhost:2181

Produce messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

kafka-console-producer.sh --broker-list kafka01:9092 --topic hello_topic

Consume messages

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic --from-beginning

Describe a topic

kafka-topics.sh --describe --zookeeper zk01:2181 --topic hello_topic

Integrating Flume and Kafka for Real-Time Data Collection

In application requirement 3 of my Flume study notes (分布式日志收集框架Flume学习笔记), logs on server A were collected in real time, shipped to server B, and printed to the console. To integrate Flume with Kafka, replace the logger sink with a kafka sink. The kafka sink plays the producer role here; we verify the pipeline by starting a console consumer.

Component selection:

exec-memory-avro.conf: exec source + memory channel + avro sink

avro-memory-kafka.conf: avro source + memory channel + kafka sink

The avro-memory-kafka agent configuration:

# avro-memory-kafka.conf
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 192.168.169.100
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = kafka01:9092,kafka02:9092,kafka03:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

To verify, start the avro-memory-kafka agent first, because it listens on port 44444 of 192.168.169.100, then start exec-memory-avro:

flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf/myconf \
--conf-file $FLUME_HOME/conf/myconf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console

flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf/myconf \
--conf-file $FLUME_HOME/conf/myconf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console

Start a consumer in a console to verify, then append lines to the log file that the exec source is tailing:

kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic
echo hellokafka1 >> data.log
echo hellokafka2 >> data.log

This article was first published on Steem. Thanks for reading; please credit the source when reposting.

https://steemit.com/@padluo


WeChat official account 「数据分析」 (Data Analysis), sharing a data scientist's path of self-improvement; since we have met, let's grow together.

Reader Telegram group:

https://t.me/sspadluo

知识星球 reader group
