Real-Time Log Processing with Spark Streaming + Flume + Kafka
I built this experiment myself and personally verified that it runs end to end. If you want to reproduce it, you need to change a few things:
- the source and sink host names in the Flume configuration file
- the host name in the log4j.properties file configured in IDEA (change it to your own host name)
- the database and table that the Spark Streaming program writes results to must be created in advance
Experiment environment:
Windows 10 + IDEA + CentOS + Flume + Kafka + HDFS
Preparation
- Kafka installed and a topic created
- CentOS 7 environment
- Hadoop environment set up
- Flume installed and its conf file configured
Case analysis
Spark Streaming is integrated with Kafka and Flume to process the logs as a stream.
Starting the environment
(1) Start the Hadoop cluster
start-all.sh
(2) Start the ZooKeeper service
bin/zkServer.sh start
(3) Start the Kafka service
bin/kafka-server-start.sh -daemon config/server.properties
Experiment steps
1. Write a Java program that generates log entries
package com.ethan.kafka;

import org.apache.log4j.Logger;

public class LogsFactor {
    private static Logger logger = Logger.getLogger(LogsFactor.class.getName());

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Thread.sleep(5000);
            StringBuilder sb = new StringBuilder();
            sb.append(timeGen())
                .append("_")
                .append(userIdGen())      // user ID
                .append("_")
                .append(sessionIdGen())   // session ID
                .append("_")
                .append(userIdGen())      // page ID
                .append("_")
                .append(timeGen() + " " + timeStampGen())
                .append("_")
                .append(keyWorld())       // search keyword
                .append("_")
                .append(productId())      // clicked category ID
                .append("_")
                .append(productId())      // product ID
                .append("_")
                .append(cageryId())       // ordered category ID
                .append("_")
                .append(productId())      // product ID
                .append("_")
                .append(cageryId())       // paid category IDs
                .append("_")
                .append(cageryId())       // product IDs
                .append("_")
                .append("3");             // city ID
            logger.info(sb);
        }
    }

    public static int productId() {
        return (int) (Math.random() * 300);
    }

    public static int cageryId() {
        return (int) (Math.random() * 300) + (int) (Math.random() * 1234);
    }

    public static String keyWorld() {
        String[] kw = {"手机", "数码", "服装", "篮球", "文具", "食品", "学习资料", "工具", "餐饮用具"};
        return kw[(int) (Math.random() * kw.length)];
    }

    private static String timeGen() {
        int year = (int) (Math.random() * (2021 - 2016 + 1) + 2016);
        int month = (int) (Math.random() * 12 + 1);   // 1-12
        int day = (int) (Math.random() * 31 + 1);     // 1-31
        return year + "-" + month + "-" + day;
    }

    private static String timeStampGen() {
        int hour = (int) (Math.random() * 24);        // 0-23
        int minute = (int) (Math.random() * 60);      // 0-59
        int second = (int) (Math.random() * 60);      // 0-59
        return hour + ":" + minute + ":" + second;
    }

    public static int userIdGen() {
        return (int) (Math.random() * (1000000 - 1) + 1);
    }

    public static String sessionIdGen() {
        int n = (int) (Math.random() * (1000000 - 1) + 1);
        return "niit:" + n + (int) (Math.random() * 123 * Math.random() * 1000);
    }
}
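The generator above emits one underscore-separated record per line. As a minimal sketch of how such a line can be split back into its fields, assuming `_` never occurs inside a field (the `LogLineParser` class name and the sample line are hypothetical, for illustration only):

```java
public class LogLineParser {
    // Splits one generated log line into its underscore-separated fields.
    public static String[] parse(String line) {
        return line.split("_");
    }

    public static void main(String[] args) {
        // An illustrative line in the generator's format:
        // date _ userId _ sessionId _ pageId _ "date time" _ keyword _ category/product IDs _ cityId
        String sample = "2020-5-17_8342_niit:77123_120_2020-5-17 9:41:3_手机_42_17_880_53_901_660_3";
        String[] fields = parse(sample);
        System.out.println("field count: " + fields.length);
        System.out.println("keyword: " + fields[5]);
    }
}
```

Note that the timestamp field contains a space but no underscore, so a plain `split("_")` keeps it intact.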
2. Edit the log4j.properties file under resources
# Set root logger level to INFO, with appenders A1 and flume.
log4j.rootLogger=INFO, A1,flume
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.target=System.out
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] %x - %m%n
# For Flume 1.8 and later the configuration differs slightly; see the official docs
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = ethan002
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
3. Create the checkpoint directory /spark/checkpoint on HDFS
- Start Hadoop
start-all.sh
Note: no need to start it again if it is already running.
- Create the checkpoint directory
hadoop fs -mkdir -p /spark/checkpoint/
4. Create the Kafka topic
bin/kafka-topics.sh --zookeeper ethan002:2181 --create --replication-factor 3 --partitions 1 --topic test
5. Write the Flume configuration file
- Create a jobs directory under the Flume installation directory
- Create the Flume configuration file in the jobs directory
vi flume-kafka-idea.conf
- Add the following content
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# Describe/configure the source
a1.sources.r1.type=avro
a1.sources.r1.bind=ethan002
a1.sources.r1.port=41414
# Define the sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=test
a1.sinks.k1.kafka.bootstrap.servers=ethan002:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=100
# Size limit for Kafka producer requests
a1.sinks.k1.kafka.producer.max.request.size=51200000
# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
#a1.sinks.k1.type=logger
6. Write the Spark Streaming program
package com.ethan.spark.stream.practice.kafka

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

case class Word(
  wordName: String,
  count: Int
)

object KafkaFlumeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDemo")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    streamingContext.checkpoint("hdfs://ethan001:9000/spark/checkpoint")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "ethan002:9092",            // the brokers to consume from
      "key.deserializer" -> classOf[StringDeserializer], // deserializers, since data written to Kafka is serialized
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      // where consumption starts: "earliest" = from the beginning; "latest" = from when the consumer starts
      "auto.offset.reset" -> "latest",
      // whether to commit offsets automatically; the default is true (committed every 5 seconds):
      // true  - the consumer periodically commits (group.id, topic, partition) -> offset
      // false - Kafka does not maintain offsets automatically; you manage them yourself
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // the topics created in Kafka
    val topics = Array("test", "knf")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams) // subscribe to the topics
    )
    // convert each record to a (key, value) pair
    val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))
    // word-frequency count over the messages produced to Kafka
    val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split("_")).map((_, 1))
    // ---- cumulative counting, written to MySQL ----
    // function that accumulates the number of occurrences of each word:
    // currentValues holds the current batch's values for one key (grouped by key inside Spark),
    // previousValueState holds the count accumulated so far
    val addWordFunction = (currentValues: Seq[Int], previousValueState: Option[Int]) => {
      val currentCount = currentValues.sum
      val previousCount = previousValueState.getOrElse(0)
      // return the accumulated result as an Option[Int]
      Some(currentCount + previousCount)
    }
    val result = resultRDD.updateStateByKey(addWordFunction)
    // store the data in the DStream into MySQL
    result.foreachRDD(rdd => {
      val url = "jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=UTF-8"
      val user = "root"
      val password = "123456"
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      // truncate the table to delete its previous contents
      val conn1: Connection = DriverManager.getConnection(url, user, password)
      val stmt1: PreparedStatement = conn1.prepareStatement("truncate table word")
      stmt1.executeUpdate()
      conn1.close()
      rdd.foreach(data => {
        // replace the table contents with the latest data from the RDD
        val conn2: Connection = DriverManager.getConnection(url, user, password)
        val stmt2: PreparedStatement = conn2.prepareStatement("insert into word(wordName,count) values(?,?)")
        stmt2.setString(1, data._1)
        stmt2.setInt(2, data._2)
        stmt2.executeUpdate()
        conn2.close()
      })
    })
    // print
    result.print()
    // start
    streamingContext.start()
    // wait for the computation to terminate
    streamingContext.awaitTermination()
  }
}
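The addWordFunction above can be illustrated with a plain-Java sketch of the same per-key update (the `StateUpdateSketch` class and method names are hypothetical, for illustration only): sum the current batch's values for a word, then add them to the previously accumulated count.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateUpdateSketch {
    // previous: accumulated count so far (null for a word seen for the first time)
    // currentValues: the 1s emitted for this word in the current batch
    public static int updateWordCount(Integer previous, List<Integer> currentValues) {
        int currentSum = currentValues.stream().mapToInt(Integer::intValue).sum();
        return (previous == null ? 0 : previous) + currentSum;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("手机", 3); // accumulated from earlier batches
        // "手机" appears twice in the new batch, "数码" once
        state.put("手机", updateWordCount(state.get("手机"), List.of(1, 1)));
        state.put("数码", updateWordCount(state.get("数码"), List.of(1)));
        System.out.println(state.get("手机")); // 5
        System.out.println(state.get("数码")); // 1
    }
}
```

This is why the program needs a checkpoint directory: updateStateByKey must persist the per-key state between batches.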
Note: you need to create the database hadoop and the table word in your local MySQL in advance.
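A possible DDL for that database and table, with column names taken from the insert statement in the code above and types assumed (adjust sizes and character set to your setup):

```sql
-- assumed schema; wordName/count match the insert statement in the program
CREATE DATABASE IF NOT EXISTS hadoop DEFAULT CHARACTER SET utf8;
USE hadoop;
CREATE TABLE IF NOT EXISTS word (
    wordName VARCHAR(100),
    count    INT
);
```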
7. Start the whole environment and run the test
Note: no need to start services again if they are already running.
(1) Start the Hadoop cluster
bin/start-all.sh
(2) Start the ZooKeeper service
bin/zkServer.sh start
(3) Start the Kafka service
bin/kafka-server-start.sh -daemon config/server.properties
(4) Start Flume
bin/flume-ng agent -n a1 -c conf/ -f jobs/flume-kafka-idea.conf -Dflume.root.logger=INFO,console
(5) Run the Java program to generate logs and send them to the port Flume listens on
(6) Start the Spark Streaming program to process the log messages
Test results
Output in the IDEA console
Data stored in the MySQL database