FunDA(15) - Demo: user task parallel execution
2017-03-30 18:09
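In FunDA, parallel execution means running user-defined functions in parallel. In principle, one input stream is split into several streams, and these feed several running instances of the same user-defined function at once. The instances run simultaneously, each on its own thread, until all input has been consumed. The actual number of instances running in parallel is determined by the fs2-nondeterminism algorithm from the number of CPU cores, the thread-pool configuration, and the user-specified maximum number of instances. In this demo we compare the efficiency of the same workload run in parallel and run serially. In an earlier demo we produced the AQMRPT table, but that table is not normalized: its state and county fields are plain names that are not yet coded and joined to the STATES and COUNTIES tables. In this demo we create a new table, NORMAQM, move all of the AQMRPT data into it, and along the way convert the STATENAME and COUNTYNAME fields into the id fields of the STATES and COUNTIES tables. Here is the NORMAQM table structure: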
```scala
case class NORMAQMModel(rid: Long,
                        mid: Int,
                        state: Int,
                        county: Int,
                        year: Int,
                        value: Int,
                        average: Int) extends FDAROW

class NORMAQMTable(tag: Tag) extends Table[NORMAQMModel](tag, "NORMAQM") {
  def rid = column[Long]("ROWID", O.AutoInc, O.PrimaryKey)
  def mid = column[Int]("MEASUREID")
  def state = column[Int]("STATID")
  def county = column[Int]("COUNTYID")
  def year = column[Int]("REPORTYEAR")
  def value = column[Int]("VALUE")
  def average = column[Int]("AVG")
  def * = (rid, mid, state, county, year, value, average) <> (NORMAQMModel.tupled, NORMAQMModel.unapply)
}

val NORMAQMQuery = TableQuery[NORMAQMTable]
```
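The normalization itself amounts to swapping the two name columns for foreign-key ids. A minimal plain-Scala sketch of that mapping, assuming hypothetical `RawReport`/`NormReport` classes in place of `AQMRPTModel`/`NORMAQMModel`, and plain maps in place of the STATES and COUNTIES tables (it uses exact-key lookups, not the substring scan the demo uses below):

```scala
object NormalizeSketch {
  // Hypothetical stand-ins for AQMRPTModel and NORMAQMModel.
  final case class RawReport(mid: Int, stateName: String, countyName: String,
                             year: Int, value: Int, total: Int)
  final case class NormReport(mid: Int, stateId: Int, countyId: Int,
                              year: Int, value: Int, average: Int)

  // Replace the two name fields with ids; -1 marks a name with no match,
  // the same default that getStateID/getCountyID return in the demo.
  def normalize(raw: RawReport,
                stateIds: Map[String, Int],
                countyIds: Map[(String, String), Int]): NormReport =
    NormReport(
      mid = raw.mid,
      stateId = stateIds.getOrElse(raw.stateName, -1),
      countyId = countyIds.getOrElse((raw.stateName, raw.countyName), -1),
      year = raw.year,
      value = raw.value,
      average = raw.total // the demo copies AQMRPT's total into the AVG column
    )
}
```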
Here is the setup code that initializes this table:
```scala
val db = Database.forConfig("h2db")

//drop original table schema
val futVectorTables = db.run(MTable.getTables)

val futDropTable = futVectorTables.flatMap { tables =>
  val tableNames = tables.map(t => t.name.name)
  if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
    db.run(NORMAQMQuery.schema.drop)
  else Future.successful(())
}.andThen {
  case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
  case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
}
Await.ready(futDropTable, Duration.Inf)

//create new table to refine AQMRawTable
val actionCreateTable = Models.NORMAQMQuery.schema.create
val futCreateTable = db.run(actionCreateTable).andThen {
  case Success(_) => println("Table created successfully!")
  case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
}
//would carry on even fail to create table
Await.ready(futCreateTable, Duration.Inf)

//truncate data, only available in slick 3.2.1
val futTruncateTable = futVectorTables.flatMap { tables =>
  val tableNames = tables.map(t => t.name.name)
  if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
    db.run(NORMAQMQuery.schema.truncate)
  else Future.successful(())
}.andThen {
  case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
  case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
}
Await.ready(futTruncateTable, Duration.Inf) //wait for the truncate to finish
```
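The setup above blocks with `Await.ready` after each step before starting the next. The same ordering (drop if the table is listed, then create, and carry on even if create fails) can also be written as one chained `Future`. A minimal sketch of that sequencing, assuming a hypothetical in-memory catalog in place of Slick and H2; `SetupSketch`, `listTables`, `drop`, `create` and `setupBlocking` are illustrative names, not Slick APIs:

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SetupSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical in-memory stand-in for the database catalog (not Slick/H2).
  private val tables = mutable.Set[String]()
  private def run[A](body: => A): Future[A] = Future(tables.synchronized(body))

  def listTables(): Future[Set[String]] = run(tables.toSet)
  def drop(t: String): Future[Unit] =
    run { if (!tables.remove(t)) throw new IllegalStateException(s"$t does not exist") }
  def create(t: String): Future[Unit] =
    run { if (!tables.add(t)) throw new IllegalStateException(s"$t already exists") }

  // Each step starts only after the previous one completes; a failed create
  // is reported and swallowed so the chain carries on, as in the demo.
  def setup(t: String): Future[Set[String]] =
    for {
      existing <- listTables()
      _        <- (if (existing.contains(t)) drop(t) else Future.successful(()))
      _        <- create(t).recover { case e: Exception => println(s"create failed, carrying on: ${e.getMessage}") }
      after    <- listTables()
    } yield after

  // Block once at the end instead of after every step.
  def setupBlocking(t: String): Set[String] = Await.result(setup(t), 10.seconds)
}
```

Calling `setupBlocking("NORMAQM")` twice leaves exactly one `NORMAQM` entry both times, because the second call drops the existing table before recreating it.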
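We need a function that looks up a state's ID in the STATES table using the STATENAME from the AQMRPT table. I deliberately wrote this function as a complete FunDA program, to simulate a standalone process that consumes a fair amount of IO and CPU (don't worry about whether it is sensible; the goal is to increase IO and computation cost):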
```scala
//a conceived task for the purpose of resource consumption
//getting id with corresponding name from STATES table
def getStateID(state: String): Int = {
  //create a stream for state id with state name
  implicit def toState(row: StateTable#TableElementType) = StateModel(row.id, row.name)
  val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
  val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
  //build a static FunDA source from the loaded StateModel rows
  val stateStream = fda_staticSource(stateSeq)()
  var id = -1
  def getid: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(stid, stname) => //target row type
        if (stname.contains(state)) {
          id = stid
          fda_break //exit
        } else fda_skip //take next row
      case _ => fda_skip
    }
  }
  stateStream.appendTask(getid).startRun
  id
}
```
As you can see, getStateID rebuilds stateStream on every call, which is exactly what drives up the IO.

Likewise, we need another function to get the id field from the COUNTIES table:
```scala
//another conceived task for the purpose of resource consumption
//getting id with corresponding names from COUNTIES table
def getCountyID(state: String, county: String): Int = {
  //create a stream for county id with state name and county name
  implicit def toCounty(row: CountyTable#TableElementType) = CountyModel(row.id, row.name)
  val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
  val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
  //build a static FunDA source from the loaded CountyModel rows
  val countyStream = fda_staticSource(countySeq)()
  var id = -1
  def getid: FDAUserTask[FDAROW] = row => {
    row match {
      case CountyModel(cid, cname) => //target row type
        if (cname.contains(state) && cname.contains(county)) {
          id = cid
          fda_break //exit
        } else fda_skip //take next row
      case _ => fda_skip
    }
  }
  countyStream.appendTask(getid).startRun
  id
}
```
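Stripped of the FunDA plumbing, both lookups are a first-match scan: walk the rows in order, stop at the first name that contains the key (the `fda_break` branch), otherwise move on (the `fda_skip` branch), and fall back to -1. A plain-Scala sketch of those semantics, with hypothetical `StateRow`/`CountyRow` classes standing in for `StateModel`/`CountyModel`:

```scala
object LookupSketch {
  // Hypothetical stand-ins for StateModel and CountyModel.
  final case class StateRow(id: Int, name: String)
  final case class CountyRow(id: Int, name: String)

  // Same result as getStateID: id of the first row whose name contains
  // `state`, else -1. collectFirst stops at the first match, like fda_break.
  def stateId(rows: Seq[StateRow], state: String): Int =
    rows.collectFirst { case StateRow(id, name) if name.contains(state) => id }
      .getOrElse(-1)

  // Same result as getCountyID: the name must contain both the state and the county.
  def countyId(rows: Seq[CountyRow], state: String, county: String): Int =
    rows.collectFirst {
      case CountyRow(id, name) if name.contains(state) && name.contains(county) => id
    }.getOrElse(-1)
}
```

Because the match is by substring, a key such as "Virginia" also matches "West Virginia", so whichever row comes first wins; comparing with `==` avoids that if it matters.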
The data source for this program is obtained as follows:
```scala
//original table listing
implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
  AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)
val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
val AQMRPTStream = AQMRPTLoader.fda_typedStream(AQMRPTQuery.result)(db)(256, 256)()
```
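Following the usual FunDA flow, we designed two user-defined functions. The first takes the state and county fields of a data row, calls getStateID and getCountyID to obtain the corresponding ids, builds an insert action for the NORMAQM table, and passes it as an action row to the next user-defined function. The second simply executes the action rows it receives: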
```scala
def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
  row match {
    case aqm: AQMRPTModel =>
      if (aqm.valid) {
        val stateId = getStateID(aqm.state)
        val countyId = getCountyID(aqm.state, aqm.county)
        val action = NORMAQMQuery += NORMAQMModel(0, aqm.mid, stateId, countyId, aqm.year, aqm.value, aqm.total)
        fda_next(FDAActionRow(action))
      } else fda_skip
    case _ => fda_skip
  }
}

val runner = FDAActionRunner(slick.jdbc.H2Profile)

def runInsertAction: FDAUserTask[FDAROW] = row =>
  row match {
    case FDAActionRow(action) =>
      runner.fda_execAction(action)(db)
      fda_skip
    case _ => fda_skip
  }
```
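The two tasks cooperate through FunDA's row protocol: a task hands a (possibly new) row on with `fda_next`, drops it with `fda_skip`, or ends the run with `fda_break`. A simplified plain-Scala model of that protocol shows how a data row turns into an action row and is then consumed; the `Next`/`Skip`/`Break` types, the `run` loop and the string "statements" are hypothetical illustrations, not FunDA's implementation:

```scala
import scala.collection.mutable

object PipelineSketch {
  sealed trait Row
  final case class DataRow(state: String, county: String, valid: Boolean) extends Row
  final case class ActionRow(statement: String) extends Row

  // Hypothetical stand-ins for fda_next / fda_skip / fda_break.
  sealed trait Step
  final case class Next(row: Row) extends Step
  case object Skip extends Step
  case object Break extends Step

  type Task = Row => Step

  // Push every source row through the tasks in order: Next hands a row to the
  // following task, Skip drops it, Break stops the whole run.
  // Returns whatever rows make it past the last task.
  def run(source: Iterator[Row], tasks: List[Task]): List[Row] = {
    val out = List.newBuilder[Row]
    var stopped = false
    while (!stopped && source.hasNext) {
      var current: Option[Row] = Some(source.next())
      val remaining = tasks.iterator
      while (current.isDefined && !stopped && remaining.hasNext) {
        remaining.next()(current.get) match {
          case Next(r) => current = Some(r)
          case Skip    => current = None
          case Break   => current = None; stopped = true
        }
      }
      current.foreach(out += _)
    }
    out.result()
  }

  // Stage 1, cf. getIdsThenInsertAction: valid data rows become action rows.
  def buildAction(lookup: String => Int): Task = {
    case DataRow(state, county, true) =>
      Next(ActionRow(s"INSERT stateId=${lookup(state)} county=$county"))
    case _ => Skip
  }

  // Stage 2, cf. runInsertAction: "execute" the action, then emit nothing.
  def execAction(executed: mutable.Buffer[String]): Task = {
    case ActionRow(statement) =>
      executed += statement
      Skip
    case _ => Skip
  }
}
```

Since the second stage always answers `Skip`, nothing flows past it; its only effect is the execution side effect, which mirrors how `runInsertAction` ends the pipeline.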
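As in the previous demos, we combine these two user-defined functions with the data source into a complete FunDA program, and calling startRun shows the actual result: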
```scala
AQMRPTStream.take(10000)
  .appendTask(getIdsThenInsertAction)
  .appendTask(runInsertAction)
  .startRun
```
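This program ran for 579 seconds, but that was on a single thread. We want to see how the parallel version performs, so first we turn getIdsThenInsertAction into a parallel task, FDAParTask: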
```scala
AQMRPTStream.toPar(getIdsThenInsertAction)
```
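Conceptually, a parallel task is run as several instances of the same user function, each working on different rows at the same time, with an upper bound on how many run at once. A minimal plain-Scala sketch of that bounded fan-out, assuming a fixed-size Java thread pool as the bound (a swapped-in technique: FunDA itself relies on fs2 nondeterminism and an fs2 `Strategy`); `ParSketch.runPar` is a hypothetical helper:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParSketch {
  // Run `task` on every input with at most `maxOpen` instances in flight.
  // The fixed-size pool enforces the bound: extra work queues up until one
  // of the `maxOpen` threads is free.
  def runPar[A, B](inputs: Seq[A], maxOpen: Int)(task: A => B): Seq[B] = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try Await.result(Future.traverse(inputs)(a => Future(task(a))), Duration.Inf)
    finally pool.shutdown()
  }
}
```

`Future.traverse` hands results back in input order, whereas a nondeterministic merge such as FunDA's presumably emits rows as they complete. With a slow task, eight inputs on eight threads finish in roughly the time of one, while the same inputs on two threads take about four rounds.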
FunDA provides the parallel runner fda_runPar:
```scala
implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
fda_runPar(AQMRPTStream.take(100000).toPar(getIdsThenInsertAction))(8) //max 8 open computations
  .appendTask(runInsertAction)
  .startRun
```
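We can supply our own thread pool (the implicit Strategy above). fda_runPar returns a standard FunDA FDAPipeLine, so runInsertAction can be appended after it as usual. Here are the timings for different row counts: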
| Rows processed | Single thread | Parallel (fda_runPar) |
|---|---|---|
| 10,000 | 570 s | 316 s |
| 20,000 | 1,090 s | 614 s |
| 100,000 | 2+ hrs | 3,885 s |
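Dividing each single-threaded time by the matching parallel time gives the speedup. A small sketch of that arithmetic, using the reported figures; for 100,000 rows the serial time is only given as "2+ hrs", so 7,200 s is used as a lower bound:

```scala
object Speedup {
  // speedup = serial seconds / parallel seconds
  def speedup(serialSec: Double, parallelSec: Double): Double = serialSec / parallelSec

  val rows10k: Double = speedup(570, 316)  // about 1.80
  val rows20k: Double = speedup(1090, 614) // about 1.78
  // "2+ hrs" is at least 7200 s, so this is a lower bound (about 1.85).
  val rows100kAtLeast: Double = speedup(7200, 3885)
}
```

The ratio stays near 1.8 at every size, so the larger benefit on bigger datasets shows up mainly as absolute time saved: about 254 s at 10,000 rows, 476 s at 20,000, and over 3,300 s at 100,000.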
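In short, the parallel version needed a little over half of the single-threaded time at every size tested, so the time it saves grows with the size of the dataset. Here is the full source code for this demo: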
```scala
import slick.jdbc.meta._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import slick.jdbc.H2Profile.api._
import Models._
import fs2.Strategy

object ParallelTasks extends App {

  val db = Database.forConfig("h2db")

  //drop original table schema
  val futVectorTables = db.run(MTable.getTables)

  val futDropTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable, Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //would carry on even fail to create table
  Await.ready(futCreateTable, Duration.Inf)

  //truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  Await.ready(futTruncateTable, Duration.Inf) //wait for the truncate to finish

  //a conceived task for the purpose of resource consumption
  //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = {
    //create a stream for state id with state name
    implicit def toState(row: StateTable#TableElementType) = StateModel(row.id, row.name)
    val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
    //build a static FunDA source from the loaded StateModel rows
    val stateStream = fda_staticSource(stateSeq)()
    var id = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case StateModel(stid, stname) => //target row type
          if (stname.contains(state)) {
            id = stid
            fda_break //exit
          } else fda_skip //take next row
        case _ => fda_skip
      }
    }
    stateStream.appendTask(getid).startRun
    id
  }

  //another conceived task for the purpose of resource consumption
  //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = {
    //create a stream for county id with state name and county name
    implicit def toCounty(row: CountyTable#TableElementType) = CountyModel(row.id, row.name)
    val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //build a static FunDA source from the loaded CountyModel rows
    val countyStream = fda_staticSource(countySeq)()
    var id = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid, cname) => //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break //exit
          } else fda_skip //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }

  //original table listing
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
  val AQMRPTStream = AQMRPTLoader.fda_typedStream(AQMRPTQuery.result)(db)(256, 256)()

  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state, aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0, aqm.mid, stateId, countyId, aqm.year, aqm.value, aqm.total)
          fda_next(FDAActionRow(action))
        } else fda_skip
      case _ => fda_skip
    }
  }

  val runner = FDAActionRunner(slick.jdbc.H2Profile)

  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }

  val cnt_start = System.currentTimeMillis()

  /* single-threaded run, kept for comparison
  AQMRPTStream.take(100000)
    .appendTask(getIdsThenInsertAction)
    .appendTask(runInsertAction)
    .startRun

  //println(s"processing 10000 rows in a single thread in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 10000 rows in a single thread in 570 seconds
  //println(s"processing 20000 rows in a single thread in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 20000 rows in a single thread in 1090 seconds
  //println(s"processing 100000 rows in a single thread in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 100000 rows in a single thread in 2+ hrs
  */

  implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  fda_runPar(AQMRPTStream.take(100000).toPar(getIdsThenInsertAction))(8)
    .appendTask(runInsertAction)
    .startRun

  //println(s"processing 10000 rows parallelly in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 10000 rows parallelly in 316 seconds
  //println(s"processing 20000 rows parallelly in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 20000 rows parallelly in 614 seconds
  println(s"processing 100000 rows parallelly in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 100000 rows parallelly in 3885 seconds
}
```