使用spark
2016-05-31 21:12
330 查看
更新说明
免密码登录for f in `cat ~/machines`; do scp .ssh/id_dsa.pub $f:~/ ssh $f "cat id_dsa.pub >> .ssh/authorized_keys" done
安装
for f in `cat machines `; do scp ~/software/soft/spark-2.1.0-bin-hadoop2.7.tgz $f:~/ ; ssh -t $f " sudo mkdir /opt/spark cd /opt/spark/; sudo tar -zxvf /home/keke.zhaokk/spark-2.1.0-bin-hadoop2.7.tgz ; sudo ln -s spark-2.1.0-bin-hadoop2.7 current; sudo chown -R keke.zhaokk /opt/spark/*; " done
修改配置
注意
SPARK_MASTER_IP=10.11.143.30需要改为
SPARK_MASTER_HOST=10.11.143.30
复制配置
for f in `cat machines `; do scp -r $SPARK_HOME/conf/ $f:~/ ssh $f " mv conf/slaves $SPARK_HOME/conf/ mv conf/*.sh $SPARK_HOME/conf/ " done
spark-集群搭建
机器角色 | IP | 安装软件 |
---|---|---|
worker | 10.11.143.24 | python(Anaconda3 4.0.0), R, spark-2.X-bin-hadoopX.X |
worker | 10.11.143.26 | python(Anaconda3 4.0.0), R, spark-2.X-bin-hadoopX.X |
master | 10.11.143.30 | python(Anaconda3 4.0.0), R, spark-2.X-bin-hadoopX.X |
http://10.11.143.30:8080/
集群通过
start-all.sh,
stop-all.sh来开启和停止.
在脚本使用
spark-shell --total-executor-cores 30 --executor-memory 50g \ --master spark://10.11.143.30:7077 # 或者 pyspark --total-executor-cores 10 --executor-memory 50g \ --master spark://10.11.143.30:7077
在Rstudio中使用
## 配置 library(SparkR) sc = sparkR.init( master = "spark://10.11.143.30:7077", sparkEnvir = list(spark.executor.memory="100g",spark.cores.max="20") ) sqlContext = sparkRSQL.init(sc) ## 处理数据 ### 本地数据 read.df(sqlContext, paste0(Sys.getenv("SPARK_HOME"), "/examples/src/main/resources/people.json" ), "json") -> people head(people) ### 数据 createDataFrame( sqlContext, data_dataframe) -> data_spark head(data_spark) printSchema(data_spark) ### SQL registerTempTable(people,"people") teenagers = sql(sqlContext, "select name from people where age>13 and age<=19") ### 转为data.frame as.data.frame(teenagers) -> T ## stop sparkR.stop()
在jupyter中使用, 见最后
调用例子, 在上面任意Linux机器上
$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkLR --master spark://10.11.143.30:7077 \ $SPARK_HOME/examples/jars/spark-examples_2.11-2.0.0-preview.jar > out.log
安装说明
解压spark-2.0.0-preview-bin-hadoop2.7.tgz到$SPARK_HOME目录下配置master机器, 使得root账号能无密码登录worker机器
master机器按照如下配置, 然后复制到worker机器上.
$sed -r '/^#/d;/^$/d' spark-env.sh SPARK_MASTER_IP=10.11.143.30 #SPARK_LOCAL_IP=10.11.143.26 # 26这台机器出现错误, 需要加上这句. [keke.zhaokk@r73f16622.zui /opt/spark/spark-2.0.0-preview-bin-hadoop2.7/conf] $sed -r '/^#/d;/^$/d' spark-defaults.conf spark.master spark://10.11.143.30:7077 spark.executor.memory 100g spark.cores.max 30 [keke.zhaokk@r73f16622.zui /opt/spark/spark-2.0.0-preview-bin-hadoop2.7/conf] $sed -r '/^#/d;/^$/d' slaves 10.11.143.24 10.11.143.26 10.11.143.30 $tail -6 /etc/bashrc export JAVA_HOME=/opt/taobao/install/ajdk-8.0.0-b60 export PATH=$PATH:$JAVA_HOME/bin # worker 机器不需要这句 export SPARK_HOME=/opt/spark/spark-2.0.0-preview-bin-hadoop2.7 export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin # worker 机器不需要这句
Windows下使用spark(单机版)
将winutils.exe 放到%HADOOP_HOME%\bin 目录下解压 spark-2.0.0-preview-bin-hadoop2.7.tgz 到 %SPARK_HOME%
将%SPARK_HOME%\bin; %SPARK_HOME%\sbin; 添加到环境变量PATH中
在jupyter中使用
配置环境变量
import os,sys os.environ['SPARK_HOME'] = '/opt/spark/spark-2.0.0-preview-bin-hadoop2.7' os.environ['JAVA_HOME']='/opt/taobao/install/ajdk-8.0.0-b60' os.environ['PYSPARK_SUBMIT_ARGS']=''' --total-executor-cores 10 --executor-memory 50g --master spark://10.11.143.30:7077 pyspark-shell '''
调用pyspark
import os,sys spark_home = os.environ.get('SPARK_HOME', None) if any(map(lambda x: spark_home in x, sys.path)) is False: sys.path.insert(0, os.path.join(spark_home ,"python")) sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.1-src.zip')) with open(os.path.join(spark_home, "python/pyspark/shell.py")) as f: code = compile(f.read(), "shell.py", 'exec') exec(code) text_file = sc.textFile(spark_home + "/README.md") word_counts = text_file \ .flatMap(lambda line: line.split()) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b) len(word_counts.collect())
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.0.0-preview /_/ Using Python version 2.7.8 (default, Nov 27 2014 17:41:17) SparkSession available as 'spark'. 263
停止调用
sc.stop()
相关文章推荐
- 使用Nginx实现反向代理
- osgAnimation之作用对象
- [iOS] 如何自定义CollectionView中每个元素的大小和间距
- 网站分析
- 安卓之创建,读写文件,本地收藏,访问文件的权限和产品测试种类(方法)
- android使用selector实现按下去的效果注意事项
- 【BZOJ4405】挑战NPC 带花树模板 一般图最大匹配
- HDOJ--Rightmost Digit
- Linux内核模块编程
- 原生servlet配合smartupload实现批量下载和批量上传
- C语言 链表相关 操作
- Linux - 常用命令
- [iOS]UITableView回调和table相关成员方法详解
- ngrok内网发布到外网神器
- 深入学习java并发编程:Thread类的使用
- lookmore-Apache Commons包含开源工具
- osgAnimation之动画管理器
- Linux设置环境变量方法
- FZU 2147(找规律)
- <%@include file=""%>和 <jsp:include file="">之间的区别