
Connecting IntelliJ IDEA to a Spark cluster

2015-09-24 09:30
Today I tried to connect to a Spark cluster from inside IntelliJ IDEA and got this error:


java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi$$anonfun

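The cause was that the application jar had not been shipped to the cluster: the executors could not find the project's compiled classes, such as the anonymous function class named in the error. The fix is to package the project as a jar first, then add that jar to the SparkContext.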


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    //conf.set("master","spark://moon:7077")
    conf.setMaster("spark://moon:7077") // run on the cluster; the VM option -Dspark.master=local was removed
    val spark = new SparkContext(conf)
    spark.addJar("/usr/local/spark/IdeaProjects/out/artifacts/sparkPi/ideaprojects.jar") // note: this line is required!
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    // Copy the elements of the collection into a distributed dataset that can be operated on in parallel.
    // slices is the number of partitions the dataset is cut into; Spark runs one task per partition on the cluster.
    // Spark would also pick a number of partitions automatically if it were omitted.
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

Output:

/usr/local/jdk1.7/bin/java -Didea.launcher.port=7532 -Didea.launcher.bin.path=/usr/local/spark/idea-IC-141.1532.4/bin -Dfile.encoding=UTF-8 -classpath /usr/local/jdk1.7/jre/lib/management-agent.jar:/usr/local/jdk1.7/jre/lib/jsse.jar:/usr/local/jdk1.7/jre/lib/plugin.jar:/usr/local/jdk1.7/jre/lib/jfxrt.jar:/usr/local/jdk1.7/jre/lib/javaws.jar:/usr/local/jdk1.7/jre/lib/charsets.jar:/usr/local/jdk1.7/jre/lib/jfr.jar:/usr/local/jdk1.7/jre/lib/jce.jar:/usr/local/jdk1.7/jre/lib/rt.jar:/usr/local/jdk1.7/jre/lib/deploy.jar:/usr/local/jdk1.7/jre/lib/resources.jar:/usr/local/jdk1.7/jre/lib/ext/zipfs.jar:/usr/local/jdk1.7/jre/lib/ext/sunjce_provider.jar:/usr/local/jdk1.7/jre/lib/ext/sunpkcs11.jar:/usr/local/jdk1.7/jre/lib/ext/dnsns.jar:/usr/local/jdk1.7/jre/lib/ext/localedata.jar:/usr/local/jdk1.7/jre/lib/ext/sunec.jar:/usr/local/spark/IdeaProjects/target/scala-2.10/classes:/home/hadoop/.sbt/boot/scala-2.10.4/lib/scala-library.jar:/usr/local/spark/spark-1.4.1-bin-hadoop2.4/lib/spark-assembly-1.4.1-hadoop2.4.0.jar:/usr/local/spark/idea-IC-141.1532.4/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain org.apache.spark.examples.SparkPi
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/09/24 09:25:24 INFO SparkContext: Running Spark version 1.4.1
15/09/24 09:25:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/24 09:25:25 WARN Utils: Your hostname, moon resolves to a loopback address: 127.0.1.1; using 172.18.15.5 instead (on interface wlan0)
15/09/24 09:25:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/09/24 09:25:25 INFO SecurityManager: Changing view acls to: hadoop
15/09/24 09:25:25 INFO SecurityManager: Changing modify acls to: hadoop
15/09/24 09:25:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/09/24 09:25:26 INFO Slf4jLogger: Slf4jLogger started
15/09/24 09:25:26 INFO Remoting: Starting remoting
15/09/24 09:25:26 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.18.15.5:60296]
15/09/24 09:25:26 INFO Utils: Successfully started service 'sparkDriver' on port 60296.
15/09/24 09:25:26 INFO SparkEnv: Registering MapOutputTracker
15/09/24 09:25:26 INFO SparkEnv: Registering BlockManagerMaster
15/09/24 09:25:26 INFO DiskBlockManager: Created local directory at /tmp/spark-0432fcec-c419-49a0-9720-058e532976fd/blockmgr-638c3042-0381-46b8-9c26-0a2dab26aeef
15/09/24 09:25:26 INFO MemoryStore: MemoryStore started with capacity 710.4 MB
15/09/24 09:25:27 INFO HttpFileServer: HTTP File server directory is /tmp/spark-0432fcec-c419-49a0-9720-058e532976fd/httpd-6a0a5378-e3d0-465d-a753-572c6fe5d019
15/09/24 09:25:27 INFO HttpServer: Starting HTTP Server
15/09/24 09:25:27 INFO Utils: Successfully started service 'HTTP file server' on port 41387.
15/09/24 09:25:27 INFO SparkEnv: Registering OutputCommitCoordinator
15/09/24 09:25:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/09/24 09:25:27 INFO SparkUI: Started SparkUI at http://172.18.15.5:4040
15/09/24 09:25:27 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@moon:7077/user/Master...
15/09/24 09:25:27 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150924092527-0002
15/09/24 09:25:27 INFO AppClient$ClientActor: Executor added: app-20150924092527-0002/0 on worker-20150924091347-172.18.15.5-41804 (172.18.15.5:41804) with 1 cores
15/09/24 09:25:27 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150924092527-0002/0 on hostPort 172.18.15.5:41804 with 1 cores, 512.0 MB RAM
15/09/24 09:25:27 INFO AppClient$ClientActor: Executor updated: app-20150924092527-0002/0 is now LOADING
15/09/24 09:25:27 INFO AppClient$ClientActor: Executor updated: app-20150924092527-0002/0 is now RUNNING
15/09/24 09:25:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59727.
15/09/24 09:25:27 INFO NettyBlockTransferService: Server created on 59727
15/09/24 09:25:27 INFO BlockManagerMaster: Trying to register BlockManager
15/09/24 09:25:27 INFO BlockManagerMasterEndpoint: Registering block manager 172.18.15.5:59727 with 710.4 MB RAM, BlockManagerId(driver, 172.18.15.5, 59727)
15/09/24 09:25:27 INFO BlockManagerMaster: Registered BlockManager
15/09/24 09:25:28 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/09/24 09:25:28 INFO SparkContext: Added JAR /usr/local/spark/IdeaProjects/out/artifacts/sparkPi/ideaprojects.jar at http://172.18.15.5:41387/jars/ideaprojects.jar with timestamp 1443057928511
15/09/24 09:25:28 INFO SparkContext: Starting job: reduce at SparkPi.scala:40
15/09/24 09:25:29 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:40) with 2 output partitions (allowLocal=false)
15/09/24 09:25:29 INFO DAGScheduler: Final stage: ResultStage 0(reduce at SparkPi.scala:40)
15/09/24 09:25:29 INFO DAGScheduler: Parents of final stage: List()
15/09/24 09:25:29 INFO DAGScheduler: Missing parents: List()
15/09/24 09:25:29 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:36), which has no missing parents
15/09/24 09:25:29 INFO MemoryStore: ensureFreeSpace(1888) called with curMem=0, maxMem=744876933
15/09/24 09:25:29 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1888.0 B, free 710.4 MB)
15/09/24 09:25:29 INFO MemoryStore: ensureFreeSpace(1202) called with curMem=1888, maxMem=744876933
15/09/24 09:25:29 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1202.0 B, free 710.4 MB)
15/09/24 09:25:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.18.15.5:59727 (size: 1202.0 B, free: 710.4 MB)
15/09/24 09:25:29 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:874
15/09/24 09:25:29 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:36)
15/09/24 09:25:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/24 09:25:30 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@172.18.15.5:56228/user/Executor#76001176]) with ID 0
15/09/24 09:25:30 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 172.18.15.5, PROCESS_LOCAL, 1425 bytes)
15/09/24 09:25:31 INFO BlockManagerMasterEndpoint: Registering block manager 172.18.15.5:43546 with 265.4 MB RAM, BlockManagerId(0, 172.18.15.5, 43546)
15/09/24 09:25:37 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.18.15.5:43546 (size: 1202.0 B, free: 265.4 MB)
15/09/24 09:25:38 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 172.18.15.5, PROCESS_LOCAL, 1482 bytes)
15/09/24 09:25:38 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 7111 ms on 172.18.15.5 (1/2)
15/09/24 09:25:38 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 53 ms on 172.18.15.5 (2/2)
15/09/24 09:25:38 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:40) finished in 8.765 s
15/09/24 09:25:38 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/09/24 09:25:38 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:40, took 9.120462 s
Pi is roughly 3.13826
15/09/24 09:25:38 INFO SparkUI: Stopped Spark web UI at http://172.18.15.5:4040
15/09/24 09:25:38 INFO DAGScheduler: Stopping DAGScheduler
15/09/24 09:25:38 INFO SparkDeploySchedulerBackend: Shutting down all executors
15/09/24 09:25:38 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
15/09/24 09:25:38 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/09/24 09:25:38 INFO Utils: path = /tmp/spark-0432fcec-c419-49a0-9720-058e532976fd/blockmgr-638c3042-0381-46b8-9c26-0a2dab26aeef, already present as root for deletion.
15/09/24 09:25:38 INFO MemoryStore: MemoryStore cleared
15/09/24 09:25:38 INFO BlockManager: BlockManager stopped
15/09/24 09:25:38 INFO BlockManagerMaster: BlockManagerMaster stopped
15/09/24 09:25:38 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/09/24 09:25:38 INFO SparkContext: Successfully stopped SparkContext
15/09/24 09:25:38 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/09/24 09:25:38 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/09/24 09:25:38 INFO Utils: Shutdown hook called
15/09/24 09:25:38 INFO Utils: Deleting directory /tmp/spark-0432fcec-c419-49a0-9720-058e532976fd

Process finished with exit code 0
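
The estimate printed in the log (3.13826) comes from Monte Carlo sampling. As a rough illustration of what the job computes, here is a minimal sketch of the same calculation in plain Scala without Spark. The object name LocalPi, its helper functions, and the fixed seed are made up for this example; the chunks only imitate how parallelize(1 to n, slices) divides the work and are processed sequentially here.

```scala
import scala.util.Random

// Hypothetical local stand-in for the SparkPi job; not part of the original post.
object LocalPi {
  // Counts how many of `samples` random points in [-1, 1] x [-1, 1]
  // fall inside the unit circle.
  def countInside(samples: Int, rng: Random): Int =
    (1 to samples).count { _ =>
      val x = rng.nextDouble() * 2 - 1
      val y = rng.nextDouble() * 2 - 1
      x * x + y * y < 1
    }

  // Splits the work into `slices` chunks, computes a partial count per chunk,
  // then sums the partial counts (the role reduce(_ + _) plays in SparkPi).
  def estimate(slices: Int, samplesPerSlice: Int, seed: Long): Double = {
    val partialCounts = (0 until slices).map { s =>
      countInside(samplesPerSlice, new Random(seed + s))
    }
    4.0 * partialCounts.sum / (slices.toLong * samplesPerSlice)
  }

  def main(args: Array[String]): Unit =
    println("Pi is roughly " + estimate(slices = 2, samplesPerSlice = 100000, seed = 42L))
}
```

On the cluster, each chunk becomes one task, which is why the log above shows two tasks for the default of two slices.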




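Running other applications in the project

In fact, the jar only needs to be built once. The other applications in the project do not have to be packaged again (presumably because the artifact is built from the whole project's output); just add the same two lines, setMaster and addJar, to each file.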

/**
 * Created by root on 15-8-15.
 */
import java.util.Random

import scala.math.exp

import breeze.linalg.{Vector, DenseVector}

import org.apache.spark._

/**
 * Logistic regression based classification.
 * Usage: SparkLR [slices]
 */
object SparkLR {
  val N = 10000  // Number of data points
  val D = 10   // Number of dimensions
  val R = 0.7  // Scaling factor
  val ITERATIONS = 5
  val rand = new Random(42)

  case class DataPoint(x: Vector[Double], y: Double)

  def generateData = {
    def generatePoint(i: Int) = {
      val y = if(i % 2 == 0) -1 else 1
      val x = DenseVector.fill(D){rand.nextGaussian + y * R}
      DataPoint(x, y)
    }
    Array.tabulate(N)(generatePoint)
  }

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SparkLR")
    sparkConf.setMaster("spark://moon:7077") // run on the cluster; the VM option -Dspark.master=local is ignored
    val sc = new SparkContext(sparkConf)

    sc.addJar("/usr/local/spark/IdeaProjects/out/artifacts/sparkPi/ideaprojects.jar") // added
    val numSlices = if (args.length > 0) args(0).toInt else 2
    val points = sc.parallelize(generateData, numSlices).cache()

    // Initialize w to a random value
    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
    println("Initial w: " + w)

    for (i <- 1 to ITERATIONS) {
      println("On iteration " + i)
      val gradient = points.map { p =>
        p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
      }.reduce(_ + _)
      w -= gradient
    }

    println("Final w: " + w)
    sc.stop()
  }
}


/usr/local/jdk1.7/bin/java -Dspark.master=local -Didea.launcher.port=7534 -Didea.launcher.bin.path=/usr/local/spark/idea-IC-141.1532.4/bin -Dfile.encoding=UTF-8 -classpath /usr/local/jdk1.7/jre/lib/management-agent.jar:/usr/local/jdk1.7/jre/lib/jsse.jar:/usr/local/jdk1.7/jre/lib/plugin.jar:/usr/local/jdk1.7/jre/lib/jfxrt.jar:/usr/local/jdk1.7/jre/lib/javaws.jar:/usr/local/jdk1.7/jre/lib/charsets.jar:/usr/local/jdk1.7/jre/lib/jfr.jar:/usr/local/jdk1.7/jre/lib/jce.jar:/usr/local/jdk1.7/jre/lib/rt.jar:/usr/local/jdk1.7/jre/lib/deploy.jar:/usr/local/jdk1.7/jre/lib/resources.jar:/usr/local/jdk1.7/jre/lib/ext/zipfs.jar:/usr/local/jdk1.7/jre/lib/ext/sunjce_provider.jar:/usr/local/jdk1.7/jre/lib/ext/sunpkcs11.jar:/usr/local/jdk1.7/jre/lib/ext/dnsns.jar:/usr/local/jdk1.7/jre/lib/ext/localedata.jar:/usr/local/jdk1.7/jre/lib/ext/sunec.jar:/usr/local/spark/IdeaProjects/target/scala-2.10/classes:/home/hadoop/.sbt/boot/scala-2.10.4/lib/scala-library.jar:/usr/local/spark/spark-1.4.1-bin-hadoop2.4/lib/spark-assembly-1.4.1-hadoop2.4.0.jar:/usr/local/spark/idea-IC-141.1532.4/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain SparkLR
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/09/24 09:48:52 INFO SparkContext: Running Spark version 1.4.1
15/09/24 09:48:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/24 09:48:52 WARN Utils: Your hostname, moon resolves to a loopback address: 127.0.1.1; using 172.18.15.5 instead (on interface wlan0)
15/09/24 09:48:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/09/24 09:48:52 INFO SecurityManager: Changing view acls to: hadoop
15/09/24 09:48:52 INFO SecurityManager: Changing modify acls to: hadoop
15/09/24 09:48:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/09/24 09:48:53 INFO Slf4jLogger: Slf4jLogger started
15/09/24 09:48:53 INFO Remoting: Starting remoting
15/09/24 09:48:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.18.15.5:39678]
15/09/24 09:48:53 INFO Utils: Successfully started service 'sparkDriver' on port 39678.
15/09/24 09:48:53 INFO SparkEnv: Registering MapOutputTracker
15/09/24 09:48:53 INFO SparkEnv: Registering BlockManagerMaster
15/09/24 09:48:53 INFO DiskBlockManager: Created local directory at /tmp/spark-401b12ac-229a-42ba-9544-851910a37225/blockmgr-aca1b77d-f9c7-47bd-95bd-4391783f7e82
15/09/24 09:48:53 INFO MemoryStore: MemoryStore started with capacity 710.4 MB
15/09/24 09:48:54 INFO HttpFileServer: HTTP File server directory is /tmp/spark-401b12ac-229a-42ba-9544-851910a37225/httpd-6db521af-7f5d-491a-9312-6e0670a89e13
15/09/24 09:48:54 INFO HttpServer: Starting HTTP Server
15/09/24 09:48:54 INFO Utils: Successfully started service 'HTTP file server' on port 39263.
15/09/24 09:48:54 INFO SparkEnv: Registering OutputCommitCoordinator
15/09/24 09:48:54 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/09/24 09:48:54 INFO SparkUI: Started SparkUI at http://172.18.15.5:4040
15/09/24 09:48:54 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@moon:7077/user/Master...
15/09/24 09:48:54 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150924094854-0005
15/09/24 09:48:54 INFO AppClient$ClientActor: Executor added: app-20150924094854-0005/0 on worker-20150924091347-172.18.15.5-41804 (172.18.15.5:41804) with 1 cores
15/09/24 09:48:54 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150924094854-0005/0 on hostPort 172.18.15.5:41804 with 1 cores, 512.0 MB RAM
15/09/24 09:48:54 INFO AppClient$ClientActor: Executor updated: app-20150924094854-0005/0 is now LOADING
15/09/24 09:48:54 INFO AppClient$ClientActor: Executor updated: app-20150924094854-0005/0 is now RUNNING
15/09/24 09:48:54 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45416.
15/09/24 09:48:54 INFO NettyBlockTransferService: Server created on 45416
15/09/24 09:48:54 INFO BlockManagerMaster: Trying to register BlockManager
15/09/24 09:48:54 INFO BlockManagerMasterEndpoint: Registering block manager 172.18.15.5:45416 with 710.4 MB RAM, BlockManagerId(driver, 172.18.15.5, 45416)
15/09/24 09:48:54 INFO BlockManagerMaster: Registered BlockManager
15/09/24 09:48:55 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/09/24 09:48:55 INFO SparkContext: Added JAR /usr/local/spark/IdeaProjects/out/artifacts/sparkPi/ideaprojects.jar at http://172.18.15.5:39263/jars/ideaprojects.jar with timestamp 1443059335396
Initial w: DenseVector(-0.8066603352924779, -0.5488747509304204, -0.7351625370864459, 0.8228539509375878, -0.6662446067860872, -0.33245457898921527, 0.9664202269036932, -0.20407887461434115, 0.4120993933386614, -0.8125908063470539)
On iteration 1
15/09/24 09:48:56 INFO SparkContext: Starting job: reduce at SparkLR.scala:52
15/09/24 09:48:56 INFO DAGScheduler: Got job 0 (reduce at SparkLR.scala:52) with 2 output partitions (allowLocal=false)
15/09/24 09:48:56 INFO DAGScheduler: Final stage: ResultStage 0(reduce at SparkLR.scala:52)
15/09/24 09:48:56 INFO DAGScheduler: Parents of final stage: List()
15/09/24 09:48:56 INFO DAGScheduler: Missing parents: List()
15/09/24 09:48:56 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkLR.scala:50), which has no missing parents
15/09/24 09:48:56 INFO MemoryStore: ensureFreeSpace(2304) called with curMem=0, maxMem=744876933
15/09/24 09:48:56 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.3 KB, free 710.4 MB)
15/09/24 09:48:56 INFO MemoryStore: ensureFreeSpace(1481) called with curMem=2304, maxMem=744876933
15/09/24 09:48:56 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1481.0 B, free 710.4 MB)
15/09/24 09:48:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.18.15.5:45416 (size: 1481.0 B, free: 710.4 MB)
15/09/24 09:48:57 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:874
15/09/24 09:48:57 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkLR.scala:50)
15/09/24 09:48:57 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/24 09:48:57 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@172.18.15.5:60251/user/Executor#1343581973]) with ID 0
15/09/24 09:48:57 INFO BlockManagerMasterEndpoint: Registering block manager 172.18.15.5:50188 with 265.4 MB RAM, BlockManagerId(0, 172.18.15.5, 50188)
15/09/24 09:48:57 WARN TaskSetManager: Stage 0 contains a task of very large size (597 KB). The maximum recommended task size is 100 KB.
15/09/24 09:48:57 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.18.15.5:50188 (size: 1481.0 B, free: 265.4 MB)
15/09/24 09:49:05 INFO BlockManagerInfo: Added rdd_0_0 in memory on 172.18.15.5:50188 (size: 800.8 KB, free: 264.6 MB)
15/09/24 09:49:05 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 7803 ms on 172.18.15.5 (1/2)
15/09/24 09:49:05 INFO BlockManagerInfo: Added rdd_0_1 in memory on 172.18.15.5:50188 (size: 800.8 KB, free: 263.8 MB)
15/09/24 09:49:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 424 ms on 172.18.15.5 (2/2)
15/09/24 09:49:05 INFO DAGScheduler: ResultStage 0 (reduce at SparkLR.scala:52) finished in 8.374 s
15/09/24 09:49:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/09/24 09:49:05 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/09/24 09:49:05 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
15/09/24 09:49:05 INFO DAGScheduler: Job 0 finished: reduce at SparkLR.scala:52, took 9.449757 s
On iteration 2
15/09/24 09:49:05 INFO SparkContext: Starting job: reduce at SparkLR.scala:52
15/09/24 09:49:05 INFO DAGScheduler: Got job 1 (reduce at SparkLR.scala:52) with 2 output partitions (allowLocal=false)
15/09/24 09:49:05 INFO DAGScheduler: Final stage: ResultStage 1(reduce at SparkLR.scala:52)
15/09/24 09:49:05 INFO DAGScheduler: Parents of final stage: List()
15/09/24 09:49:05 INFO DAGScheduler: Missing parents: List()
15/09/24 09:49:05 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[2] at map at SparkLR.scala:50), which has no missing parents
15/09/24 09:49:05 INFO MemoryStore: ensureFreeSpace(2304) called with curMem=3785, maxMem=744876933
15/09/24 09:49:05 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 710.4 MB)
15/09/24 09:49:05 INFO MemoryStore: ensureFreeSpace(1478) called with curMem=6089, maxMem=744876933
15/09/24 09:49:05 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1478.0 B, free 710.4 MB)
15/09/24 09:49:05 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.18.15.5:45416 (size: 1478.0 B, free: 710.4 MB)
15/09/24 09:49:05 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:874
15/09/24 09:49:05 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[2] at map at SparkLR.scala:50)
15/09/24 09:49:05 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/09/24 09:49:06 WARN TaskSetManager: Stage 1 contains a task of very large size (597 KB). The maximum recommended task size is 100 KB.
15/09/24 09:49:06 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.18.15.5:50188 (size: 1478.0 B, free: 263.8 MB)
15/09/24 09:49:06 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:06 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 179 ms on 172.18.15.5 (1/2)
15/09/24 09:49:06 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 172.18.15.5:45416 in memory (size: 1481.0 B, free: 710.4 MB)
15/09/24 09:49:06 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 172.18.15.5:50188 in memory (size: 1481.0 B, free: 263.8 MB)
15/09/24 09:49:06 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 114 ms on 172.18.15.5 (2/2)
15/09/24 09:49:06 INFO DAGScheduler: ResultStage 1 (reduce at SparkLR.scala:52) finished in 0.268 s
15/09/24 09:49:06 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/09/24 09:49:06 INFO DAGScheduler: Job 1 finished: reduce at SparkLR.scala:52, took 0.297350 s
On iteration 3
15/09/24 09:49:06 INFO SparkContext: Starting job: reduce at SparkLR.scala:52
15/09/24 09:49:06 INFO DAGScheduler: Got job 2 (reduce at SparkLR.scala:52) with 2 output partitions (allowLocal=false)
15/09/24 09:49:06 INFO DAGScheduler: Final stage: ResultStage 2(reduce at SparkLR.scala:52)
15/09/24 09:49:06 INFO DAGScheduler: Parents of final stage: List()
15/09/24 09:49:06 INFO DAGScheduler: Missing parents: List()
15/09/24 09:49:06 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[3] at map at SparkLR.scala:50), which has no missing parents
15/09/24 09:49:06 INFO MemoryStore: ensureFreeSpace(2304) called with curMem=3782, maxMem=744876933
15/09/24 09:49:06 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.3 KB, free 710.4 MB)
15/09/24 09:49:06 INFO MemoryStore: ensureFreeSpace(1481) called with curMem=6086, maxMem=744876933
15/09/24 09:49:06 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1481.0 B, free 710.4 MB)
15/09/24 09:49:06 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.18.15.5:45416 (size: 1481.0 B, free: 710.4 MB)
15/09/24 09:49:06 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:874
15/09/24 09:49:06 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[3] at map at SparkLR.scala:50)
15/09/24 09:49:06 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
15/09/24 09:49:06 WARN TaskSetManager: Stage 2 contains a task of very large size (597 KB). The maximum recommended task size is 100 KB.
15/09/24 09:49:06 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:06 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.18.15.5:50188 (size: 1481.0 B, free: 263.8 MB)
15/09/24 09:49:06 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:06 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 108 ms on 172.18.15.5 (1/2)
15/09/24 09:49:06 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 5) in 64 ms on 172.18.15.5 (2/2)
15/09/24 09:49:06 INFO DAGScheduler: ResultStage 2 (reduce at SparkLR.scala:52) finished in 0.157 s
15/09/24 09:49:06 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/09/24 09:49:06 INFO DAGScheduler: Job 2 finished: reduce at SparkLR.scala:52, took 0.174698 s
On iteration 4
15/09/24 09:49:06 INFO SparkContext: Starting job: reduce at SparkLR.scala:52
15/09/24 09:49:06 INFO DAGScheduler: Got job 3 (reduce at SparkLR.scala:52) with 2 output partitions (allowLocal=false)
15/09/24 09:49:06 INFO DAGScheduler: Final stage: ResultStage 3(reduce at SparkLR.scala:52)
15/09/24 09:49:06 INFO DAGScheduler: Parents of final stage: List()
15/09/24 09:49:06 INFO DAGScheduler: Missing parents: List()
15/09/24 09:49:06 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[4] at map at SparkLR.scala:50), which has no missing parents
15/09/24 09:49:06 INFO MemoryStore: ensureFreeSpace(2304) called with curMem=7567, maxMem=744876933
15/09/24 09:49:06 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.3 KB, free 710.4 MB)
15/09/24 09:49:06 INFO MemoryStore: ensureFreeSpace(1483) called with curMem=9871, maxMem=744876933
15/09/24 09:49:06 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1483.0 B, free 710.4 MB)
15/09/24 09:49:06 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.18.15.5:45416 (size: 1483.0 B, free: 710.4 MB)
15/09/24 09:49:06 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
15/09/24 09:49:06 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 3 (MapPartitionsRDD[4] at map at SparkLR.scala:50)
15/09/24 09:49:06 INFO TaskSchedulerImpl: Adding task set 3.0 with 2 tasks
15/09/24 09:49:06 WARN TaskSetManager: Stage 3 contains a task of very large size (597 KB). The maximum recommended task size is 100 KB.
15/09/24 09:49:06 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 6, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:06 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.18.15.5:50188 (size: 1483.0 B, free: 263.8 MB)
15/09/24 09:49:06 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 7, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:06 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 6) in 119 ms on 172.18.15.5 (1/2)
15/09/24 09:49:06 INFO DAGScheduler: ResultStage 3 (reduce at SparkLR.scala:52) finished in 0.158 s
15/09/24 09:49:06 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 7) in 58 ms on 172.18.15.5 (2/2)
15/09/24 09:49:06 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
15/09/24 09:49:06 INFO DAGScheduler: Job 3 finished: reduce at SparkLR.scala:52, took 0.169303 s
On iteration 5
15/09/24 09:49:06 INFO SparkContext: Starting job: reduce at SparkLR.scala:52
15/09/24 09:49:06 INFO DAGScheduler: Got job 4 (reduce at SparkLR.scala:52) with 2 output partitions (allowLocal=false)
15/09/24 09:49:06 INFO DAGScheduler: Final stage: ResultStage 4(reduce at SparkLR.scala:52)
15/09/24 09:49:06 INFO DAGScheduler: Parents of final stage: List()
15/09/24 09:49:06 INFO DAGScheduler: Missing parents: List()
15/09/24 09:49:06 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[5] at map at SparkLR.scala:50), which has no missing parents
15/09/24 09:49:06 INFO MemoryStore: ensureFreeSpace(2304) called with curMem=11354, maxMem=744876933
15/09/24 09:49:06 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.3 KB, free 710.4 MB)
15/09/24 09:49:06 INFO MemoryStore: ensureFreeSpace(1480) called with curMem=13658, maxMem=744876933
15/09/24 09:49:06 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1480.0 B, free 710.4 MB)
15/09/24 09:49:06 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 172.18.15.5:45416 (size: 1480.0 B, free: 710.4 MB)
15/09/24 09:49:06 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:874
15/09/24 09:49:06 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 4 (MapPartitionsRDD[5] at map at SparkLR.scala:50)
15/09/24 09:49:06 INFO TaskSchedulerImpl: Adding task set 4.0 with 2 tasks
15/09/24 09:49:06 WARN TaskSetManager: Stage 4 contains a task of very large size (597 KB). The maximum recommended task size is 100 KB.
15/09/24 09:49:06 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 8, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:06 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 172.18.15.5:50188 (size: 1480.0 B, free: 263.8 MB)
15/09/24 09:49:06 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 9, 172.18.15.5, PROCESS_LOCAL, 611608 bytes)
15/09/24 09:49:06 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 8) in 88 ms on 172.18.15.5 (1/2)
15/09/24 09:49:06 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 9) in 55 ms on 172.18.15.5 (2/2)
15/09/24 09:49:06 INFO DAGScheduler: ResultStage 4 (reduce at SparkLR.scala:52) finished in 0.126 s
15/09/24 09:49:06 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/09/24 09:49:06 INFO DAGScheduler: Job 4 finished: reduce at SparkLR.scala:52, took 0.142455 s
Final w: DenseVector(5816.075967498865, 5222.008066011391, 5754.751978607454, 3853.1772062206846, 5593.565827145932, 5282.387874201054, 3662.9216051953435, 4890.78210340607, 4223.371512250292, 5767.368579668863)
15/09/24 09:49:06 INFO SparkUI: Stopped Spark web UI at http://172.18.15.5:4040
15/09/24 09:49:06 INFO DAGScheduler: Stopping DAGScheduler
15/09/24 09:49:06 INFO SparkDeploySchedulerBackend: Shutting down all executors
15/09/24 09:49:06 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
15/09/24 09:49:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/09/24 09:49:06 INFO Utils: path = /tmp/spark-401b12ac-229a-42ba-9544-851910a37225/blockmgr-aca1b77d-f9c7-47bd-95bd-4391783f7e82, already present as root for deletion.
15/09/24 09:49:06 INFO MemoryStore: MemoryStore cleared
15/09/24 09:49:06 INFO BlockManager: BlockManager stopped
15/09/24 09:49:06 INFO BlockManagerMaster: BlockManagerMaster stopped
15/09/24 09:49:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/09/24 09:49:06 INFO SparkContext: Successfully stopped SparkContext
15/09/24 09:49:06 INFO Utils: Shutdown hook called
15/09/24 09:49:06 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/09/24 09:49:06 INFO Utils: Deleting directory /tmp/spark-401b12ac-229a-42ba-9544-851910a37225
15/09/24 09:49:06 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

Process finished with exit code 0




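October 12: packaging the project on my work computer failed with "Please try specifying another one using the -encoding option".

The cause, of all things, was that after I added Chinese comments the file encoding had changed to GBK. Speechless.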
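
One way to avoid this kind of encoding mismatch, assuming the project is built with sbt (the classpath in the logs points at target/scala-2.10/classes, but the original post does not show its build file), is to pin the compiler encoding in build.sbt and keep every source file saved as UTF-8. This is a sketch of that idea, not the author's actual configuration:

```scala
// build.sbt (hypothetical fragment; the original post does not show its build file)
// Tell scalac and javac to read source files as UTF-8 regardless of the platform default.
scalacOptions ++= Seq("-encoding", "UTF-8")
javacOptions ++= Seq("-encoding", "UTF-8")
```

If the jar is built through IntelliJ IDEA's own build instead, the equivalent is to make the file encoding configured in the IDE match the encoding the compiler expects.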

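October 26

Connecting to the Spark cluster from IntelliJ IDEA on Windows and running code

This one... took a lot of detours.

To summarize:

1. Edit the Windows hosts file and add the IP mappings for the cluster machines.

2. Use hostnames instead of IP addresses in the code.

3. Remember to set the number of cores Spark may use; otherwise you may see: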

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
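
For steps 1 and 2, it can help to confirm that the master's hostname actually resolves on the Windows machine before starting a SparkContext. The following is a minimal sketch using only the JDK; the object name CheckMasterHost and its resolve helper are made up for this example, and "moon" is the master hostname used in this post.

```scala
import java.net.InetAddress
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of the original post.
object CheckMasterHost {
  // Returns Right(ip) when the hostname resolves (for example through the
  // hosts file), or Left(message) when the lookup fails.
  def resolve(host: String): Either[String, String] =
    Try(InetAddress.getByName(host)) match {
      case Success(addr) => Right(addr.getHostAddress)
      case Failure(e)    => Left("cannot resolve " + host + ": " + e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    // Pass the master hostname as the first argument, or fall back to "moon".
    val host = if (args.nonEmpty) args(0) else "moon"
    resolve(host) match {
      case Right(ip)   => println(host + " resolves to " + ip)
      case Left(error) => println(error + " (check the hosts file mapping)")
    }
  }
}
```

If the lookup fails, the hosts file entry from step 1 is the first thing to check.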


package org.apache.spark.examples

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    //conf.set("master","spark://moon:7077")
    conf.setMaster("spark://moon:7077").setJars(List("E:\\cloud\\ideaproject\\sparkTest\\out\\artifacts\\sparkTest\\sparkTest.jar"))
    conf.set("spark.cores.max", "4") // set the number of cores to use here
    val spark = new SparkContext(conf)

    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    // Copy the elements of the collection into a distributed dataset that can be operated on in parallel.
    // slices is the number of partitions the dataset is cut into; Spark runs one task per partition on the cluster.
    // Spark would also pick a number of partitions automatically if it were omitted.
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}