Spark Streaming: A Getting-Started Example
2017-05-11 14:08
Overview
This post walks step by step through building a simple Spark Streaming program. The example is a plain WordCount: it receives sentences over a socket, splits each sentence on spaces, counts how many times each word appears, and prints the results. Note that this post does not explain the code in detail; the goal is simply to get a first hands-on feel for Spark Streaming's stream processing. Without further ado, let's get started.

Create the Project
First create a Maven project and add the Spark Streaming dependencies to its pom. The pom file looks like this:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.winwill.spark</groupId>
    <artifactId>spark-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>
    </dependencies>
</project>
```
Write the Code
Create a class named SparkStreamingDemo with the following code:

```java
package com.winwill.spark.demo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @author qifuguang
 * @date 15/12/8 14:55
 */
public class SparkStreamingDemo {
    public static void main(String[] args) {
        // At least two threads: one for the socket receiver, one for processing
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        // 10-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String x) {
                        return Arrays.asList(x.split(" "));
                    }
                });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}
```
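For intuition, the per-batch logic of the pipeline above (split each line on spaces, pair each word with 1, sum the counts per word) can be sketched in plain Java without Spark. The `WordCountSketch` class below is a hypothetical helper for illustration only, not part of the project:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // What flatMap + mapToPair + reduceByKey compute for one batch of lines:
    // split each line on spaces, then count occurrences of each word.
    public static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                Integer current = counts.get(word);
                counts.put(word, current == null ? 1 : current + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList("hello world", "hello spark")));
    }
}
```

Spark performs the same computation, but distributed over the cluster and repeated once per 10-second batch.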
Start the netcat Server
The previous step finished the Spark program; now we need to simulate a data source. This example uses the netcat tool to play the role of the socket, sending whatever is typed on the console to port 9999 on the local machine for the Spark program to consume. Start netcat listening on port 9999:

```
[qifuguang@Mac~]$ nc -lp 9999
```
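If netcat is not available, a small Java stand-in can serve the same purpose. The `NetcatStandIn` class below is a sketch for illustration, not part of the post's project: it listens on a port and forwards each line it reads to the first client that connects, which is the newline-delimited stream that `socketTextStream` expects:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Reader;
import java.net.ServerSocket;
import java.net.Socket;

public class NetcatStandIn {
    // Accept one client on the given server socket and forward each line
    // from the reader to it, one line per println.
    public static void serve(ServerSocket server, Reader input) throws Exception {
        BufferedReader in = new BufferedReader(input);
        Socket client = server.accept();
        try {
            PrintWriter out = new PrintWriter(client.getOutputStream(), true);
            String line;
            while ((line = in.readLine()) != null) {
                out.println(line);
            }
        } finally {
            client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(9999);
        try {
            serve(server, new InputStreamReader(System.in)); // type lines here
        } finally {
            server.close();
        }
    }
}
```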
Run the Spark Program
Now run the Spark program we just wrote, then type a sentence, "hello world", into the netcat session:
```
[qifuguang@Mac~]$ nc -lp 9999
hello world
```
Observe the Spark program's output.
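With the 10-second batch interval configured above, shortly after "hello world" is typed into netcat the console should print something along these lines (the timestamp will differ on each run):

```
-------------------------------------------
Time: 1449554400000 ms
-------------------------------------------
(hello,1)
(world,1)
```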
That's the whole simple example; later posts will go through the code in more detail.