Flume+Kafka+Storm模拟应用日志的实时处理

in cn •  7 years ago  (edited)

模拟应用需求

  • 采集订单系统应用打印的日志文件。

日志文件使用log4j生成,滚动生成。使用tail -F xxx.log来监控文件名称,理解tail -f和tail -F的区别。

  • 将采集的日志文件保存到Kafka中。

(source)输入:tail -F xxx.log

(channel)存储:内存

(sink)输出:Kafka

config样例,

a1.source = s1
a1.channel = c1
a1.sink = k1

source exec tail -F xxx.log
channel RAM
sink xxxx.xxxx.xxxx.KafkaSink // 该类必须存放lib目录
sink.topic = orderMq
sink.itcast = itcast

map = getConfig();
value = map.get("itcast")
  • 通过Storm程序消费Kafka中数据。
KafkaSpout
Bolt1()
Bolt2()

业务模拟:统计双十一当前的订单金额、订单数量、订单人数。订单金额/数量/人数(整个网站、各个业务线、各个品类、各个店铺、各个品牌、每个商品)。

环境配置

应用安装的一般流程:下载、解压、配置、分发。

在Flume官方网站下载Flume,解压Flume安装包,

tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers/
cd /export/servers/
ln -s apache-flume-1.6.0-bin flume

配置Flume环境变量,

vi /etc/profile

export FLUME_HOME=/export/servers/apache-flume-1.6.0-bin
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin:$STORM_HOME/bin:$KAFKA_HOME/bin:$FLUME_HOME/bin

source /etc/profile

创建Flume配置文件,

cd /export/servers/flume/conf/
mkdir myconf
cd myconf/
vi exec.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/1.log
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = orderMq
a1.sinks.k1.brokerList = kafka01:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

准备模拟应用日志数据的目录,

mkdir -p /export/data/flume_sources/click_log

通过脚本模拟生产应用日志数据,

# click_log_out.sh
for((i=0;i<=50000;i++));
do echo "message-"+$i >> /export/data/flume_sources/click_log/1.log;
done

chomd +x click_log_out.sh

打通所有流程,第一步,启动zk集群,

zkServer.sh start
zkServer.sh status

第二步,封装Kafka集群启动和停止的脚本,启动Kafka集群,

start-kafka.sh

第三步,启动Flume客户端,监控日志数据生成,

./bin/flume-ng agent -n a1 -c conf -f conf/myconf/exec.conf -Dflume.root.logger=INFO,console

第四步,创建一个topic并开启consumer,在客户端模拟消费,

kafka-console-consumer.sh --zookeeper zk01:2181 --topic orderMq

第五步,执行应用日志数据生产脚本,

sh click_log_out.sh

整合Storm程序的bug解决

服务端没有启动ZooKeeper,

ERROR org.apache.curator.ConnectionState - Connection timed out for connection string (zk01:2181,zk02:2181,zk03:2181) and timeout (15000) / elapsed (15071)

本地调试Storm程序,本机没有配置kafka的解析,

kafka.consumer.SimpleConsumer - Reconnect due to error:
java.nio.channels.ClosedChannelException: null

放到Storm集群运行,相关环境和jar包冲突,把Storm相关的依赖去掉。

SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Exception in thread "main" java.lang.ExceptionInInitializerError
        at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106)
        at cn.itcast.storm.kafkaAndStorm.KafkaAndStormTopologyMain.main(KafkaAndStormTopologyMain.java:27)
Caused by: java.lang.RuntimeException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/hadoop/kafka2storm.jar!/defaults.yaml, jar:file:/export/servers/apache-storm-0.9.5/lib/storm-core-0.9.5.jar!/defaults.yaml]
        at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:133)
        at backtype.storm.utils.Utils.readDefaultConfig(Utils.java:160)
        at backtype.storm.utils.Utils.readStormConfig(Utils.java:184)
        at backtype.storm.utils.Utils.<clinit>(Utils.ja

本文首发于steem,感谢阅读,转载请注明。

https://steemit.com/@padluo

微信公众号「数据分析」,分享数据科学家的自我修养,既然遇见,不如一起成长。

数据分析

读者交流电报群

https://t.me/sspadluo

知识星球交流群

知识星球读者交流群

Authors get paid when people like you upvote their post.
If you enjoyed what you read here, create your account today and start earning FREE STEEM!
Sort Order:  
Loading...

@padluo, 就知道你写文很有潜力!

多谢捧场支持呀

感谢分享!不过为什么大家都喜欢建个小密圈...

多谢大佬支持哈

学习了,有空来踩博。。

谢谢