
Getting Started with Spark, Part 1 (Common Problems and a Sample Demo)

2016-11-30 11:33

Spark examples

Configuration done on an existing machine

The installation process is omitted. Spark is installed at /opt/spark on this machine, referred to below as SPARK_HOME=/opt/spark.
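For convenience, the path can also be exported in the shell to match the layout above (this step is optional):

export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin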

Problem 1: The corresponding host cannot be found

java.net.UnknownHostException: bjzw_102_229: bjzw_102_229
        at java.net.InetAddress.getLocalHost(InetAddress.java:1652)
        at org.apache.spark.util.Utils$.findLocalInetAddress(Utils.scala:821)
        at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress$lzycompute(Utils.scala:814)
        at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress(Utils.scala:814)
        at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:871)
        at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:871)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.util.Utils$.localHostName(Utils.scala:871)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:387)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:111)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:56)
        at com.sogou.spark.sparkTest.main(sparkTest.java:21)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)


The cause: the program needs to find a host named bjzw_102_229, but this hostname cannot be resolved. It needs to be added to the local hosts file:

Edit
/etc/hosts
so that it contains:

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
10.152.102.229 bjzw_102_229


The first two lines were already present; the key change is adding the third line. Re-running the program then resolves this problem.
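Before re-running, a quick check that the new mapping resolves (the hostname and IP are the ones added above):

getent hosts bjzw_102_229
# expected output: 10.152.102.229  bjzw_102_229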

Problem 2: spark-shell reports an incorrect configuration

Take the simplest route and run spark-shell directly in a terminal. When the
scala>
prompt appears, startup has succeeded. Now run a simple example:
scala> val text1=sc.textFile("hdfs://*/*/sparkTest/log")
The output:

16/11/29 19:37:33 INFO MemoryStore: ensureFreeSpace(192808) called with curMem=630758, maxMem=289910292
16/11/29 19:37:33 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 188.3 KB, free 275.7 MB)
16/11/29 19:37:33 INFO MemoryStore: ensureFreeSpace(17458) called with curMem=823566, maxMem=289910292
16/11/29 19:37:33 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 17.0 KB, free 275.7 MB)
16/11/29 19:37:33 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:64219 (size: 17.0 KB, free: 276.4 MB)
16/11/29 19:37:33 INFO SparkContext: Created broadcast 3 from textFile at <console>:21


The log shows the file reference was created correctly (note that textFile is lazy; the data is actually read only when an action such as count runs).

scala> text1.count
java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)


This indicates a problem with the configuration files.

Spark's configuration normally lives in two files:

$SPARK_HOME/conf/spark-defaults.conf

$SPARK_HOME/conf/spark-env.sh

The most likely cause is an incorrect path in the classpath configuration, for example this line (an export statement like this belongs in spark-env.sh rather than spark-defaults.conf):

export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_PREFIX/lib/hadoop-lzo-0.4.19.jar


Alternatively, the following can be configured:

spark.executor.extraClassPath

spark.driver.extraClassPath

These two properties serve the same purpose as SPARK_CLASSPATH, but do not configure both at the same time; setting both will cause an error.

Also make sure the path that SPARK_CLASSPATH points to is actually valid.
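A minimal sketch of the spark-defaults.conf alternative; the jar path below is only an example and depends on where hadoop-lzo is installed on your machine:

spark.executor.extraClassPath   /opt/hadoop/lib/hadoop-lzo-0.4.19.jar
spark.driver.extraClassPath     /opt/hadoop/lib/hadoop-lzo-0.4.19.jar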

After fixing the configuration issues in conf and continuing, the file's line count can now be obtained correctly:

16/11/30 11:00:54 INFO DAGScheduler: ResultStage 0 (count at <console>:24) finished in 0.148 s
16/11/30 11:00:54 INFO DAGScheduler: Job 0 finished: count at <console>:24, took 0.269712 s
res0: Long = 23


Problem 3: The native GPL library cannot be loaded

While running a program, even a very simple sample, you may hit an error like this:

java.lang.UnsatisfiedLinkError: gplcompression (Not found in java.library.path)
16/11/29 23:02:17 ERROR GPLNativeCodeLoader: Could not load native gpl library
java.lang.UnsatisfiedLinkError: gplcompression (Not found in java.library.path)
at java.lang.ClassLoader.loadLibraryWithPath(ClassLoader.java:1217)
at java.lang.ClassLoader.loadLibraryWithClassLoader(ClassLoader.java:1181)
at java.lang.System.loadLibrary(System.java:530)
at com.hadoop.compression.lzo.GPLNativeCodeLoader.<clinit>(GPLNativeCodeLoader.java:32)


This means the native GPL library cannot be loaded, so local LZO files cannot be decompressed.

The Hadoop cluster we normally use can decompress LZO by default, but the Hadoop running under YARN may not have this codec loaded, so it has to be installed manually:

Reference: http://blog.csdn.net/inkfish/article/details/5194022

Installation steps:

1 Download the GPL compression package, build it, and obtain the GPL .so files

If
$HADOOP_HOME/lib/native
already contains the GPL libraries and
$HADOOP_HOME/lib
already contains the hadoop-gpl-compression-0.1.0.jar package, no further work is needed (a quick check is shown below).
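A quick way to check, assuming the standard Hadoop layout (the file names are the ones mentioned above):

ls $HADOOP_HOME/lib/native | grep -i gpl
ls $HADOOP_HOME/lib/hadoop-gpl-compression-0.1.0.jar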

1) Download:

wget http://hadoop-gpl-compression.googlecode.com/files/hadoop-gpl-compression-0.1.0-rc0.tar.gz
When I tried it myself, this URL was already dead. The new address is:

https://code.google.com/archive/a/apache-extras.org/p/hadoop-gpl-compression/downloads

2) Build:

tar -xvf hadoop-gpl-compression-0.1.0-rc0.tar.gz


mv hadoop-gpl-compression-0.1.0/hadoop-gpl-compression-0.1.0.jar $HADOOP_HOME/lib/


mv hadoop-gpl-compression-0.1.0/lib/native/Linux-i386-32/* $HADOOP_HOME/lib/native/Linux-i386-32/


mv hadoop-gpl-compression-0.1.0/lib/native/Linux-amd64-64/* $HADOOP_HOME/lib/native/Linux-amd64-64/


If the target paths already contain these files, you can skip the copies.

2 Install LZO

1) Download:

wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.03.tar.gz
2) Build:

tar -xvf lzo-2.03.tar.gz


sudo sh configure


sudo make


sudo make install
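This build installs the LZO library under /usr/local/lib by default; if the loader still cannot find it afterwards, refreshing the linker cache may help (an optional step, depending on the system):

sudo ldconfig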


3) Update environment variables:

Configure the following in $SPARK_HOME/conf/spark-env.sh:

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/hadoop2.0/lib/native
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_PREFIX/lib/hadoop-gpl-compression-0.1.0.jar


At this point, the earlier problem of the GPL library failing to load is resolved.

Sample: a first small program

Create a new Java file in Eclipse.

The program code is as follows:

package com.spark;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;

public final class sparkTest {
    public static void main(String[] args) throws Exception {
        // The context used to read files, analogous to sc in the Scala shell.
        // Full constructor signature:
        // JavaSparkContext(master: String, appName: String, sparkHome: String, jars: Array[String], environment: Map[String, String])
        //System.out.println(args[0]+":"+args[1]);
        JavaSparkContext ctx = new JavaSparkContext();
        //JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount", System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(sparkTest.class));
        // ctx can also be used to read environment settings, for example:
        System.out.println("spark home:" + ctx.getSparkHome());

        // One line at a time, as String. There are also hadoopFile, sequenceFile, etc.; sc.textFile("path") can be used directly.
        JavaRDD<String> lines = ctx.textFile("hdfs://master01.*/sparkTest/log", 1);  // java.lang.String path, int minSplits
        lines.cache();   // cache keeps the RDD in memory; generally used for RDDs that will be reused, which reportedly reduces running time.

        // collect converts an RDD into plain Java types, as below.
        List<String> line = lines.collect();
        for (String val : line)
            System.out.println(val);

        // Other commonly used RDD methods:
        // lines.collect();       List<String>
        // lines.union();         JavaRDD<String>
        // lines.top(1);          List<String>
        // lines.count();         long
        // lines.countByValue();

        /**
         * filter test
         * Define a function that returns a Boolean; when Spark runs filter it drops every record for which the function returns false.
         * The s in "String s" can be thought of as one record of lines (lines can be seen as a sequence of String records).
         */
        JavaRDD<String> contaninsE = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return (s.contains("they"));
            }
        });
        System.out.println("--------------next filter's  result------------------");
        line = contaninsE.collect();
        for (String val : line)
            System.out.println(val);

        /**
         * sample test
         * sample is straightforward to use; it draws a sample from the data.
         * Parameters: withReplacement: Boolean, fraction: Double, seed: Int
         */
        JavaRDD<String> sampletest = lines.sample(false, 0.1, 5);
        System.out.println("-------------next sample-------------------");
        line = sampletest.collect();
        for (String val : line)
            System.out.println(val);

        /**
         * In new FlatMapFunction<String, String> the two Strings are the input and output types.
         * The overridden call method implements the conversion and returns an Iterable.
         *
         * flatMap is a very commonly used Spark function: it takes one RDD record and, using the function you define, breaks it into multiple records.
         * For example, each record of lines is currently one line of text as a String; to split it into individual words we can write:
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                String[] words = s.split(" ");
                return Arrays.asList(words);
            }
        });

        /**
         * mapToPair produces key-value pairs, similar to the map phase in MapReduce.
         * PairFunction<T, K, V>: T is the input type; K, V are the output key and value types.
         * Override call to implement the conversion.
         */
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Function2 is a two-argument function that takes arguments of type T1 and T2 and returns an R.
        /**
         * reduceByKey is similar to the reduce phase in MapReduce.
         * It requires the input (ones below) to be key-value pairs; records with the same key are grouped and combined pairwise.
         */
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {  // how values that share the same key are combined in the reduce stage
                return i1 + i2;
            }
        });

        // Note: Spark also has a reduce method that works on any RDD, not just key-value pairs;
        // it combines all incoming records pairwise.

        /**
         * sortByKey, as the name suggests, sorts by key.
         */
        JavaPairRDD<String, Integer> sort = counts.sortByKey();
        System.out.println("----------next sort----------------------");

        /**
         * collect has already appeared several times; it converts a Spark RDD into familiar Java types.
         */
        List<Tuple2<String, Integer>> output = sort.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1 + ": " + tuple._2());
        }

        /**
         * Saving: Spark provides many output interfaces for writing results.
         */
        sort.saveAsTextFile("/tmp/spark-tmp/test");

        // sort.saveAsNewAPIHadoopFile();
        // sort.saveAsHadoopFile();
        System.exit(0);
    }
}


Store the file to be analyzed at hdfs://master01.*/sparkTest/log accordingly (a sketch of the upload is shown below).
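A minimal sketch of uploading the test file, assuming the cluster's default filesystem points at the same HDFS namenode and the local file is named log (both are assumptions for illustration):

hadoop fs -mkdir -p /sparkTest
hadoop fs -put log /sparkTest/log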

When the code is finished, export a jar from Eclipse and upload it to the corresponding server.

On Linux, write a script that submits the job to the Spark cluster:

#! /bin/bash
path=/search/odin/testSpark
cd $path

sh /opt/spark/bin/spark-submit \
--class com.spark.sparkTest sparkTest.jar
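The job also writes the sorted counts via saveAsTextFile to /tmp/spark-tmp/test; assuming the default filesystem is HDFS, a quick way to inspect that output is:

hadoop fs -cat /tmp/spark-tmp/test/part-00000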


The console output after running:

("The,1)
('biding,1)
(A,1)
(Danish,1)
(King,1)
(Nor,1)
(She,1)
(The,1)
(They,1)
(What,2)
(a,2)
(according,1)
(afterwards,1)
(all,1)
(always,1)
(an,3)
(and,12)
(angels',1)
(anything,1)
(are,1)
(are-,1)
(at,2)
(balsam,,1)
(be,1)
(been,1)
(before,2)
(below,1)
(blade,1)
(boiling,1)
(but,1)
(by,1)
(capital,,1)


For reference, the contents of the test log file used:

ok! at the window there leans an old maid. She plucks the
withered leaf from the balsam, and looks at the grass-covered rampart,
on which many children are playing. What is the old maid thinking
of? A whole life drama is unfolding itself before her inward gaze.
"The poor little children, how happy they are- how merrily they
play and romp together! What red cheeks and what angels' eyes! but
they have no shoes nor stockings. They dance on the green rampart,
just on the place where, according to the old story, the ground always
sank in, and where a sportive, frolicsome child had been lured by
means of flowers, toys and sweetmeats into an open grave ready dug for
it, and which was afterwards closed over the child; and from that
moment, the old story says, the ground gave way no longer, the mound
remained firm and fast, and was quickly covered with the green turf.
The little people who now play on that spot know nothing of the old
tale, else would they fancy they heard a child crying deep below the
earth, and the dewdrops on each blade of grass would be to them
tears of woe. Nor do they know anything of the Danish King who here,
in the face of the coming foe, took an oath before all his trembling
courtiers that he would hold out with the citizens of his capital, and
die here in his nest; they know nothing of the men who have fought
here, or of the women who from here have drenched with boiling water
the enemy, clad in white, and 'biding in the snow to surprise the