Scala之Demo2
2015-10-15 09:31
387 查看
1.socket服务端
package com.test.sparkstreaming

import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.EOFException
import java.io.IOException
import java.io.ObjectInputStream
import java.net.ServerSocket
import java.net.Socket
import java.net.SocketException
import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * Socket server: listens on TCP port 10112 and, for every client that
 * connects, prints each object the client sends via Java object serialization.
 */
object StreamingConsummer {

  /** TCP port the server listens on. */
  private val Port = 10112

  /**
   * Opens the listening socket and spawns one [[ServerThread]] per accepted
   * connection. Exits the JVM with status -1 if an `IOException` occurs while
   * opening the port or accepting a connection.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    try {
      val listener = new ServerSocket(Port)
      try {
        // Runs until accept() fails; the finally below guarantees the
        // listening socket is released in that case.
        while (true) new ServerThread(listener.accept()).start()
      } finally {
        listener.close()
      }
    } catch {
      case e: IOException =>
        System.err.println(s"Could not listen on port: $Port.")
        System.exit(-1)
    }
  }

  /**
   * Handles a single client connection: reads serialized objects from the
   * socket and prints each one until the client closes the stream.
   *
   * @param socket the accepted client connection; closed when `run` returns
   */
  case class ServerThread(socket: Socket) extends Thread("ServerThread") {
    override def run(): Unit = {
      try {
        val in = new ObjectInputStream(new DataInputStream(socket.getInputStream()))
        try {
          // Read a fresh object on every iteration. readObject() signals the
          // end of the stream by throwing EOFException, which ends the loop.
          while (true) println(in.readObject())
        } catch {
          case _: EOFException => () // client closed the connection normally
        } finally {
          in.close()
        }
      } catch {
        case _: SocketException => () // connection dropped; nothing to report
        case e: IOException => e.printStackTrace()
        case e: ClassNotFoundException => e.printStackTrace() // unknown class in the stream
      } finally {
        // Always release the socket, even if the stream could not be opened.
        try socket.close()
        catch { case _: IOException => () }
      }
    }
  }
}
// 2. Socket client (original heading: "2.socket之客户端")
package com.test.sparkstreaming

import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.IOException
import java.io.ObjectOutputStream
import java.net.InetAddress
import java.net.Socket
import scala.io.Source
import scala.util.Random

/**
 * Socket client: once per second sends a random batch of
 * `(user, product, price)` purchase events to the server on localhost:10112
 * using Java object serialization.
 */
object Client {

  /** Default location of the CSV file whose first line lists the user names. */
  private val DefaultUsersFile = "E:\\users.csv"

  /**
   * Loads the user names and streams random product events to the server
   * until the connection fails.
   *
   * @param args optional first element: path of the users CSV file
   *             (defaults to `E:\users.csv`)
   */
  def main(args: Array[String]): Unit = {
    val random = new Random()
    val maxEvent = 6

    // User names are the comma-separated fields of the file's first line.
    // The source is closed as soon as that line has been read.
    val usersFile = args.headOption.getOrElse(DefaultUsersFile)
    val source = Source.fromFile(usersFile)
    val names: Vector[String] =
      try {
        val lines = source.getLines()
        if (lines.hasNext) lines.next().split(",").toVector else Vector.empty
      } finally {
        source.close()
      }

    if (names.isEmpty) {
      System.err.println(s"No user names found in $usersFile")
    } else {
      val products = Vector(
        "iPhone Cover" -> 9.99,
        "HeadPhone" -> 5.49,
        "Samsung Galaxy" -> 8.96,
        "iPad Cover" -> 7.49)

      // Builds n events, each pairing a random user with a random product.
      def generateProductEvent(n: Int) = {
        (1 to n).map { _ =>
          val (product, price) = products(random.nextInt(products.size))
          val user = names(random.nextInt(names.size))
          (user, product, price)
        }
      }

      try {
        val socket = new Socket(InetAddress.getByName("localhost"), 10112)
        try {
          val out = new ObjectOutputStream(
            new DataOutputStream(socket.getOutputStream()))
          while (true) {
            Thread.sleep(1000)
            val productEvents = generateProductEvent(random.nextInt(maxEvent))
            out.writeObject(productEvents)
            // Drop the stream's back-reference table; otherwise every batch
            // ever written stays reachable and memory grows without bound.
            out.reset()
            out.flush()
          }
        } finally {
          // Closing the socket also closes its streams.
          socket.close()
        }
      } catch {
        case e: IOException => e.printStackTrace()
      }
    }
  }
}
相关文章推荐
- 公司技术的确定
- jraiser
- mv*
- JS利用cookie记忆当前位置的防刷新导航效果
- Codevs1648 最大和
- Facebook SDK导入eclipse-android
- 实现 oracle数据库转意数据到Mysql,最简单最直接
- 批量导出access某表内容到word文档
- 最长递增子序列-动态规划
- Everything 本地磁盘文件搜索工具下载!
- Everything 本地磁盘文件搜索工具下载!
- Everything 本地磁盘文件搜索工具下载!
- JSP内置对象request
- 大型网站之分布式会话管理
- Windows 内存绘图导出RGB
- 输入法的相关调用
- JavaScript学习之获取URL参数
- 冒泡排序算法
- 999句最常用英语口语
- grunt