Spark实时流计算Java案例
2016-06-07 21:20
495 查看
现在,网上基于spark的代码基本上都是Scala,很多书上也都是基于Scala,没办法,谁叫spark是Scala写出来的了,但是我现在还没系统的学习Scala,所以只能用java写spark程序了,spark支持java,而且Scala也基于JVM,不说了,直接上代码
这是官网上给出的例子,大数据学习中经典案例单词计数
在linux下一个终端 输入 $ nc -lk 9999
然后运行下面的代码
然后再刚刚的终端输入 hello world
就可以通过控制台看到
并且hdfs上也可以看到通过计算生成的实时文件
第二个案例是,不是通过socketTextStream套接字,而是直接通过hdfs上的某个文件目录来作为输入数据源
这样就存在端口一直在监控你的那个目录,只要它有文件生成,就会马上读取到它里面的内容,你可以先运行程序,然后手动添加一个文件到刚刚的目录,就可以看到输出结果了
码字不易,转载请指明出处/article/11843102.html
参考
spark编程指南
这是官网上给出的例子,大数据学习中经典案例单词计数
在linux下一个终端 输入 $ nc -lk 9999
然后运行下面的代码
package com.tg.spark.stream; import java.util.Arrays; import org.apache.spark.*; import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; import scala.Tuple2; /** * * @author 汤高 * */ public class SparkStream { public static void main(String[] args) { // Create a local StreamingContext with two working thread and batch // interval of 1 second SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount").set("spark.testing.memory", "2147480000"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); System.out.println(jssc); // Create a DStream that will connect to hostname:port, like // localhost:9999 JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999); //JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream"); // Split each line into words JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { System.out.println(Arrays.asList(x.split(" ")).get(0)); return Arrays.asList(x.split(" ")); } }); // Count each word in each batch JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); System.out.println(pairs); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); // Print the first ten elements of each RDD generated in this DStream to // the console wordCounts.print(); //wordCounts.saveAsHadoopFiles("hdfs://master:9000/testFile/", "spark", new Text(), new IntWritable(), JavaPairDStream<Text,IntWritable>()); wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/testFile/", "spark"); //wordCounts.saveAsHadoopFiles("hdfs://master:9000/testFile/", "spark",Text,IntWritable); //System.out.println(wordCounts.count()); jssc.start(); //System.out.println(wordCounts.count());// Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } }
然后再刚刚的终端输入 hello world
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world
就可以通过控制台看到
------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ...
并且hdfs上也可以看到通过计算生成的实时文件
第二个案例是,不是通过socketTextStream套接字,而是直接通过hdfs上的某个文件目录来作为输入数据源
package com.tg.spark.stream; import java.util.Arrays; import org.apache.spark.*; import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; import scala.Tuple2; /** * * @author 汤高 * */ public class SparkStream2 { public static void main(String[] args) { // Create a local StreamingContext with two working thread and batch // interval of 1 second SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount").set("spark.testing.memory", "2147480000"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); System.out.println(jssc); // Create a DStream that will connect to hostname:port, like // localhost:9999 //JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999); JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream"); // Split each line into words JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { System.out.println(Arrays.asList(x.split(" ")).get(0)); return Arrays.asList(x.split(" ")); } }); // Count each word in each batch JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); System.out.println(pairs); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); // Print the first ten elements of each RDD generated in this DStream to // the console wordCounts.print(); //wordCounts.saveAsHadoopFiles("hdfs://master:9000/testFile/", "spark", new Text(), new IntWritable(), JavaPairDStream<Text,IntWritable>()); wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/testFile/", "spark"); //wordCounts.saveAsHadoopFiles("hdfs://master:9000/testFile/", "spark",Text,IntWritable); //System.out.println(wordCounts.count()); jssc.start(); //System.out.println(wordCounts.count());// Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } }
这样就存在端口一直在监控你的那个目录,只要它有文件生成,就会马上读取到它里面的内容,你可以先运行程序,然后手动添加一个文件到刚刚的目录,就可以看到输出结果了
码字不易,转载请指明出处/article/11843102.html
参考
spark编程指南
相关文章推荐
- Spring官网改版后下载
- 解读struts2的原始默认配置文件struts2-default.xml
- Java Map遍历方式方式及性能测试
- leetcode-java-5. Longest Palindromic Substring
- 03.Java 多线程 - synchronized
- Java File类总结和FileUtils类
- 不使用Cygwin,在eclipse中快速开发JNI,一键生成C头文件.h,以及一键使用NDK交叉编译
- [疯狂Java]泛型:泛型的底层原理(类型擦除、原生类型、编译前检查)
- Leetcode 349. Intersection of Two Arrays
- Java并发编程-Executor框架之Callable和Future接口
- Java中Fail-Fast机制、ConcurrentModificationException异常
- SpringMVC知识点梳理
- RxJava 与 Retrofit 结合的最佳实践
- JDK常用的包
- Spring JdbcTemplate方法详解
- Java 内存区域和GC机制
- 跟着小白学~如何使用二维数组打印杨辉三角
- java多线程基础一
- Java-文本下载
- struts2中ajax异步请求action会被自动执行两次