Integrating Spark with Kafka: A Small Project

A hands-on Spark Streaming and Kafka integration project, with all the code and detailed comments.

 

Overall flow: a home-made log generator produces logs containing data and ships them straight to Kafka via KafkaLog4jAppender; Spark Streaming consumes the logs from Kafka, processes them as a stream, and publishes the results to another Kafka topic; a Java backend then consumes the analysis results from that topic. The result is second-level real-time big-data analysis and display.
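
At a glance, the data flows as follows (the topic names are the ones used later in this article):

log generator (log4j + KafkaLog4jAppender) -> Kafka topic log-topic -> Spark Streaming (windowed word count) -> Kafka topic pro-topic -> Java console consumer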

 

Versions

kafka_2.11-0.11.0.1

spark-2.1.1-bin-hadoop2.7

scala-2.11.11

JDK 1.8

The Spark code was developed in IntelliJ IDEA;

everything else in Eclipse.

 

 

Step 1

The log generator writes logs to Kafka

 

Key jar files:

kafka-log4j-appender-0.11.0.1.jar // used by the logging setup

kafka_2.11-0.11.0.1.jar // add it if you get errors without it

kafka-clients-0.11.0.1.jar // add it if you get errors without it

slf4j-api-1.7.25.jar // the logging facade; another framework would also work

slf4j-log4j12-1.7.25.jar

 

Configuration file contents and caveats

File name: log4j.properties

File contents:

 

log4j.rootLogger=DEBUG,stdout,KAFKA
## appender Console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %5p %x-%t %l  (message:%m)%n
 
## appender KAFKA
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.topic=log-topic
log4j.appender.KAFKA.brokerList=master:9090
log4j.appender.KAFKA.compressionType=none
log4j.appender.KAFKA.syncSend=true
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %5p %x-%t %l  (message:%m)
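
With this file on the classpath, every log4j call is shipped to the log-topic topic by the KAFKA appender (note that rootLogger=DEBUG also forwards third-party DEBUG output, including the Kafka client's own logging, so expect some noise). A minimal smoke test, assuming the properties file sits on the classpath:

import org.apache.log4j.Logger;

public class AppenderSmokeTest {
    public static void main(String[] args) {
        // One call: it appears on the console and lands in the log-topic topic.
        Logger.getLogger(AppenderSmokeTest.class).info("hello kafka appender");
    }
}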

 

 

File name: my.properties

 

# interval between log batches, in ms (default 100)
timeinterval=1000
# number of log lines per batch (default 100)
frequency=298
# total running time, in ms (default 60000)
runtime=6000000

 

 

Code walkthrough:

LogWriterExcutor.java

 

import org.apache.log4j.Logger;
// Writes one batch of messages; each logger.info() call is routed to Kafka
// by the KafkaLog4jAppender configured in log4j.properties.
class LogWriterExcutor implements Runnable {

	Logger logger = Logger.getLogger(this.getClass().getName());
	private String[] message;

	public LogWriterExcutor(String[] message) {
		this.message = message;
	}

	@Override
	public void run() {
		for (String e : message)
			logger.info(e);
	}
}

 

 

LogCreater.java

 

 

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;

class LogCreater extends Constant {

	Logger logger = Logger.getLogger(this.getClass().getName());

	ExecutorService executor = null;
	private int timeinterval = TIME_INTERVAL;		// how long to wait between batches, in ms
	private int frequency = FREQUENCY;				// how many log lines to send per batch
	private int sumOfChinese = SUM_CHINESE;			// size of the custom Chinese character set
	private int runtime = RUNTIME;					// total running time of the program
	private long startTime = 0;
	private long endTime = 0;
	private long logCount = 0;						// number of log lines sent so far
	private boolean running = true;

	LogCreater() {
		init();
	}

	public void init() {
		Properties properties = new Properties();
		try (FileInputStream in = new FileInputStream("src\\source\\my.properties")) {
			properties.load(in);
			timeinterval = Integer.parseInt((String) properties.get("timeinterval"));
			frequency = Integer.parseInt((String) properties.get("frequency"));
			runtime = Integer.parseInt((String) properties.get("runtime"));
		} catch (IOException e) {
			logger.error("failed to read the configuration file");
			e.printStackTrace();
		}
		executor = Executors.newCachedThreadPool();
		startTime = System.currentTimeMillis();
		printHint();
	}

	public void startCreate() {
		System.out.println("Generating logs.....");

		if (executor == null) {
			logger.error("could not obtain a thread pool; the log generator cannot run. Exiting.");
			return;
		}
		while (running) {
			String[] messages = getMessages(frequency);
			create(messages);
			try {
				Thread.sleep(timeinterval);
			} catch (InterruptedException e) {
				logger.error("thread sleep was interrupted");
				e.printStackTrace();
			}
			endTime = System.currentTimeMillis();
			if ((endTime - startTime) > runtime)
				running = false;
		}
		executor.shutdown();	// let in-flight batches finish; accept no new ones

		System.out.println("Generated " + logCount + " log lines in total.");
	}

	private void create(String[] messages) {
		// LogWriterExcutor is a Runnable and can be handed to the pool directly
		executor.execute(new LogWriterExcutor(messages));
		logCount += messages.length;
	}

	private String[] getMessages(Integer frequency) {
		Random rand = new Random();
		String[] messages = new String[frequency];
		for (int i = 0; i < frequency; i++) {
			messages[i] = REGRET[rand.nextInt(sumOfChinese)];
		}
		return messages;
	}

	private void printHint() {
		System.out.println("interval between batches\t" + timeinterval + "ms");
		System.out.println("log lines per batch\t" + frequency);
		System.out.println("expected running time\t" + runtime / 1000 + "s");
	}
}

 

 

Constant.java

 

public class Constant {

	/*
	 * This file holds nothing but constants.
	 */

	/*
	 * Default interval between log batches, in ms.
	 */
	public static Integer TIME_INTERVAL = 100;

	/*
	 * Default number of log lines generated per batch.
	 */
	public static Integer FREQUENCY = 100;

	/*
	 * Default total running time: one minute.
	 */
	public static Integer RUNTIME = 60000;

	/*
	 * 298 Chinese characters, taken from "Xi Shi" (《惜誓》) in the Chu Ci.
	 */
	public static String[]REGRET = {"一","言","老","调","清","者","舆","昆","合","渊","下","而","同","不","明","与",
			"昏","谏","小","騑","少","我","气","谔","世","或","尚","丝","鸟","逢","瀣","中","是","鸱","就","水","临","制",
			"举","砾","鸾","所","乃","鹄","久","居","陆","之","虎","乎","乐","虑","乔","虖","剖","遗","虚","聚","江","吸",
			"瑟","象","乡","衡","周","息","虯","衰","驰","山","驱","乱","干","年","并","恶","穷","偷","顺","登","白","幽",
			"驾","岁","蚁","节","梅","沆","皆","皇","骋","二","于","隐","源","麒","骖","骛","墟","功","麟","纡","纫","被",
			"身","犬","躯","悲","河","蚴","犹","人","难","裁","仁","狂","黄","集","哉","背","苍","从","风","仑","黑","盖",
			"高","飙","仙","四","盛","惜","飞","回","苟","因","以","拥","苦","独","竭","曲","直","相","建","固","国","攀",
			"异","儃","处","茅","月","夏","霑","休","众","北","圜","生","索","謣","圣","贤","伤","大","在","用","木","天",
			"眩","太","夫","伯","地","朱","失","贵","然","贼","放","愿","流","权","充","故","商","均","先","浊","子","何",
			"余","神","非","止","赤","此","来","车","革","兮","佯","数","女","杳","海","睹","蝼","彼","载","松","使","长",
			"极","羁","如","概","历","玉","涉","冉","枉","羊","王","後","厌","再","美","箕","得","龙","原","龟","审","醢",
			"群","冥","推","循","讬","枭","况","德","容","方","澹","离","去","旁","见","观","係","心","寄","又","反","重",
			"野","藏","量","发","翔","比","俗","志","诚","进","远","川","察","忠","无","濡","矣","凤","日","知","左","自",
			"矫","可","称","翱","深","已","右","至","石","念","时","迻","忽","寿","丹","根","为","尽",};
	
	/*
	 * Number of Chinese characters, used as the bound for random indexing.
	 */
	public static Integer SUM_CHINESE = REGRET.length;	// REGRET holds 298 entries
}

 

 

MyUtil.java

import java.util.Random;

public class MyUtil {
	// Returns an array of n random ints, each in [0, range).
	public static int[] getRand(int n, int range) {
		Random ran = new Random();
		int[] arr = new int[n];
		while (n-- > 0) {
			arr[n] = ran.nextInt(range);
		}
		return arr;
	}
}

 

Demo.java

/*
 * Log generator entry point.
 */

public class Demo{
	public static void main(String[] args){
		new LogCreater().startCreate();
		System.exit(0);
	}
}

Directory layout: an ordinary Java project.


 

Step 2

Create a Kafka topic

Installation is skipped here.

Configure %KAFKA_HOME%/config/server.properties:

there are plenty of tutorials online, so this is not repeated here.

 

Start Kafka

kafka-server-start.sh config/server.properties &

 

Create the topic:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic log-topic

 

Describe the topic:

kafka-topics.sh --describe --zookeeper master:2181 --topic log-topic

 

Start a console consumer:

kafka-console-consumer.sh --bootstrap-server master:9090 --from-beginning --topic log-topic

 

Startup order

1. Start the Kafka server.
2. Create the topic.
3. Describe the topic (optional).
4. Start the console consumer.
5. Start the log generator program.

 

Note: the terminal running the console consumer prints every log line it receives. Appending & to the command pushes the process to the background; stop the consumer with Ctrl+C.

 

 

Step 3

Spark consumes the logs from Kafka

Key jar files:

kafka_2.11-0.11.0.1.jar

kafka-clients-0.11.0.1.jar

spark-streaming-kafka_2.11-1.6.3.jar

 

All of Spark's bundled jars

The Scala SDK

 

Exceptions:

If the job fails with java.lang.NoClassDefFoundError: org/apache/spark/Logging:

this Logging class still ships in spark-core_2.11-1.5.2,

but Spark 2.1.1 no longer contains the class file; it was replaced by org.apache.spark.internal.Logging.

Workaround

Extract this class from the 1.5.2 jar, repackage it on its own with the jar tool (jar cvf new_name.jar class_dir), and then use the result as an ordinary utility jar.
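
A sketch of the repack, assuming spark-core_2.11-1.5.2.jar sits in the current directory (the wildcard matters: a Scala trait compiles to more than one .class file):

jar xf spark-core_2.11-1.5.2.jar
jar cf spark-logging.jar org/apache/spark/Logging*.class

The resulting spark-logging.jar can then go on the classpath, or be attached with --jars (see the run command below).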

 

Process overview:

Spark creates a Receiver that consumes the log data from Kafka.

Code walkthrough: Kafka.scala

import java.util.Properties
import org.apache.log4j.{Level, Logger}  // log4j, so that setLevel below actually affects Spark's logging

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.Logging  // repackaged from spark-core 1.5.2; see the note above

object Kafka extends Logging {

  private var producer: KafkaProducer[String, String] = _
  private var props : Properties = _

  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkConf = new SparkConf().setAppName("LauncherStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    /*
     * Producer configuration
     */
    val brokerAddress = "master:9090"
    val topic = "pro-topic"
    props = new Properties()
    props.put("bootstrap.servers", brokerAddress)
    props.put("value.serializer", classOf[StringSerializer].getName)
    // Key serializer is required.
    props.put("key.serializer", classOf[StringSerializer].getName)
    // wait for all in-sync replicas to ack sends
    props.put("acks", "all")

    // Create the Kafka producer; it is used below to send the results.
    producer = new KafkaProducer[String, String](props)
    if (producer == null) {
      println("producer is null")
      ssc.stop()
    }

    /*
     * Consumer configuration
     */
    val zkQuorum = "master:2181,slave1:2181,slave2:2181"
    // The group id can be anything, but it must not duplicate an existing group,
    // or no data is received. Change it on every run (or turn it into a
    // parameter). I have not solved this yet, but it does not affect the flow.
    val group = "log-group21"
    val topicMap = Map[String, Int]("log-topic" -> 1)

    // Create the Kafka consumer. Without a window, one RDD is produced per
    // batch interval (the second argument of StreamingContext above).
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    kafkaStream.window(Seconds(12), Seconds(6)).foreachRDD((rdd: RDD[String], time: Time) => {
      // With this window, every 6 seconds we process the previous 12 seconds of
      // data; the window and slide durations must both be multiples of the
      // StreamingContext batch interval (3 s here, so each window covers the
      // last four batches). The whole 12-second slice sits in this one RDD:
      // compute a word count over it and send the result to the other topic.
      val re = rdd.flatMap(t => t.reverse.charAt(1).toString).map(m => (m, 1L)).reduceByKey(_ + _)
      val a = re.collect().toMap
      producer.send(new ProducerRecord[String, String](topic, a.mkString(",")))
    })

/*
    // This variant also works
    kafkaStream.foreachRDD((rdd: RDD[String], time: Time) => {

      // Works: forward every record as-is
      //rdd.collect().foreach(t => producer.send(new ProducerRecord[String, String](topic, t)))

      // Works too: lightly process each record, then send it
      rdd.collect().foreach(t => {
        println("sending: " + t)
        var s = t.reverse.charAt(1).toString		// pull out the single Chinese character embedded in the log line
        producer.send(new ProducerRecord[String, String](topic, s))
      })

    })
*/
    ssc.start()
    // block, waiting on the streaming job
    ssc.awaitTermination()

    // Recommended, although this line is only reached once the streaming
    // context terminates.
    producer.close()
    println("it happened")
  }
}

 

Run command and caveats

spark-submit --master spark://master:7077 --class streaming.Kafka libra.jar

If jars are missing at runtime, attach them with --jars or another suitable option, for example as sketched below.
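
A sketch; the jar names are the ones listed at the top of this step, and the paths are illustrative:

spark-submit --master spark://master:7077 --jars kafka_2.11-0.11.0.1.jar,kafka-clients-0.11.0.1.jar,spark-streaming-kafka_2.11-1.6.3.jar --class streaming.Kafka libra.jar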

Special note:

Change the Scala consumer's group name on every run, or it receives no data. I have not found the root cause yet; one possible (untested) culprit is the per-group consumer offsets that the old high-level consumer keeps in ZooKeeper.

 

Step 4

Spark sends the processed results to Kafka

Jar files:

same as in Step 3

Create a new topic:

See Step 2 for the create command. Configure the new topic as the producer topic in the Spark job,

then start a console consumer on it; concrete commands are sketched below.
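
Reusing the Step 2 syntax with pro-topic, the producer topic in Kafka.scala:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic pro-topic

kafka-console-consumer.sh --bootstrap-server master:9090 --from-beginning --topic pro-topic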

 

Step 5

The Java backend consumes the logs from Kafka

Key jar files:

kafka-clients-0.11.0.1.jar

kafka_2.11-0.11.0.1.jar

slf4j-api-1.7.25.jar

slf4j-log4j12-1.7.25.jar

log4j-1.2.17.jar

 

An ordinary Java project

Code walkthrough:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class Consumer {

	// From version 0.11.0.0 on, use KafkaConsumer; before 0.11.0.0 it was ConsumerConnector.
    private final KafkaConsumer<Integer, String> consumer;
    private String topic;

    public Consumer(String topic) {
        Properties props = new Properties();
        // KafkaProperties is a custom interface file holding static parameters
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);

        // The group name here also seems to suffer from the duplicate-group
        // issue; change it on every run to be safe
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-group101");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    public void doWork() {

        // subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
        ConsumerRecords<Integer, String> records = null;

        // Poll in a loop; every poll returns all records not yet consumed.
        while (true) {
            // 7000 ms is the maximum wait per poll: Spark emits a result every
            // 6 seconds, and the extra second allows for Kafka relay delay.
            records = consumer.poll(7000);
            System.out.println("===========================");
            System.out.println("records received: " + records.count());
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println(record.value() + "==" + record.offset());
            }
            System.out.println("===========================");
        }
    }
}
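
KafkaProperties is not shown in the article. A minimal sketch that matches how it is used above (the values assume the master:9090 broker used throughout), together with a hypothetical entry point that consumes the Spark result topic:

public interface KafkaProperties {
    String KAFKA_SERVER_URL = "master";
    int KAFKA_SERVER_PORT = 9090;
}

public class ConsumerDemo {
    public static void main(String[] args) {
        // pro-topic is the topic the Spark job publishes its results to (Steps 3-4)
        new Consumer("pro-topic").doWork();
    }
}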

 

 
