
Writing Code Remotely in Jupyter and Submitting It to a Spark Cluster for Execution

2018-03-03 18:08
A few months ago I spent some time with Jupyter (a web-based code editor) and found it quite flexible for writing certain scientific documents. Out of the box, however, it is essentially a single-machine tool, which limits its use in production. I later needed to run the code edited in it on a cluster, so after more research and tinkering I got Jupyter working in a cluster environment.
Our requirement is roughly this: a user writes a simple program and then runs it on the cluster. The programs are written in Python, so the cluster is a PySpark cluster.
The Spark distribution ships the pyspark package (jar), and under /examples/src/main there are also some PySpark example programs.
The installation steps are described below.
There are many steps and many details, so I may have missed something. Moreover, this use of Jupyter has rarely been tried before and relies on some fairly new components, so cross-reference other material as needed.
(1) Install Jupyter.
Link: http://blog.csdn.net/cafebar123/article/details/78636826
(2) The link above is an installation guide for Jupyter. Once it is installed, the main page offers Python and PySpark editors; here we use PySpark. The Python kernel works directly with test snippets, including magics such as %matplotlib inline, but the PySpark kernel cannot use magics out of the box, so an additional component must be installed: sparkmagic.
Installing sparkmagic:
sparkmagic provides a rich set of interfaces and magic commands that extend Spark/Scala functionality, such as support for Spark SQL and Spark DataFrames.
pip install sparkmagic
Check that the installation succeeded (and enable the notebook widgets extension):
jupyter nbextension enable --py --sys-prefix widgetsnbextension
Find where sparkmagic is installed:
pip show sparkmagic
In my case it is /usr/local/lib/python2.7/site-packages. Go to that directory and find sparkmagic:
cd sparkmagic
ll
cd kernels
ll
You should see the kernels: pyspark3kernel, pysparkkernel, sparkkernel, sparkrkernel, wrapperkernel. Go back to the sparkmagic directory and install these kernels one by one:
jupyter-kernelspec install sparkmagic/kernels/sparkkernel
jupyter-kernelspec install sparkmagic/kernels/pysparkkernel
jupyter-kernelspec install sparkmagic/kernels/pyspark3kernel
jupyter-kernelspec install sparkmagic/kernels/sparkrkernel
Edit the config.json file:
sudo vi /home/infosouth/.sparkmagic/config.json
The file is initially empty; add the following:
{
  "kernel_python_credentials" : {
    "username": "",
    "password": "",
    "url": "http://Master:8990",
    "auth": "None"
  },
  "kernel_scala_credentials" : {
    "username": "",
    "password": "",
    "url": "http://Master:8990",
    "auth": "None"
  },
  "kernel_r_credentials": {
    "username": "",
    "password": "",
    "url": "http://Master:8990"
  },
  "logging_config": {
    "version": 1,
    "formatters": {
      "magicsFormatter": {
        "format": "%(asctime)s\t%(levelname)s\t%(message)s",
        "datefmt": ""
      }
    },
    "handlers": {
      "magicsHandler": {
        "class": "hdijupyterutils.filehandler.MagicsFileHandler",
        "formatter": "magicsFormatter",
        "home_path": "~/.sparkmagic"
      }
    },
    "loggers": {
      "magicsLogger": {
        "handlers": ["magicsHandler"],
        "level": "DEBUG",
        "propagate": 0
      }
    }
  },
  "wait_for_idle_timeout_seconds": 15,
  "livy_session_startup_timeout_seconds": 60,
  "fatal_error_suggestion": "The code failed because of a fatal error:\n\t{}.\n\nSome things to try:\na) Make sure Spark has enough available resources for Jupyter to create a Spark context.\nb) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.\nc) Restart the kernel.",
  "ignore_ssl_errors": false,
  "session_configs": {
    "driverMemory": "1000M",
    "executorCores": 2
  },
  "use_auto_viz": true,
  "coerce_dataframe": true,
  "max_results_sql": 2500,
  "pyspark_dataframe_encoding": "utf-8",
  "heartbeat_refresh_seconds": 30,
  "livy_server_heartbeat_timeout_seconds": 0,
  "heartbeat_retry_seconds": 10,
  "server_extension_default_kernel_name": "pysparkkernel",
  "custom_headers": {},
  "retry_policy": "configurable",
  "retry_seconds_to_sleep_list": [0.2, 0.5, 1, 3, 5],
  "configurable_retry_policy_max_retries": 8
}
The hostname Master used in config.json must be configured in the hosts file; Master is also the alias of the master node.
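For reference, that hosts entry is just an ordinary line in /etc/hosts on the machine running Jupyter (the IP below is a placeholder; use your master node's real address):
# /etc/hosts
10.0.0.1    Master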
Enable the sparkmagic server extension:
jupyter serverextension enable --py sparkmagic
Once everything is installed, try a magic command in the PySpark editor:
%lsmagic
and check whether the magics it lists actually work.

PySpark test:

from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
Read a CSV file and create a DataFrame:
test = sqlContext.load(source="com.databricks.spark.csv", path = 'hdfs://10.0.0.120:9000/infosouth/exports/tmp1511161959.csv', header = True,inferSchema = True)
test.printSchema()
test.count()
len(test.columns), test.columns
test.describe('TEST_ALTSTD_1').show()
test.select('TEST_ALTSTD_1','Age').show(5)
Distinct rows:
test.select('TEST_ALTSTD_1').dropDuplicates().show()
Add a new column:
test.withColumn('TEST_ALTSTD_1_new', test.TEST_ALTSTD_1 / 2.0).select('TEST_ALTSTD_1', 'TEST_ALTSTD_1_new').show(5)
Register the DataFrame as a temporary table so it can be queried with SQL:
test.registerTempTable('temple_table')
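With the temporary table registered, it can be queried through Spark SQL. A small sketch (temple_table and TEST_ALTSTD_1 are just the names used in this example):
sqlContext.sql("SELECT COUNT(*) FROM temple_table").show()
sqlContext.sql("SELECT TEST_ALTSTD_1 FROM temple_table LIMIT 5").show()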
Create a DataFrame:
from pyspark.sql import Row
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlContext.createDataFrame(people)
Plot (1):
%pyspark
import matplotlib.pyplot as plt

# Test data

x = [1, 2, 3, 4, 5, 6, 7, 8]
y = [20, 21, 20.5, 20.81, 21.0, 21.48, 22.0, 21.89]

# Plot

plt.plot(x, y, linestyle='dashed', marker='o', color='red')
%pyspark
plt.show()
Plot (2):
import matplotlib.pyplot as pyp
x = [0, 2, 4, 6, 8]
y = [0, 3, 3, 7, 0]
pyp.plot(x, y)
pyp.savefig("MyFirstPlot.png")
Plot (3):
import numpy as np
import pylab as pl
x = [1, 2, 3, 4, 5]

# Make an array of y values for each x value

y = [1, 4, 9, 16, 25]
pl.plot(x, y)
pl.show()
Error:
ImportError: No module named _tkinter
Fix:
sudo yum -y install tkinter tcl-devel tk-devel
Then install tcl and tk:
sudo yum install tcl
sudo yum install tk
The tcl and tk versions must match tcl-devel and tk-devel. After installing, recompile Python.
Error 2: no display name and no $DISPLAY environment variable
Fix: at the top of the code add:
import matplotlib
matplotlib.use('Agg')
Error 3: the figure is not displayed.
Install Tkinter and also edit the matplotlibrc file, located at /usr/local/lib/python2.7/site-packages/matplotlib/mpl-data/. Change the line:
backend : Agg (or whichever backend you use)
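Putting the fix for error 2 together, a minimal headless-plotting sketch (the output path is just an example) looks like this:
import matplotlib
matplotlib.use('Agg')                 # select the non-interactive backend before importing pyplot
import matplotlib.pyplot as plt

plt.plot([1, 2, 3, 4, 5], [1, 4, 9, 16, 25])
plt.savefig("/tmp/example_plot.png", dpi=150)   # write to a file instead of opening a window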


Next, install the Python environment

Install Python; search online for the detailed steps. There are actually quite a few of them, including installing pip and many other components. It is best to install a complete Python environment, otherwise you will eventually run into errors. If necessary, I will post the full Python environment installation steps later.

Next, install the Livy component

Livy was originally part of the Hue project and was later split off and developed independently; the standalone version is used here. We use Livy's REST request interface to submit code snippets remotely to the cluster, where they run in a distributed fashion.
Livy provides a client-like connection over REST; it is a long-lived connection, and once connected you can upload code snippets remotely.
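As an illustration of that REST interface (a sketch only; the host 10.0.0.1 and port 8998 match the examples later in this article, and /sessions and /sessions/{id}/statements are Livy's standard REST endpoints):
import json, requests

livy_url = "http://10.0.0.1:8998"        # Livy server address (example value)
headers = {"Content-Type": "application/json"}

# Create a PySpark session (POST /sessions)
r = requests.post(livy_url + "/sessions", data=json.dumps({"kind": "pyspark"}), headers=headers)
session_url = livy_url + r.headers["Location"]    # e.g. http://10.0.0.1:8998/sessions/0

# Wait until GET <session_url> reports the state "idle", then
# submit a code snippet as a statement (POST /sessions/{id}/statements)
code = {"code": "sc.parallelize(range(100)).count()"}
r = requests.post(session_url + "/statements", data=json.dumps(code), headers=headers)
statement_url = livy_url + r.headers["Location"]

# Poll the statement until its output is available
print(requests.get(statement_url, headers=headers).json())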
Installation:
Documentation link: https://github.com/cloudera/livy

The link above includes an introduction to how Livy works and how to configure it.

Building Livy

Livy is built using Apache Maven. To check out and build Livy, run:
git clone https://github.com/cloudera/livy.git
cd livy
mvn package
It is better to use the following command, which builds everything successfully. If downloads fail, retry a few times or work around it by adding Google DNS, etc.
mvn -DskipTests clean package
By default Livy is built against Apache Spark 1.6.2, but the version of Spark used when running Livy does not need to match the version used to build Livy. Livy internally uses reflection to mitigate the gaps between different Spark versions, and the Livy package itself does not contain a Spark distribution, so it will work with any supported version of Spark (Spark 1.6+) without needing to be rebuilt against a specific version of Spark.
(Note: since Livy is built against Spark 1.6.2 by default, it is best to use Spark 1.6.2 or later, and Hadoop 2.7 or later. I previously tried Spark 1.5 and Livy failed to start with errors.)

Running Livy

In order to run Livy with local sessions, first export these variables:
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
(Note: the two export lines above need to be set in the configuration file under Livy's conf/ directory.)
Then start the server with:
./bin/livy-server
Livy uses the Spark configuration under SPARK_HOME by default. You can override the Spark configuration by setting the SPARK_CONF_DIR environment variable before starting Livy.
It is strongly recommended to configure Spark to submit applications in YARN cluster mode. That makes sure that user sessions have their resources properly accounted for in the YARN cluster, and that the host running the Livy server doesn't become overloaded when multiple user sessions are running.
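To follow that recommendation, point Livy at YARN in conf/livy.conf before starting the server. This is only a sketch: the exact key for the deploy mode varies a little between Livy versions, so check the livy.conf.template shipped with your build.
# conf/livy.conf
livy.spark.master = yarn
# key name may be livy.spark.deploy-mode or livy.spark.deployMode depending on the Livy version
livy.spark.deploy-mode = cluster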

Livy Configuration

Livy uses a few configuration files under the configuration directory, which by default is the conf directory under the Livy installation. An alternative configuration directory can be provided by setting the LIVY_CONF_DIR environment variable when starting Livy.
The configuration files used by Livy are:
livy.conf: contains the server configuration. The Livy distribution ships with a default configuration file listing available configuration keys and their default values.
spark-blacklist.conf: lists Spark configuration options that users are not allowed to override. These options will be restricted to either their default values, or the values set in the Spark configuration used by Livy.
log4j.properties: configuration for Livy logging. Defines log levels and where log messages will be written to. The default configuration will print log messages to stderr.
Running Livy requires the following Python dependency libraries:
cloudpickle requests flake8 flaky pytest

(Note: Livy also needs quite a few Python dependencies, which is why a complete Python environment is required. Install the libraries above with pip.)
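For example, they can be installed in one go (assuming pip points at the same Python that Livy will use):
pip install cloudpickle requests flake8 flaky pytest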

Submitting code to the cluster

First start the Spark cluster for PySpark; it is started with the usual Spark shell scripts. For details, look up how to start PySpark.
Then start the Livy server. If there are no errors, the current versions are compatible with each other, and the log prints the relevant environment information.
Then start Jupyter. When the PySpark kernel inside Jupyter starts, it may conflict with the PySpark processes started earlier, so pay attention to the startup commands. If a conflict occurs, review the sparkmagic installation section above; the way the PySpark kernel is installed and configured there will help.
Next, connect to the Livy service from the PySpark editor in Jupyter, as follows:

Then, in the Jupyter PySpark editor: write code, submit it for execution, and view the results.

Below is a short tutorial I wrote for the PySpark editor:

Introduction to PySpark

PySpark here is the pyspark editor with the Livy and sparkmagic services added (it cannot be used without the Livy service, and Livy needs to be configured in cluster mode). sparkmagic provides many convenient magic commands for PySpark, such as those below. PySpark supports Spark 1.6 and later.
In [1]:
%lsmagic
Out[1]:
Available line magics:
%_do_not_call_change_endpoint  %alias  %alias_magic  %autocall  %automagic  %autosave  %bookmark  %cat  %cd  %clear  %colors  %config  %connect_info  %cp  %debug  %dhist  %dirs  %doctest_mode  %ed  %edit  %env  %gui  %hist  %history  %install_default_config  %install_ext  %install_profiles  %killbgscripts  %ldir  %less  %lf  %lk  %ll  %load  %load_ext  %loadpy  %logoff  %logon  %logstart  %logstate  %logstop  %ls  %lsmagic  %lx  %macro  %magic  %man  %matplotlib  %mkdir  %more  %mv  %notebook  %page  %pastebin  %pdb  %pdef  %pdoc  %pfile  %pinfo  %pinfo2  %popd  %pprint  %precision  %profile  %prun  %psearch  %psource  %pushd  %pwd  %pycat  %pylab  %qtconsole  %quickref  %recall  %rehashx  %reload_ext  %rep  %rerun  %reset  %reset_selective  %rm  %rmdir  %run  %save  %sc  %set_env  %store  %sx  %system  %tb  %time  %timeit  %unalias  %unload_ext  %who  %who_ls  %whos  %xdel  %xmode

Available cell magics:
%%!  %%HTML  %%SVG  %%_do_not_call_change_language  %%_do_not_call_delete_session  %%_do_not_call_start_session  %%bash  %%capture  %%cleanup  %%configure  %%debug  %%delete  %%file  %%help  %%html  %%info  %%javascript  %%latex  %%local  %%logs  %%perl  %%prun  %%pypy  %%python  %%python2  %%python3  %%ruby  %%script  %%sh  %%spark  %%sql  %%svg  %%sx  %%system  %%time  %%timeit  %%writefile

Automagic is ON, % prefix IS NOT needed for line magics.

Use Livy to create a session that connects to the Spark cluster; each session corresponds to one Spark cluster.

In [2]:
%reload_ext sparkmagic.magics
In [3]:
%manage_spark
Added endpoint http://10.0.0.1:8998
Starting Spark application
ID | YARN Application ID | Kind    | State | Spark UI | Driver log | Current session?
1  | None                | pyspark | idle  |          |            |
SparkSession available as 'spark'.
First add an Endpoint with Address http://10.0.0.1:8998, then add a Session, with Endpoint http://10.0.0.1:8998, a Name of your choice, and Language python. This connects you to the Spark cluster.
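The session list that %manage_spark shows can also be checked directly against Livy's REST API, which is handy when the table above stays empty (a quick sketch using the requests library mentioned earlier):
import requests

# GET /sessions lists all Livy sessions and their states (e.g. "starting", "idle", "busy")
print(requests.get("http://10.0.0.1:8998/sessions").json())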

Write a code snippet and press Shift+Enter to run it; Livy submits the snippet remotely to the cluster for execution. For example:

In [11]:
#from pyspark.conf import SparkConf
#spark = SparkSession.builder.master("yarn-client").appName("client_mode_test").config("spark.some.config.option", "some-value").getOrCreate()
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
user_data = user_data.filter(lambda line: line != header)
# drop the first line (the CSV header)
user_fields = user_data.map(lambda line: line.split(","))
# define a function that converts a string to an int
def strToInt(str):
    if str == None:
        return 0
    elif str == "":
        return 0
    else:
        temp = int(str)
        return temp
user_fields2 = user_fields.map(lambda x: strToInt(x[0]))
user_fields2.count()
105912
To see the execution time, add this at the top of the snippet:
In [ ]:
%%timeit
View the logs:
In [6]:
%spark logs
Other commands:
In [7]:
%spark?
To view Spark task information, open http://10.0.0.1:8998 or use the %%info command:
In [8]:
%%info
Current session configs: {u'executorCores': 2, u'kind': 'pyspark', u'driverMemory': u'1000M'}
ID | YARN Application ID | Kind    | State | Spark UI | Driver log | Current session?
0  | None                | pyspark | idle  |          |            |
1  | None                | pyspark | idle  |          |            |

Plotting charts in PySpark with matplotlib and similar packages

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.plot([1,1.6,3])
Out[1]:
[<matplotlib.lines.Line2D at 0x5d51a50>]
Adjust figure properties, such as size:
In [4]:
%matplotlib notebook
# calling this twice may prevent some errors
%matplotlib notebook
import matplotlib.pyplot as plt
plt.plot([1,1.6,3])
Out[4]:
[<matplotlib.lines.Line2D at 0x66c0210>]
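For the size specifically, a common approach (a small generic sketch, not specific to this setup) is to pass figsize when creating the figure:
import matplotlib.pyplot as plt

fig = plt.figure(figsize=(10, 4))   # width and height in inches
plt.plot([1, 1.6, 3])
plt.show()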

Plotting data stored in HDFS

Read the CSV:
In [1]:
import numpy as np
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
user_data = user_data.filter(lambda line: line != header)
Starting Spark application
ID | YARN Application ID | Kind    | State | Spark UI | Driver log | Current session?
7  | None                | pyspark | idle  |          |            |
SparkSession available as 'spark'.
Read one parameter from the CSV file and plot the first 100 rows as a line chart:
In [4]:
# define a function that converts a string to an int
def strToInt(str):
    if str == None:
        return 0
    elif str == "":
        return 0
    else:
        temp = int(str)
        return temp

import matplotlib.pyplot as plt
import numpy as np
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
# drop the first line (the CSV header)
user_data = user_data.filter(lambda line: line != header)
user_fields = user_data.map(lambda line: line.split(","))
user_fields2 = user_fields.map(lambda x: strToInt(x[10]))
# take the first 100 rows of the SECONDS column
mytestList = user_fields2.take(100)
# x-axis values
numpyList = np.arange(1, len(mytestList)+1, 1)

x = numpyList
y = mytestList

plt.plot(x, y)
# save the figure
out_png = '/home/infosouth/jupyter_script/test_seconds_linechart.png'
plt.savefig(out_png, dpi=150)
View the image:
In [5]:
%matplotlib inline
import os
from IPython.display import display, Image
display(Image('/home/infosouth/jupyter_script/test_seconds_linechart.png', width=500))
Draw a scatter plot:
In [1]:
# define a function that converts a string to an int
def strToInt(str):
    if str == None:
        return 0
    elif str == "":
        return 0
    else:
        temp = int(str)
        return temp

import matplotlib.pyplot as plt
import numpy as np
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
# drop the first line (the CSV header)
user_data = user_data.filter(lambda line: line != header)
user_fields = user_data.map(lambda line: line.split(","))
user_fields2 = user_fields.map(lambda x: strToInt(x[10]))
# take the first 100 rows of the SECONDS column
mytestList = user_fields2.take(100)
# x-axis values
numpyList = np.arange(1, len(mytestList)+1, 1)

x = numpyList
y = mytestList
plt.plot(x, y, 'ro')

# save the figure
out_png = '/home/infosouth/jupyter_script/test_seconds_dotchart.png'
plt.savefig(out_png, dpi=150)
Starting Spark application
ID | YARN Application ID | Kind    | State | Spark UI | Driver log | Current session?
2  | None                | pyspark | idle  |          |            |
SparkSession available as 'spark'.
In [2]:
%matplotlib inline
import os

from IPython.display import display, Image
display(Image('/home/infosouth/jupyter_script/test_seconds_dotchart.png', width=500))
Draw a bar chart (histogram):
In [2]:
# define a function that converts a string to an int
def strToInt(str):
    if str == None:
        return 0
    elif str == "":
        return 0
    else:
        temp = int(str)
        return temp

import matplotlib.pyplot as plt
import numpy as np
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
# drop the first line (the CSV header)
user_data = user_data.filter(lambda line: line != header)
user_fields = user_data.map(lambda line: line.split(","))
user_fields2 = user_fields.map(lambda x: strToInt(x[10]))
# take the first 100 rows of the SECONDS column
mytestList = user_fields2.take(100)
# x-axis values
numpyList = np.arange(1, len(mytestList)+1, 1)

x = numpyList
y = mytestList
plt.bar(x, y)

# save the figure
test_seconds_zhifangchart_png = '/home/infosouth/jupyter_script/test_seconds_zhifangchart.png'
plt.savefig(test_seconds_zhifangchart_png, dpi=150)
In [3]:
%matplotlib inline
import os

from IPython.display import display, Image
display(Image('/home/infosouth/jupyter_script/test_seconds_zhifangchart.png', width=500))

That concludes the brief introduction to the PySpark editor. Program execution is distributed across the cluster.

Shortcomings and limitations:

(1) I am not sure whether it is an efficiency problem of the PySpark kernel or of the Livy service, but after connecting to Livy the first few code snippets return results fairly quickly; after many executions the results come back more slowly, and sometimes the editor seems to hang, staying in "busy" indefinitely.
(2) Plotting in PySpark uses matplotlib, plot, and the same plotting packages as plain Python. The generated figure is kept temporarily in memory and is not cleared promptly, so when I repeatedly run the same plotting snippet the figures get superimposed on one another. If anyone knows the reason, please leave a comment.
(3) To give this PySpark editor the ability to "submit code snippets remotely to a PySpark cluster for execution", quite a few components are required, the installation is tedious and error-prone, and that greatly reduces its practicality. Does anyone have a better approach or solution?