博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark读取 kafka nginx网站日志消息 并写入HDFS中(转)
阅读量:6184 次
发布时间:2019-06-21

本文共 1932 字,大约阅读时间需要 6 分钟。

原文链接:

spark 版本为1.0

kafka 版本为0.8

首先来看看kafka的架构图 详细了解请参考

我这边有三台机器用于kafka 日志收集的
A 192.168.1.1 为server
B 192.168.1.2 为producer
C 192.168.1.3 为consumer

首先在A上的kafka安装目录下执行如下命令

./kafka-server-start.sh ../config/server.properties

启动kafka 通过netstat -npl 可以查看出是否开启默认端口9092

B为我们的nginx日志产生服务器,在这里的日志是网站实时写入到access-nginx.log 中

因此我们可以通过 tail -f 的方式能看到当前网站正在请求的日志信息。如果你的网站访问量很大请勿执行tail -f

同样我们也要在B上部署kafka,如果你没有写kafka 的客户端的话( )

执行如下命令来push 数据到集群中

tail -n 0 -f /www/nh-nginx02/access.log | bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic sb-nginx03

这样我们就将日志push到kafka消息中了

C中,现在我们来写 consumer pull数据,还是要部署一下kafka 然后执行命令

bin/kafka-console-consumer.sh --zookeeper 192.168.1.1:2181 --topic sb-nginx03 --from-beginning

参数

–zookeeper 指定了你集群中zookeeper 的地址和端口即可
–topic 要和我们在B中push的时候指定的名称一致

上述方式只为在shell 命令行下,如何通过spark来写consumer呢?

假设你已经下载好spark1.0 源码 假设你已经部署好sbt scala等环境

scala 代码如下:

test  java.util.Properties org.apache.spark.streaming._ org.apache.spark.streaming.StreamingContext._ org.apache.spark.streaming.kafka._ org.apache.spark.SparkConf KafkaTest { main(args:Array[String]) { (args.length < 5) { System.err.println("Usage: KafkaTest 
") System.exit(1) } Array(zkQuorum, group, topics, numThreads,output) = args sparkConf = SparkConf().setAppName("KafkaTest") ssc = StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) lines.saveAsTextFiles(output) ssc.start() ssc.awaitTermination() //.saveAsTextFile(output) } }

然后编译

mvn -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.0.1 -DskipTests package

然后spark作业提交

./bin/spark-submit --master local[*] --class org.apache.spark.fccs.KafkaTest ./test/target/scala-2.10/spark-test-1.0.0-hadoop2.3.0-cdh5.0.1.jar zoo02 my-test sb-nginx03 1 hdfs://192.168.1.1:9100/tmp/spark-log.txt

结果如下:

转载地址:http://pfsda.baihongyu.com/

你可能感兴趣的文章
51CTO已展开反击?原账号无法在论坛上发表长篇内容
查看>>
我的友情链接
查看>>
利用R语言实现支持向量机(SVM)数据挖掘案例
查看>>
Bash shell脚本的语法结构
查看>>
linux sftp相关命令
查看>>
Like 关键字对时间模糊查询
查看>>
STL之容器共性
查看>>
金笛JDMail邮件系统从源头上为企业铸造防lj邮件墙--3
查看>>
ArcGis for Javascript 的使用注意点
查看>>
FileWriter不能写utf-8的原因及解决
查看>>
IOCP (关于WSASend,WSARecv调用)
查看>>
MOOON-scheduler核心设计图(初稿)
查看>>
网站开发的流程
查看>>
Apache Common-pool2对象池浅析
查看>>
Microsoft Hyper-V Server 2008 R2和SCVMM2012部署XenDesktop 5.6桌面虚拟化系列之二准备虚拟桌面模板...
查看>>
我的友情链接
查看>>
改变虚拟机MAC地址
查看>>
solr教程
查看>>
我的友情链接
查看>>
钱旺之行:互联网经济的增长点——注意力价值经济
查看>>