您的位置:首页 > 其它

第24课:使用MAT动态分析Spark应用程序初体验

2016-05-29 11:12 716 查看
第24课:使用MAT动态分析Spark应用程序初体验

/* 王家林老师授课http://weibo.com/ilovepains  每天晚上20:00YY频道现场授课频道

68917580*/

1 如何使用mat直接分析运行中的spark程序

2 用mat直接分析运行中的spark程序初体验

 

MAT动态连接ecliplse连不上,

Acquire a heap dump from a locally running Java process.是空白的;

动态连不了,只好想办法,静态来查看spark的运行情况,人工造出一个OnOutOfMemoryError,生成IMFheap2.bin,就可以查看spark相关的内容了

WordCount 运行结果:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/05/29 17:37:04 INFO SparkContext: Running Spark version 1.6.0
16/05/29 17:37:07 INFO SecurityManager: Changing view acls to: admin
......
and : 10
command, : 2
./dev/run-tests : 1
sample : 1
16/05/29 17:37:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver
16/05/29 17:37:30 INFO DAGScheduler: ResultStage 1 (foreach at WordCount.java:90) finished in 0.103 s
16/05/29 17:37:30 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 101 ms on localhost (1/1)
16/05/29 17:37:30 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/05/29 17:37:30 INFO DAGScheduler: Job 0 finished: foreach at WordCount.java:90, took 0.623398 s
16/05/29 17:37:30 INFO SparkUI: Stopped Spark web UI at http://192.168.3.4:4040 16/05/29 17:37:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/05/29 17:37:30 INFO MemoryStore: MemoryStore cleared
16/05/29 17:37:30 INFO BlockManager: BlockManager stopped
16/05/29 17:37:30 INFO BlockManagerMaster: BlockManagerMaster stopped
16/05/29 17:37:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/05/29 17:37:30 INFO SparkContext: Successfully stopped SparkContext
16/05/29 17:37:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/05/29 17:37:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
java.lang.OutOfMemoryError: Java heap space
Dumping heap to d:/IMFheap2.bin ...
16/05/29 17:37:50 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Heap dump file created [421100647 bytes in 18.728 secs]
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at com.dt.spark.SparkApps.cores.WordCount.main(WordCount.java:109)
16/05/29 17:37:50 INFO ShutdownHookManager: Shutdown hook called
16/05/29 17:37:50 INFO ShutdownHookManager: Deleting directory C:\Users\admin\AppData\Local\Temp\spark-5acb6e7b-8fdf-4d88-bbe3-cab7c9f3ce94

源代码

public class WordCount {

public static void main(String[] args) {

/**
* 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
* 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
* 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
* 只有1G的内存)的初学者 *
*/
SparkConf conf = new SparkConf().setAppName("Spark WordCount written by Java").setMaster("local");

/**
* 第2步:创建SparkContext对象
* SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala、Java、Python、R等都必须有一个SparkContext(不同的语言具体的类名称不同,如果是Java的话则为JavaSparkContext)
* SparkContext核心作用:初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler、TaskScheduler、SchedulerBackend
* 同时还会负责Spark程序往Master注册程序等
* SparkContext是整个Spark应用程序中最为至关重要的一个对象
*/
JavaSparkContext sc = new JavaSparkContext(conf); //其底层实际上就是Scala的SparkContext

/**
* 第3步:根据具体的数据来源(HDFS、HBase、Local FS、DB、S3等)通过JavaSparkContext来创建JavaRDD
* JavaRDD的创建基本有三种方式:根据外部的数据来源(例如HDFS)、根据Scala集合、由其它的RDD操作
* 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
*/
// JavaRDD lines = sc.textFile("D://Big_Data_Software//spark-1.6.0-bin-hadoop2.6//README.md");
JavaRDD lines = sc.textFile( "G://IMFBigDataSpark2016//Bigdata_Software//spark-1.6.0-bin-hadoop2.6//spark-1.6.0-bin-hadoop2.6//spark-1.6.0-bin-hadoop2.6//README.md");

/**
* 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
* 第4.1步:讲每一行的字符串拆分成单个的单词
*/
JavaRDD words = lines.flatMap(new FlatMapFunction() { //如果是Scala,由于SAM转换,所以可以写成val words = lines.flatMap { line => line.split(" ")}

@Override
public Iterable call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});

/**
* 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
* 第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1)
*/
JavaPairRDD pairs = words.mapToPair(new PairFunction() {

@Override
public Tuple2 call(String word) throws Exception {
return new Tuple2(word, 1);
}
});

/**
* 第4步:对初始的RDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
* 第4.3步:在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数
*/
JavaPairRDD wordsCount = pairs.reduceByKey(new Function2() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)

@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});

wordsCount.foreach(new VoidFunction>() {

@Override
public void call(Tuple2 pairs) throws Exception {
System.out.println(pairs._1 + " : " + pairs._2);

}
});

sc.close();
/*while (true) {

}*/

final int size = 128 *1024 *1024 ;
byte [] array1 =new byte [size];
byte [] array2 =new byte [size];
byte [] array3 =new byte [size];
byte [] array4 =new byte [size];
byte [] array5 =new byte [size];

// throw new OutOfMemoryError();

}

}

 



 

 

 

 



 

 



 

 

 

 

 

 



 

 

 

 



 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: