Lesson 24: First Experience Using MAT to Analyze a Running Spark Application
2016-05-29 11:12
/* Taught by Wang Jialin (http://weibo.com/ilovepains); live classes every evening at 20:00 on YY channel 68917580 */
1. How to attach MAT directly to a running Spark program
2. First experience analyzing a running Spark program with MAT
MAT could not attach to the process from Eclipse: the "Acquire a heap dump from a locally running Java process" list was empty. Since live attachment failed, the fallback was static analysis: deliberately trigger an OutOfMemoryError with heap-dump-on-OOM enabled, so the JVM writes IMFheap2.bin, which can then be opened in MAT to inspect the Spark internals.
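The dump was produced by enabling HotSpot's heap-dump-on-OOM option in the run configuration. A minimal sketch of the VM arguments (the -Xmx value is an assumption; the dump path matches the log below):

```
# VM arguments (Eclipse: Run Configuration -> Arguments -> VM arguments).
# -Xmx is illustrative: it must be small enough that the five 128 MB
# arrays allocated at the end of WordCount overflow the heap.
-Xmx512m
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=d:/IMFheap2.bin
```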
WordCount run output:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/05/29 17:37:04 INFO SparkContext: Running Spark version 1.6.0
16/05/29 17:37:07 INFO SecurityManager: Changing view acls to: admin
......
and : 10
command, : 2
./dev/run-tests : 1
sample : 1
16/05/29 17:37:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver
16/05/29 17:37:30 INFO DAGScheduler: ResultStage 1 (foreach at WordCount.java:90) finished in 0.103 s
16/05/29 17:37:30 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 101 ms on localhost (1/1)
16/05/29 17:37:30 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/05/29 17:37:30 INFO DAGScheduler: Job 0 finished: foreach at WordCount.java:90, took 0.623398 s
16/05/29 17:37:30 INFO SparkUI: Stopped Spark web UI at http://192.168.3.4:4040
16/05/29 17:37:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/05/29 17:37:30 INFO MemoryStore: MemoryStore cleared
16/05/29 17:37:30 INFO BlockManager: BlockManager stopped
16/05/29 17:37:30 INFO BlockManagerMaster: BlockManagerMaster stopped
16/05/29 17:37:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/05/29 17:37:30 INFO SparkContext: Successfully stopped SparkContext
16/05/29 17:37:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/05/29 17:37:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
java.lang.OutOfMemoryError: Java heap space
Dumping heap to d:/IMFheap2.bin ...
16/05/29 17:37:50 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Heap dump file created [421100647 bytes in 18.728 secs]
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at com.dt.spark.SparkApps.cores.WordCount.main(WordCount.java:109)
16/05/29 17:37:50 INFO ShutdownHookManager: Shutdown hook called
16/05/29 17:37:50 INFO ShutdownHookManager: Deleting directory C:\Users\admin\AppData\Local\Temp\spark-5acb6e7b-8fdf-4d88-bbe3-cab7c9f3ce94
Source code:
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        /**
         * Step 1: Create the SparkConf object that holds the program's runtime
         * configuration. setMaster sets the URL of the cluster Master to connect
         * to; "local" means the program runs locally, which is ideal for
         * beginners on low-spec machines (e.g. only 1 GB of RAM).
         */
        SparkConf conf = new SparkConf().setAppName("Spark WordCount written by Java").setMaster("local");
        /**
         * Step 2: Create the SparkContext.
         * The SparkContext is the sole entry point to all Spark functionality;
         * every program needs one, whether written in Scala, Java, Python or R
         * (the concrete class differs per language -- in Java it is
         * JavaSparkContext). Its core job is to initialize the components the
         * application needs (DAGScheduler, TaskScheduler, SchedulerBackend) and
         * to register the program with the Master. It is the single most
         * important object in a Spark application.
         */
        JavaSparkContext sc = new JavaSparkContext(conf); // backed by the Scala SparkContext
        /**
         * Step 3: Create a JavaRDD from the concrete data source (HDFS, HBase,
         * local FS, DB, S3, ...) through the JavaSparkContext. A JavaRDD can be
         * created in three basic ways: from an external source (e.g. HDFS),
         * from a collection, or from another RDD. The data is split into a
         * series of Partitions, and the data assigned to each Partition is
         * processed by one Task.
         */
        // JavaRDD<String> lines = sc.textFile("D://Big_Data_Software//spark-1.6.0-bin-hadoop2.6//README.md");
        JavaRDD<String> lines = sc.textFile("G://IMFBigDataSpark2016//Bigdata_Software//spark-1.6.0-bin-hadoop2.6//spark-1.6.0-bin-hadoop2.6//spark-1.6.0-bin-hadoop2.6//README.md");
        /**
         * Step 4: Apply Transformation-level processing (map, filter and other
         * higher-order functions) to the initial JavaRDD.
         * Step 4.1: Split each line into individual words.
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { // in Scala, thanks to SAM conversion: val words = lines.flatMap { line => line.split(" ") }
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        /**
         * Step 4.2: On top of the split, count each word occurrence as 1,
         * i.e. word => (word, 1).
         */
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        /**
         * Step 4.3: Sum the per-occurrence counts to get each word's total
         * number of occurrences in the file.
         */
        JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { // accumulate values for identical keys (both map-side and reduce-side)
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> pair) throws Exception {
                System.out.println(pair._1 + " : " + pair._2);
            }
        });
        sc.close();
        /*while (true) {
        }*/
        // Deliberately allocate five 128 MB byte arrays to exhaust the heap and
        // trigger the OutOfMemoryError (line 109 in the stack trace above).
        final int size = 128 * 1024 * 1024;
        byte[] array1 = new byte[size];
        byte[] array2 = new byte[size];
        byte[] array3 = new byte[size];
        byte[] array4 = new byte[size];
        byte[] array5 = new byte[size];
        // throw new OutOfMemoryError();
    }
}
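When the attach-based "Acquire Heap Dump" route fails, a dump can also be requested programmatically from inside the JVM via HotSpot's diagnostic MBean, instead of waiting for an OutOfMemoryError. A minimal sketch (the `HeapDumper` class name and output path are illustrative; requires a HotSpot JVM):

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapDumper {
    // Writes a heap dump of the current JVM on demand, without forcing an
    // OutOfMemoryError first. The file is in .hprof format, readable by MAT.
    public static void dump(String path, boolean liveObjectsOnly) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        HotSpotDiagnosticMXBean bean = ManagementFactory.newPlatformMXBeanProxy(
                server, "com.sun.management:type=HotSpotDiagnostic",
                HotSpotDiagnosticMXBean.class);
        // liveObjectsOnly=true forces a full GC first and dumps only reachable objects
        bean.dumpHeap(path, liveObjectsOnly);
    }

    public static void main(String[] args) throws Exception {
        dump("IMFheap-manual.hprof", true);
    }
}
```

Calling `dump(...)` at an interesting point in the Spark driver (e.g. right after an action completes) yields a dump of the live Spark objects without crashing the program.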
![](https://oscdn.geek-share.com/Uploads/Images/Content/202005/31/72e0cd6a7514c026e8b179e1071eef6c)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202005/31/ac501972d9cc59b2109dea7ddf4bcd2d)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202005/31/d60f8121f9c0dda1d73c690d2c4846fe)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202005/31/00874a56a528b508e4821366aae27b5c)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202005/31/16805e24a750ca6c6047ec6bb404c21d)
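A further fallback when MAT's process list stays empty: take the dump with the JDK command-line tools and open the file in MAT manually. A sketch (the PID must be substituted with the Spark driver's actual process id):

```
jps -l                                                 # list running JVMs to find the driver's PID
jmap -dump:live,format=b,file=IMFheap-live.bin <pid>   # write a binary heap dump of that process
# Then in MAT: File -> Open Heap Dump... -> IMFheap-live.bin
```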