您的位置:首页 > 其它

Scala之Demo2

2015-10-15 09:31 387 查看
1.socket服务端

package com.test.sparkstreaming

import java.io.ObjectInputStream
import java.io.IOException
import java.net.SocketException
import java.io.DataInputStream
import java.net.Socket
import java.io.DataOutputStream
import scala.util.Random
import java.net.ServerSocket
import scala.collection.mutable.ListBuffer

/**
 * Socket server: listens on TCP port 10112 and, for every client that
 * connects, prints each Java-serialized object the client sends.
 */
object StreamingConsummer {

  /**
   * Accepts connections forever, handing each one to its own [[ServerThread]].
   *
   * Exits the JVM with status -1 if the port cannot be opened or accepting a
   * connection fails with an `IOException`.
   */
  def main(args: Array[String]): Unit = {
    try {
      val listener = new ServerSocket(10112)
      try {
        while (true)
          new ServerThread(listener.accept()).start()
      } finally {
        // Only reached when accept() throws; release the port before exiting.
        listener.close()
      }
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: 10112.");
        System.exit(-1)
    }
  }

  /**
   * Handles one client connection: reads serialized objects from the socket
   * and prints them until the client closes the stream or sends `null`.
   *
   * @param socket the accepted client connection; closed when `run` finishes
   */
  case class ServerThread(socket: Socket) extends Thread("ServerThread") {

    /**
     * Reads the next object from the stream.
     *
     * @return `Some(obj)` for a received object, `None` when the stream has
     *         ended (EOF) or the client sent `null` as a terminator
     */
    private def readNext(in: ObjectInputStream): Option[AnyRef] =
      try Option(in.readObject())
      catch { case _: java.io.EOFException => None }

    override def run(): Unit = {
      try {
        val in = new ObjectInputStream(new DataInputStream(socket.getInputStream()))
        try {
          // Read a fresh object on every iteration; stop at end of stream.
          Iterator
            .continually(readNext(in))
            .takeWhile(_.isDefined)
            .foreach(_.foreach(println))
        } finally {
          in.close()
        }
      } catch {
        // A reset/aborted connection is treated as a normal client departure.
        case e: SocketException => ()
        case e: IOException => e.printStackTrace()
        // The client sent a class that is not on the server's classpath.
        case e: ClassNotFoundException => e.printStackTrace()
      } finally {
        // Always release the connection, even if reading failed.
        try socket.close()
        catch { case _: IOException => () }
      }
    }

  }
}
2.socket客户端

package com.test.sparkstreaming

import java.net.InetAddress
import java.io.ObjectOutputStream
import java.io.IOException
import java.io.DataInputStream
import java.net.Socket
import java.io.DataOutputStream
import scala.io.Source
import scala.util.Random

/**
 * Socket client: once per second, generates a random batch of purchase
 * events and sends it as one serialized object to the server on
 * localhost:10112.
 */
object Client {

  /**
   * Loads user names from the first line of `E:\users.csv`, then streams
   * random `(user, product, price)` batches to the server until an
   * `IOException` occurs (which is printed).
   */
  def main(args: Array[String]): Unit = {

    val random = new Random()
    // Exclusive upper bound on the number of events in one batch (0 to 5).
    val maxEvent = 6

    // User names are the comma-separated fields of the CSV's first line.
    // Only that line is read, and the file handle is always released.
    val source = Source.fromFile("E:\\users.csv")
    val names: Seq[String] =
      try {
        val lines = source.getLines()
        if (lines.hasNext) lines.next().split(",").toSeq else Seq.empty[String]
      } finally {
        source.close()
      }

    val products = Seq(
      "iPhone Cover" -> 9.99,
      "HeadPhone" -> 5.49,
      "Samsung Galaxy" -> 8.96,
      "iPad Cover" -> 7.49)

    /** Builds `n` events, each pairing a random user with a random product. */
    def generateProductEvent(n: Int) =
      (1 to n).map { _ =>
        val (product, price) = products(random.nextInt(products.size))
        // Pick one name directly instead of shuffling the whole list per event.
        val user = names(random.nextInt(names.size))
        (user, product, price)
      }

    if (names.isEmpty) {
      // Without any user there is nothing meaningful to send.
      System.err.println("No user names found in E:\\users.csv")
    } else {
      try {
        val socket = new Socket(InetAddress.getByName("localhost"), 10112)
        try {
          val out = new ObjectOutputStream(
            new DataOutputStream(socket.getOutputStream()))

          while (true) {
            Thread.sleep(1000)
            out.writeObject(generateProductEvent(random.nextInt(maxEvent)))
            out.flush()
            // ObjectOutputStream remembers every object it has written; reset
            // so already-sent batches can be garbage collected.
            out.reset()
          }
        } finally {
          // Reached when the loop is broken by an exception; closing the
          // socket also closes its streams.
          socket.close()
        }
      } catch {
        case e: IOException =>
          e.printStackTrace()
      }
    }
  }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: