How to run a Spark Java API program via Run As > Java Application
2014-07-08 16:40
The code first:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public final class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: JavaWordCount <master> <inputFile> <outputFile>");
            System.exit(1);
        }

        JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
        // Ship this application's jar to the cluster so the workers can
        // load JavaWordCount and its anonymous inner classes.
        ctx.addJar("/home/hadoop/Desktop/JavaSparkT.jar");

        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // Split each line into words.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });

        // Pair each word with a count of 1 (pre-1.0 API; Spark 1.0 renamed this to mapToPair).
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts per word.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        counts.saveAsTextFile(args[2]);

        // Alternatively, collect the counts to the driver and print them:
        // List<Tuple2<String, Integer>> output = counts.collect();
        // for (Tuple2<?, ?> tuple : output) {
        //     System.out.println(tuple._1() + ": " + tuple._2());
        // }

        System.exit(0);
    }
}
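A note on the constructor call above: JavaSparkContext.jarOfClass(JavaWordCount.class) looks up the jar from which the class itself was loaded. When the program is started from Eclipse via Run As > Java Application, the class is loaded from the compiled-classes directory rather than from a jar, so (as far as I can tell for the Spark versions of this era) the lookup comes back empty and nothing is shipped to the cluster; that is exactly what the explicit ctx.addJar call compensates for.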
This is one of Spark's bundled examples. Previously I could only run it by packaging the code into a jar and launching it through spark-class from Spark's bin directory, which makes it hard to integrate Spark programs cleanly into an existing system, so I wanted to run the program through an ordinary Java method call instead. After some experimentation, and with my advisor's guidance, it turned out that the error below means the application jar was never submitted to the Spark workers, so the workers cannot find the classes the tasks reference:
14/07/07 10:26:10 INFO TaskSetManager: Serialized task 1.0:0 as 2194 bytes in 104 ms
14/07/07 10:26:11 WARN TaskSetManager: Lost TID 0 (task 1.0:0)
14/07/07 10:26:11 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: JavaWordCount$1
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
Solution: package the program to be run into a jar, then call JavaSparkContext's addJar method to submit that jar to the Spark cluster; the master then distributes the jar to each worker.
The key line is the addJar call, already visible in the full listing above:
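// Ship the application jar to the Spark cluster; the master distributes it
// to the workers so their executors can load JavaWordCount and the
// anonymous inner classes (JavaWordCount$1 etc.) compiled from it.
ctx.addJar("/home/hadoop/Desktop/JavaSparkT.jar");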
With this in place, the run no longer fails with java.lang.ClassNotFoundException: JavaWordCount$1 (the compiled name of the anonymous FlatMapFunction).
Run it with the following program arguments:
spark://localhost:7077 hdfs://localhost:9000/input/test.txt hdfs://localhost:9000/input/result.txt
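Because the whole point is to trigger the job from existing Java code rather than through spark-class, the same run can also be expressed as an ordinary method call. Here is a minimal sketch of such an embedding; the wrapper class name is hypothetical, and note that JavaWordCount.main ends with System.exit(0), so it should be the last thing the calling process does:

// Hypothetical wrapper: runs the word count from another Java program,
// passing the master URL, input path and output path as program arguments.
public class WordCountLauncher {
    public static void main(String[] args) throws Exception {
        JavaWordCount.main(new String[] {
                "spark://localhost:7077",
                "hdfs://localhost:9000/input/test.txt",
                "hdfs://localhost:9000/input/result.txt"
        });
    }
}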
The Eclipse console then prints a log like this:
14/07/08 16:03:06 INFO Utils: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/07/08 16:03:06 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.200.233 instead (on interface eth0)
14/07/08 16:03:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/07/08 16:03:07 INFO Slf4jLogger: Slf4jLogger started
14/07/08 16:03:07 INFO Remoting: Starting remoting
14/07/08 16:03:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.200.233:52469]
14/07/08 16:03:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.200.233:52469]
14/07/08 16:03:07 INFO SparkEnv: Registering BlockManagerMaster
14/07/08 16:03:07 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140708160307-0a89
14/07/08 16:03:07 INFO MemoryStore: MemoryStore started with capacity 484.2 MB.
14/07/08 16:03:08 INFO ConnectionManager: Bound socket to port 47731 with id = ConnectionManagerId(192.168.200.233,47731)
14/07/08 16:03:08 INFO BlockManagerMaster: Trying to register BlockManager
14/07/08 16:03:08 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager 192.168.200.233:47731 with 484.2 MB RAM
14/07/08 16:03:08 INFO BlockManagerMaster: Registered BlockManager
14/07/08 16:03:08 INFO HttpServer: Starting HTTP Server
14/07/08 16:03:08 INFO HttpBroadcast: Broadcast server started at http://192.168.200.233:58077
14/07/08 16:03:08 INFO SparkEnv: Registering MapOutputTracker
14/07/08 16:03:08 INFO HttpFileServer: HTTP File server directory is /tmp/spark-86439c44-9a36-4bda-b8c7-063c5c2e15b2
14/07/08 16:03:08 INFO HttpServer: Starting HTTP Server
14/07/08 16:03:08 INFO SparkUI: Started Spark Web UI at http://192.168.200.233:4040
14/07/08 16:03:08 INFO AppClient$ClientActor: Connecting to master spark://localhost:7077...
14/07/08 16:03:09 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140708160309-0000
14/07/08 16:03:09 INFO AppClient$ClientActor: Executor added: app-20140708160309-0000/0 on worker-20140708160246-localhost-34775 (localhost:34775) with 4 cores
14/07/08 16:03:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140708160309-0000/0 on hostPort localhost:34775 with 4 cores, 512.0 MB RAM
14/07/08 16:03:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/07/08 16:03:09 INFO AppClient$ClientActor: Executor updated: app-20140708160309-0000/0 is now RUNNING
14/07/08 16:03:10 INFO SparkContext: Added JAR /home/hadoop/Desktop/JavaSparkT.jar at http://192.168.200.233:52827/jars/JavaSparkT.jar with timestamp 1404806590353
14/07/08 16:03:10 INFO MemoryStore: ensureFreeSpace(138763) called with curMem=0, maxMem=507720499
14/07/08 16:03:10 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.5 KB, free 484.1 MB)
14/07/08 16:03:12 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@localhost:42090/user/Executor#-1434031133] with ID 0
14/07/08 16:03:13 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager localhost:56831 with 294.9 MB RAM
14/07/08 16:03:13 INFO FileInputFormat: Total input paths to process : 1
14/07/08 16:03:13 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/07/08 16:03:13 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/07/08 16:03:13 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/07/08 16:03:13 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/07/08 16:03:13 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/07/08 16:03:13 INFO SparkContext: Starting job: saveAsTextFile at JavaWordCount.java:66
14/07/08 16:03:13 INFO DAGScheduler: Registering RDD 4 (reduceByKey at JavaWordCount.java:60)
14/07/08 16:03:13 INFO DAGScheduler: Got job 0 (saveAsTextFile at JavaWordCount.java:66) with 1 output partitions (allowLocal=false)
14/07/08 16:03:13 INFO DAGScheduler: Final stage: Stage 0 (saveAsTextFile at JavaWordCount.java:66)
14/07/08 16:03:13 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/07/08 16:03:13 INFO DAGScheduler: Missing parents: List(Stage 1)
14/07/08 16:03:13 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at JavaWordCount.java:60), which has no missing parents
14/07/08 16:03:13 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at JavaWordCount.java:60)
14/07/08 16:03:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/07/08 16:03:13 INFO TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: localhost (PROCESS_LOCAL)
14/07/08 16:03:13 INFO TaskSetManager: Serialized task 1.0:0 as 2252 bytes in 39 ms
14/07/08 16:03:17 INFO TaskSetManager: Finished TID 0 in 3310 ms on localhost (progress: 1/1)
14/07/08 16:03:17 INFO DAGScheduler: Completed ShuffleMapTask(1, 0)
14/07/08 16:03:17 INFO DAGScheduler: Stage 1 (reduceByKey at JavaWordCount.java:60) finished in 3.319 s
14/07/08 16:03:17 INFO DAGScheduler: looking for newly runnable stages
14/07/08 16:03:17 INFO DAGScheduler: running: Set()
14/07/08 16:03:17 INFO DAGScheduler: waiting: Set(Stage 0)
14/07/08 16:03:17 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/07/08 16:03:17 INFO DAGScheduler: failed: Set()
14/07/08 16:03:17 INFO DAGScheduler: Missing parents for Stage 0: List()
14/07/08 16:03:17 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[7] at saveAsTextFile at JavaWordCount.java:66), which is now runnable
14/07/08 16:03:17 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[7] at saveAsTextFile at JavaWordCount.java:66)
14/07/08 16:03:17 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/07/08 16:03:17 INFO TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: localhost (PROCESS_LOCAL)
14/07/08 16:03:17 INFO TaskSetManager: Serialized task 0.0:0 as 11717 bytes in 0 ms
14/07/08 16:03:17 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@localhost:37990
14/07/08 16:03:17 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 127 bytes
14/07/08 16:03:18 INFO DAGScheduler: Completed ResultTask(0, 0)
14/07/08 16:03:18 INFO TaskSetManager: Finished TID 1 in 1074 ms on localhost (progress: 1/1)
14/07/08 16:03:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/07/08 16:03:18 INFO DAGScheduler: Stage 0 (saveAsTextFile at JavaWordCount.java:66) finished in 1.076 s
14/07/08 16:03:18 INFO SparkContext: Job finished: saveAsTextFile at JavaWordCount.java:66, took 4.719158065 s
The result of the run, checked from HDFS:
[hadoop@localhost sbin]$ hadoop fs -ls hdfs://localhost:9000/input/result.txt
14/07/08 16:04:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2014-07-08 16:03 hdfs://localhost:9000/input/result.txt/_SUCCESS
-rw-r--r--   3 hadoop supergroup         56 2014-07-08 16:03 hdfs://localhost:9000/input/result.txt/part-00000
[hadoop@localhost sbin]$ hadoop fs -cat hdfs://localhost:9000/input/result.txt/part-00000
14/07/08 16:04:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(caozw,1)
(hello,3)
(hadoop,1)
(2.2.0,1)
(world,1)
[hadoop@localhost sbin]$