Java与SparkStreaming的Socket通信,结果保存至mysql
2017-02-27 21:41
495 查看
Java与SparkStreaming Socket通信
还是做毕业设计,到这里出现了与预想中的问题:Java与SparkStreaming,这两者之间互相通信。 Spark的官方文档里面有说用[nc -lk 9999][6] 这样的例子来实现Streaming的数据获取。之前一直是分开来做的,今天两个整合的时候,预想到的问题出现了。整理下整个毕设后端的思路:
1 在Spring里面配置了一个bean,然后提交到taskExecutor :
2 bean 里面就实现Socket 跟SparkStreaming 通信:
[Spring的配置官方的文档][6]里面有说, 我的配置如下:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5"></property> <property name="maxPoolSize" value="10"></property> <property name="queueCapacity" value="25"></property> </bean> <bean id="socketServer" class="com.yhj.beens.inner.ServerThread"> <constructor-arg ref="taskExecutor"></constructor-arg> </bean>
bean的实现原来以为跟之前的Socket一样:
output = new DataOutputStream(this.socket.getOutputStream());
输出:
输出: output.writeUTF(s);
Streaming接收:
val ssc = new StreamingContext(conf, Seconds(10)) val lines = ssc.socketTextStream("192.168.5.101", 10020) //40 val words = lines.flatMap(_.split(" "))
如果用这种方式获取的流会在Spark 里面处理不好弄,我这里测的是处理完数据库里没什么也没有(我的处理结果保存在数据库),要是把上面代码中split中的[分隔符][6]去掉就会把输入的字符串逐个分解开(我很郁闷,我很不懂这里,要是有同学知道为什么可以留言我们一起讨论)。
SparkAPI 写了 socketTextStream 方法接收Socket的格式。这里 我看见网上有 说这种方式也通过SQL 从数据库里面获取数据,我没成功。
我这正确的方法:
Socket流:
String s = "帮同事买的比实体店便宜送货很快安装也很迅速第二天就来装了同事很满意\r\n"; // 回车换行 /*writer = new OutputStreamWriter(socket.getOutputStream());*/ writer.write(s); writer.flush();//这里其实我一直排斥用这个,一直没用后来加了就好了。 缓冲区啊~ 缓冲区 Thread.sleep(5000);
Streaming接收的地方不变。
一定要有回车换行
数据库连接池部分:
object scalaConnectPool { val log = Logger.getLogger(scalaConnectPool.this.getClass) var ds: BasicDataSource = null def getDataSource = { if (ds == null) { ds = new BasicDataSource() ds.setUsername("root") ds.setPassword("123456") ds.setUrl("jdbc:mysql://127.0.0.1:3306/db_shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=true") ds.setDriverClassName("com.mysql.jdbc.Driver") ds.setInitialSize(20) ds.setMaxActive(100) ds.setMinIdle(50) ds.setMaxIdle(100) ds.setMaxWait(1000) ds.setMinEvictableIdleTimeMillis(5 * 60 * 1000) ds.setTimeBetweenEvictionRunsMillis(10 * 60 * 1000) ds.setTestOnBorrow(true) } ds } def getConnection: Connection = { var connect: Connection = null try { if (ds != null) { connect = ds.getConnection } else { connect = getDataSource.getConnection } }catch { case t: Throwable => t.printStackTrace() // TODO: handle error } connect } def shutDownDataSource: Unit = if (ds != null) { ds.close() } def closeConnection(rs: ResultSet, ps: PreparedStatement, connect: Connection): Unit = { if (rs != null) { rs.close } if< 4000 /span> (ps != null) { ps.close } if (connect != null) { connect.close } } }
写入数据库:
pairs_1.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { val connect = scalaConnectPool.getConnection connect.setAutoCommit(false) val stmt = connect.createStatement() partitionOfRecords.foreach(record => { stmt.addBatch("insert into tb_cominfo (comdate,comdetail,comjudge) values (now(),'" + record._1 + "','" + record._2 + "')") }) stmt.executeBatch() connect.commit() }) })
结果:
到此为止原理都通了。
有的代码是以前看别人的博客的, 时间太长我不记得了。
相关文章推荐
- Spark Streaming createDirectStream保存kafka offset(JAVA实现)
- Java 实现SparkSQL保存查询结果带有字段信息到(header)HDFS
- Spark Streaming---PersistMySQL Work Count(java)
- kafka + spark streaming 实时读取计算 nginx 日志,存储结果到 mongodb/mysql
- Spark Streaming 中使用 zookeeper 保存 offset 并重用 Java版
- Spark使用Java读取mysql数据和保存数据到mysql
- 如何将spark streaming处理结果保存到关系型数据库中
- 如何将spark streaming处理结果保存到关系型数据库中
- flex与java之间socket通信的好教程推荐.
- 跨语言之间的socket通信(C--Java的握手)
- java.net.SocketException: Broken pipe /Mysql在经过8小时不使用后会自动关闭已打开的连接
- java即时通信解决方案openfire+spark完整安装指南
- java 网络编程(2.4)-----------采用线程池多线程的Socket 通信
- delphi和 java通过socket通信的中文问题
- java. socket 通信之后要 flush
- java即时通信解决方案openfire+spark完整安装指南
- 将MySQL中sql运行结果保存到文件
- java 网络编程(2.1)-----------多线程的Socket 通信
- java 网络编程(1)-----------最简单的Socket 通信
- Java通信编程之Socket入门