Writing code remotely in Jupyter and submitting it to a Spark cluster for execution
2018-03-03 18:08
A few months ago I spent some time with Jupyter (a web-based code editor). Jupyter is quite flexible for writing scientific articles, but it is essentially a single-machine tool, which limits its use in production. Because I needed to run edited code on a cluster, I did some research and more tinkering to make Jupyter usable in a cluster environment.
Our requirement: users write simple programs and run them on the cluster. The programs are in Python, so the cluster is a PySpark cluster.
The Spark distribution includes a pyspark jar, and under /examples/src/main there is some PySpark example code.
Below is the installation walkthrough.
There are many steps and many details, so I may have missed something. This use of Jupyter has rarely been tried before, and it relies on some fairly new components, so cross-reference other material if needed.
(1) Install Jupyter.
Link: http://blog.csdn.net/cafebar123/article/details/78636826
(2) The link above is a Jupyter installation tutorial. Once installed, the main page offers Python and PySpark editors; we use PySpark here. The Python kernel passes simple code-snippet tests directly, including magics such as %matplotlib inline, but the PySpark kernel cannot use magics out of the box and needs an extra component: sparkmagic.
Installing sparkmagic:
sparkmagic provides a rich set of interfaces and magic commands that extend Spark/Scala support, e.g. Spark SQL and Spark DataFrames.
pip install sparkmagic
Check that the installation succeeded:
jupyter nbextension enable --py --sys-prefix widgetsnbextension
Find where sparkmagic is installed:
pip show sparkmagic
In my case it is /usr/local/lib/python2.7/site-packages. Enter that directory and locate sparkmagic:
cd sparkmagic
ll
cd kernels
ll
You should see pyspark3kernel, pysparkkernel, sparkkernel, sparkrkernel, wrapperkernel. Go back to the sparkmagic directory and install these kernels one by one:
jupyter-kernelspec install sparkmagic/kernels/sparkkernel
jupyter-kernelspec install sparkmagic/kernels/pysparkkernel
jupyter-kernelspec install sparkmagic/kernels/pyspark3kernel
jupyter-kernelspec install sparkmagic/kernels/sparkrkernel
Edit the config.json file:
sudo vi /home/infosouth/.sparkmagic/config.json
The file is initially empty; add:
{
"kernel_python_credentials" : {
"username": "",
"password": "",
"url": "http://Master:8990",
"auth": "None"
},
"kernel_scala_credentials" : {
"username": "",
"password": "",
"url": "http://Master:8990",
"auth": "None"
},
"kernel_r_credentials": {
"username": "",
"password": "",
"url": "http://Master:8990",
},
"logging_config": {
"version": 1,
"formatters": {
"magicsFormatter": {
"format": "%(asctime)s\t%(levelname)s\t%(message)s",
"datefmt": ""
}
},
"handlers": {
"magicsHandler": {
"class": "hdijupyterutils.filehandler.MagicsFileHandler",
"formatter": "magicsFormatter",
"home_path": "~/.sparkmagic"
}
},
"loggers": {
"magicsLogger": {
"handlers": ["magicsHandler"],
"level": "DEBUG",
"propagate": 0
}
}
},
"wait_for_idle_timeout_seconds": 15,
"livy_session_startup_timeout_seconds": 60,
"fatal_error_suggestion": "The code failed because of a fatal error:\n\t{}.\n\nSome things to try:\na) Make sure Spark has enough available resources for Jupyter to create a Spark context.\nb) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.\nc) Restart the kernel.",
"ignore_ssl_errors": false,
"session_configs": {
"driverMemory": "1000M",
"executorCores": 2
},
"use_auto_viz": true,
"coerce_dataframe": true,
"max_results_sql": 2500,
"pyspark_dataframe_encoding": "utf-8",
"heartbeat_refresh_seconds": 30,
"livy_server_heartbeat_timeout_seconds": 0,
"heartbeat_retry_seconds": 10,
"server_extension_default_kernel_name": "pysparkkernel",
"custom_headers": {},
"retry_policy": "configurable",
"retry_seconds_to_sleep_list": [0.2, 0.5, 1, 3, 5],
"configurable_retry_policy_max_retries": 8
}
The Master in config.json must be configured in the hosts file; Master is also the alias of the master node.
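For example, assuming the master node's IP is 10.0.0.1 (a placeholder; substitute your own address), the hosts entry could look like:

```
# /etc/hosts (example entry)
10.0.0.1    Master
```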
Enable the sparkmagic server extension:
jupyter serverextension enable --py sparkmagic
After installation, try a command in the PySpark editor: lsmagic
Check whether the commands it lists actually work.
A PySpark test:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
Read a csv and create a DataFrame:
test = sqlContext.load(source="com.databricks.spark.csv", path='hdfs://10.0.0.120:9000/infosouth/exports/tmp1511161959.csv', header=True, inferSchema=True)
test.printSchema()
test.count()
len(test.columns), test.columns
test.describe('TEST_ALTSTD_1').show()
test.select('TEST_ALTSTD_1', 'Age').show(5)
Distinct rows:
test.select('TEST_ALTSTD_1').dropDuplicates().show()
Add a new column:
test.withColumn('TEST_ALTSTD_1_new', test.TEST_ALTSTD_1 / 2.0).select('TEST_ALTSTD_1', 'TEST_ALTSTD_1_new').show(5)
Register a temporary table:
test.registerTempTable('temple_table')
Create a DataFrame from rows:
from pyspark.sql import Row
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlContext.createDataFrame(people)
Plot (1):
%pyspark
import matplotlib.pyplot as plt
# Test data
x = [1, 2, 3, 4, 5, 6, 7, 8]
y = [20, 21, 20.5, 20.81, 21.0, 21.48, 22.0, 21.89]
# Plot
plt.plot(x, y, linestyle='dashed', marker='o', color='red')
%pyspark show(plt)
Plot (2):
import matplotlib.pyplot as pyp
x = [0, 2, 4, 6, 8]
y = [0, 3, 3, 7, 0]
pyp.plot(x, y)
pyp.savefig("MyFirstPlot.png")
Plot (3):
import numpy as np
import pylab as pl
x = [1, 2, 3, 4, 5]
# Make an array of y values for each x value
y = [1, 4, 9, 16, 25]
pl.plot(x, y)
pl.show()
Error 1:
ImportError: No module named _tkinter
Fix:
sudo yum -y install tkinter tcl-devel tk-devel
Then install:
sudo yum install tcl
sudo yum install tk
The tcl and tk versions must match tcl-devel and tk-devel. After installing them, recompile Python.
Error 2: no display name and no $DISPLAY environment variable
Fix: at the top of the code, add:
import matplotlib
matplotlib.use('Agg')
Error 3: figures do not display
Fix: install Tkinter, and edit the matplotlibrc file, located at /usr/local/lib/python2.7/site-packages/matplotlib/mpl-data/. Change the backend line, e.g.: backend : agg
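A minimal sketch of the Agg fix (the call to matplotlib.use must come before pyplot is imported; the output filename is just an example):

```python
import matplotlib
matplotlib.use('Agg')  # select the headless file-rendering backend first
import matplotlib.pyplot as plt

plt.plot([0, 2, 4, 6, 8], [0, 3, 3, 7, 0])
plt.savefig('myplot.png')  # writes the figure to a file; no display needed
```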
Next, install the Python environment.
Installing Python actually takes many steps, including pip and many other components; search online for details. Install a complete Python environment, or you will eventually hit errors. If necessary, I will post the full Python environment installation steps later. Next, install the Livy component.
Livy was originally part of the Hue framework and was later split out as a separate project; the standalone version is used here. We use Livy's REST interface to submit code snippets remotely to the cluster for distributed execution. Livy exposes a client-like REST connection; it is a long-lived connection, and once connected you can upload code snippets remotely.
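As an illustration of what this looks like at the HTTP level, here is a minimal sketch (modern Python 3 stdlib; the server URL is a placeholder, and /sessions and /statements are Livy's standard REST endpoints):

```python
import json
from urllib import request

LIVY_URL = "http://10.0.0.1:8998"  # placeholder Livy endpoint

def session_payload(kind="pyspark"):
    # body for POST /sessions: asks Livy to start a Spark session of the given kind
    return {"kind": kind}

def statement_payload(code):
    # body for POST /sessions/{id}/statements: a code snippet to run remotely
    return {"code": code}

def post_json(url, payload):
    # send a JSON body and decode the JSON response
    req = request.Request(url, data=json.dumps(payload).encode("utf-8"),
                          headers={"Content-Type": "application/json"})
    return json.loads(request.urlopen(req).read())

# Usage against a running Livy server (not executed here):
#   sess = post_json(LIVY_URL + "/sessions", session_payload())
#   post_json(LIVY_URL + "/sessions/%d/statements" % sess["id"],
#             statement_payload("sc.parallelize(range(10)).count()"))
```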
Installation:
Documentation: https://github.com/cloudera/livy
The link above introduces how Livy works, including how to configure it.
Building Livy
Livy is built using Apache Maven. To check out and build Livy, run:
git clone https://github.com/cloudera/livy.git
cd livy
mvn package
Better, use the following command, which builds everything successfully; if downloads fail, retry a few times or add Google DNS:
mvn -DskipTests clean package
By default Livy is built against Apache Spark 1.6.2, but the version of Spark used when running Livy does not need to match the version used to build Livy. Livy internally uses reflection to mitigate the gaps between different Spark versions, also Livy package itself does not contain a Spark distribution, so it will work with any supported version of Spark (Spark 1.6+) without needing to rebuild against specific version of Spark.
(Note: Livy is built against Spark 1.6.2 by default, so it is best to use Spark 1.6.2 or later, and Hadoop 2.7 or later. I tried Spark 1.5 earlier and Livy failed to start with errors.)
Running Livy
In order to run Livy with local sessions, first export these variables:
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
(Note: the two lines above should be set in the configuration file under Livy's conf directory.)
Then start the server with:
./bin/livy-server
Livy uses the Spark configuration under SPARK_HOME by default. You can override the Spark configuration by setting the SPARK_CONF_DIR environment variable before starting Livy. It is strongly recommended to configure Spark to submit applications in YARN cluster mode. That makes sure that user sessions have their resources properly accounted for in the YARN cluster, and that the host running the Livy server doesn't become overloaded when multiple user sessions are running.
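Following that recommendation, the relevant keys in conf/livy.conf would look roughly like this (key names taken from the livy.conf template; verify against your Livy version):

```
# conf/livy.conf (sketch)
livy.spark.master = yarn
livy.spark.deploy-mode = cluster
```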
Livy Configuration
Livy uses a few configuration files under the configuration directory, which by default is the conf directory under the Livy installation. An alternative configuration directory can be provided by setting the LIVY_CONF_DIR environment variable when starting Livy. The configuration files used by Livy are:
livy.conf: contains the server configuration. The Livy distribution ships with a default configuration file listing available configuration keys and their default values.
spark-blacklist.conf: list Spark configuration options that users are not allowed to override. These options will be restricted to either their default values, or the values set in the Spark configuration used by Livy.
log4j.properties: configuration for Livy logging. Defines log levels and where log messages will be written to. The default configuration will print log messages to stderr.
Running Livy requires the following Python libraries:
cloudpickle requests flake8 flaky pytest
(Note: Livy needs many Python dependencies, which is why the Python environment must be installed completely. Install the libraries above with pip.)
Submitting code to the cluster
First start the PySpark cluster (it is started spark-shell style; search for how to start pyspark). Then start the Livy server; if there are no errors, the versions are compatible, and the log prints the relevant environment information.
Then start Jupyter. When Jupyter's pyspark kernel starts, it may conflict with a previously started pyspark, so watch the start command. If a conflict occurs, revisit the sparkmagic section above; the pyspark kernel installation and configuration there should help.
Next, connect to the Livy service from Jupyter's PySpark editor, as follows:
Then write code in the PySpark editor, submit it for execution, and view the results.
Below is my quick tutorial for the PySpark editor:
About PySpark
PySpark here is the pyspark editor backed by the Livy and sparkmagic services (without the Livy service it cannot be used, and Livy must be configured for cluster mode). sparkmagic provides many convenient magic commands for pyspark, such as the ones below. PySpark supports Spark 1.6 and later.
In [1]: %lsmagic
Out[1]:
Available line magics: %_do_not_call_change_endpoint %alias %alias_magic %autocall %automagic %autosave %bookmark %cat %cd %clear %colors %config %connect_info %cp %debug %dhist %dirs %doctest_mode %ed %edit %env %gui %hist %history %install_default_config %install_ext %install_profiles %killbgscripts %ldir %less %lf %lk %ll %load %load_ext %loadpy %logoff %logon %logstart %logstate %logstop %ls %lsmagic %lx %macro %magic %man %matplotlib %mkdir %more %mv %notebook %page %pastebin %pdb %pdef %pdoc %pfile %pinfo %pinfo2 %popd %pprint %precision %profile %prun %psearch %psource %pushd %pwd %pycat %pylab %qtconsole %quickref %recall %rehashx %reload_ext %rep %rerun %reset %reset_selective %rm %rmdir %run %save %sc %set_env %store %sx %system %tb %time %timeit %unalias %unload_ext %who %who_ls %whos %xdel %xmode Available cell magics: %%! %%HTML %%SVG %%_do_not_call_change_language %%_do_not_call_delete_session %%_do_not_call_start_session %%bash %%capture %%cleanup %%configure %%debug %%delete %%file %%help %%html %%info %%javascript %%latex %%local %%logs %%perl %%prun %%pypy %%python %%python2 %%python3 %%ruby %%script %%sh %%spark %%sql %%svg %%sx %%system %%time %%timeit %%writefile Automagic is ON, % prefix IS NOT needed for line magics.
Use Livy to create a session that connects to the Spark cluster; one session corresponds to one Spark cluster.
In [2]: %reload_ext sparkmagic.magics
In [3]: %manage_spark
Added endpoint http://10.0.0.1:8998
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session?
---|---|---|---|---|---|---
1 | None | pyspark | idle | | | ✔
SparkSession available as 'spark'.
First add an Endpoint with Address http://10.0.0.1:8998; then add a Session with Endpoint http://10.0.0.1:8998, a custom Name, and Language: python. You are now connected to the Spark cluster.
Write a code snippet and press shift+enter to run it; Livy submits the snippet to the cluster for remote execution. For example:
In [11]:
#from pyspark.conf import SparkConf
#spark = SparkSession.builder.master("yarn-client").appName("client_mode_test").config("spark.some.config.option", "some-value").getOrCreate()
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
user_data = user_data.filter(lambda line: line != header)  # drop the csv header row
user_fields = user_data.map(lambda line: line.split(","))
# helper that converts a string to int
def strToInt(str):
    if str == None:
        return 0
    elif str == "":
        return 0
    else:
        temp = int(str)
        return temp
user_fields2 = user_fields.map(lambda x: strToInt(x[0]))
user_fields2.count()
105912
To time a snippet, add this at the top of the cell:
In [ ]: %%timeit
View the logs:
In [6]: %spark logs
Other commands:
In [7]: %spark?
To view Spark task information, open http://10.0.0.1:8998 or use the %%info command:
In [8]: %%info
Current session configs: {u'executorCores': 2, u'kind': 'pyspark', u'driverMemory': u'1000M'}
ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session?
---|---|---|---|---|---|---
0 | None | pyspark | idle | | |
1 | None | pyspark | idle | | |
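The strToInt helper used in the cell above is plain Python, so its behavior can be sanity-checked locally without a cluster (a sketch; renamed str_to_int here, since shadowing the built-in str, as the original parameter name does, is best avoided):

```python
def str_to_int(s):
    # treat missing or empty CSV fields as 0, otherwise parse the int
    if s is None or s == "":
        return 0
    return int(s)

# converting the fields of one csv line, as the cluster job does per row:
fields = "42,,7".split(",")
values = [str_to_int(f) for f in fields]  # → [42, 0, 7]
```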
Plotting with matplotlib and similar packages in pyspark
In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.plot([1,1.6,3])
Out[1]:
[<matplotlib.lines.Line2D at 0x5d51a50>]
Adjusting figure properties, such as size:
In [4]:
%matplotlib notebook  # calling it twice may prevent some errors
%matplotlib notebook
import matplotlib.pyplot as plt
plt.plot([1,1.6,3])
Out[4]:
[<matplotlib.lines.Line2D at 0x66c0210>]
Plotting data from HDFS
Read the csv:
In [1]:
import numpy as np
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
user_data = user_data.filter(lambda line: line != header)
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session?
---|---|---|---|---|---|---
7 | None | pyspark | idle | | | ✔
SparkSession available as 'spark'.
Read one field from the csv file and plot the first 100 rows as a line chart:
In [4]:
# helper that converts a string to int
def strToInt(str):
    if str == None:
        return 0
    elif str == "":
        return 0
    else:
        temp = int(str)
        return temp
import matplotlib.pyplot as plt
import numpy as np
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
user_data = user_data.filter(lambda line: line != header)  # drop the csv header row
user_fields = user_data.map(lambda line: line.split(","))
user_fields2 = user_fields.map(lambda x: strToInt(x[10]))
# take the first 100 rows of the SECONDS field
mytestList = user_fields2.take(100)
# x-axis values
numpyList = np.arange(1,len(mytestList)+1,1)
x = numpyList
y = mytestList
plt.plot(x, y)
# save the figure
out_png = '/home/infosouth/jupyter_script/test_seconds_linechart.png'
plt.savefig(out_png, dpi=150)
View the image:
In [5]:
%matplotlib inline
import os
from IPython.display import display, Image
display(Image('/home/infosouth/jupyter_script/test_seconds_linechart.png', width=500))
Scatter plot:
In [1]:
# helper that converts a string to int
def strToInt(str):
    if str == None:
        return 0
    elif str == "":
        return 0
    else:
        temp = int(str)
        return temp
import matplotlib.pyplot as plt
import numpy as np
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
user_data = user_data.filter(lambda line: line != header)  # drop the csv header row
user_fields = user_data.map(lambda line: line.split(","))
user_fields2 = user_fields.map(lambda x: strToInt(x[10]))
# take the first 100 rows of the SECONDS field
mytestList = user_fields2.take(100)
# x-axis values
numpyList = np.arange(1, len(mytestList)+1, 1)
x = numpyList
y = mytestList
plt.plot(x, y, 'ro')
# save the figure
out_png = '/home/infosouth/jupyter_script/test_seconds_dotchart.png'
plt.savefig(out_png, dpi=150)
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session?
---|---|---|---|---|---|---
2 | None | pyspark | idle | | | ✔
SparkSession available as 'spark'.
In [2]:
%matplotlib inline
import os
from IPython.display import display, Image
display(Image('/home/infosouth/jupyter_script/test_seconds_dotchart.png', width=500))
Histogram:
In [2]:
# helper that converts a string to int
def strToInt(str):
    if str == None:
        return 0
    elif str == "":
        return 0
    else:
        temp = int(str)
        return temp
import matplotlib.pyplot as plt
import numpy as np
user_data = sc.textFile("hdfs://10.0.0.1:9000/amas_tasks_script_results/test_csv_data.csv")
header = user_data.first()
user_data = user_data.filter(lambda line: line != header)  # drop the csv header row
user_fields = user_data.map(lambda line: line.split(","))
user_fields2 = user_fields.map(lambda x: strToInt(x[10]))
# take the first 100 rows of the SECONDS field
mytestList = user_fields2.take(100)
# x-axis values
numpyList = np.arange(1,len(mytestList)+1,1)
x = numpyList
y = mytestList
plt.bar(x, y)
# save the figure
test_seconds_zhifangchart_png = '/home/infosouth/jupyter_script/test_seconds_zhifangchart.png'
plt.savefig(test_seconds_zhifangchart_png, dpi=150)
%matplotlib inline
import os
from IPython.display import display, Image
display(Image('/home/infosouth/jupyter_script/test_seconds_zhifangchart.png', width=500))
That concludes the brief tour of the PySpark editor. Program execution is distributed across the cluster.
Shortcomings:
(1) Whether due to the pyspark kernel's execution efficiency or the Livy service, after connecting to Livy the first few snippet executions return results quickly, but after many executions results come back slowly, sometimes seeming to hang with the editor stuck on "busy".
(2) Plotting in pyspark uses the same packages as plain Python (matplotlib, plot, etc.). Generated figures are kept in memory and not cleared promptly, so re-running the same plotting snippet stacks the new plot on top of the old one. If anyone knows the reason, please leave a comment.
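On point (2), a likely cause: pyplot keeps an implicit "current figure", and every plot call draws into it until it is cleared, so repeated runs of the same snippet accumulate lines. Clearing the figure between runs avoids the stacking. A small sketch (using the headless Agg backend):

```python
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

plt.plot([1, 2, 3])
plt.plot([3, 2, 1])                   # second call draws into the same figure
lines_before = len(plt.gca().lines)   # 2: the lines accumulate

plt.clf()                             # clear the current figure between runs
plt.plot([1, 2, 3])
lines_after = len(plt.gca().lines)    # 1: the figure starts fresh
```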
(3) Making this pyspark editor "submit code snippets remotely to a pyspark cluster for execution" requires quite a few components, and installation is long and error-prone, which greatly reduces its practicality. Does anyone have a better approach?