您的位置:首页 > 其它

sparkstreaming版本的单词统计

2016-06-04 18:18 253 查看
直接上代码、注释:

package com.scala.my



import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.streaming.StreamingContext

import org.apache.spark.streaming.Duration

import org.apache.spark.streaming.Durations

/**

 * spark streaming 版本的单词统计(通过监听端口)

 * 1\在 hh15上执行 :yum install nc 安装工具nc

 * 第一种本地测试的步骤:

 *     2\在hh15上启动端口:#nc -lk 8888

 *     3\在本地eclipse上run程序

 *     4\在hh15上的dos界面中输入单词,输入的时间要在Durations.seconds(10)的范围内

 *     5\在eclipse界面查看是否成功

 * 第二种服务器上测试的步骤:

 *     2\开启spark集群:(1)启动zookeeeper  (2)在hh15上的spark的sbin下启动spark集群---》#sh start-all.sh

 *     3\在hh15上开启端口:#nc -lk 8888

 *     4\将WordCount.java类打成jar包(sparkStreamWordCount.jar),并且放到hh15上

 *     5\使用standalone模式在hh15上执行:(一般是后台执行)

 *     # nohup sh 

 *     6\在刚才开启端口的窗口输入单词

 *     

 *     

 *     测试结果:第一种方式测试通过,第二种方式未测试

 */

object StreamWordCount {

  def main(args: Array[String]): Unit = {

    //获取context  至少并行度是2

    val sc=new StreamingContext(new SparkConf().setAppName("StreamCount").setMaster("local[2]"),Durations.seconds(8))

    //获取端口输入的文本信息

    val lines=sc.socketTextStream("192.168.142.115",
8888)

    //压扁

    val paris=lines.flatMap(x=>x.split(" ")) 

      //map

    val words=paris.map((_,1))

    //reduce

    val result=words.reduceByKey((x,y)=>x+y)

    //打印前10个数

    result.print()

    //开启start

    sc.start()

    //等待

    sc.awaitTermination()

    //关闭资源

    sc.stop()

  }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: