打造先进的内存KV数据库-7 反射以及并发锁
2016-01-26 14:11
489 查看
反射
反射作为一种代码组织形式,带来了极大的不安全因素,同时也带来了许多便利之处,通过方法、对象、类型名称来获得具体实例,可以避免大量if-else分支,使得代码优雅,monkeyDB的服务端代码最后采用反射组织。并发锁
多线程访问同一资源时,需要对资源加锁,否则可能会得到预料之外的后果。由于内存数据库优越的读写性能,锁的粒度可以尽量大,monkeyDB使用库级锁(相当于表级锁)来保证线程安全。MonkeyDB项目已开源至 https://github.com/InsZVA/MonkeyDB 采用GPL协议
代码
//server.go package main // #cgo LDFLAGS: -L ./lib -lmonkeyS // #include "./lib/core.h" // #include <stdlib.h> import "C" import ( "unsafe" "fmt" "net" "strings" "./tcp" "./convert" "reflect" "./minheap" "strconv" "flag" "sync" ) ////////////////////////////////////// 库级锁 //////////////////////////////////////////////////////////////// var mutex map[C.Database]*sync.RWMutex ////////////////////////////////////// MinHeap //////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////// command类型 用于解析处理各种数据库命令 ////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////////////// var ( acceptedCmd = []string{"set","get","delete","remove","createdb","switchdb","dropdb",/*"push","pop","destroy",*/"listdb"} port = flag.String("p","1517","侦听端口号") ) type command []byte func (cmd command) Set(db **C.Database) []byte { //锁并发 _,ok := mutex[**db]; if !ok { mutex[**db] = new(sync.RWMutex) } mutex[**db].Lock() defer mutex[**db].Unlock() response := []byte{} key,next := convert.ParseUntil(cmd,' ',4) value,_ := convert.ParseUntil(cmd,0,next+1) //fmt.Println("set ",key,value) r := C.Set(&(*db).tIndex,(*C.char)(convert.Bytes2C(key)),(convert.Bytes2C(value))) for i := 0;;i++ { response = append(response,byte(r.msg[i])) if response[i] == 0 { break; } } return response } func (cmd command) Get(db **C.Database) []byte { //锁并发 _,ok := mutex[**db]; if !ok { mutex[**db] = new(sync.RWMutex) } mutex[**db].RLock() defer mutex[**db].RUnlock() response := []byte{} key,_ := convert.ParseUntil(cmd,0,4) r := C.Get(&(*db).tIndex,(*C.char)(convert.Bytes2C(key))) if int(r.code) == 0 { for i := 0;;i++ { response = append(response,byte(*(*C.char)(unsafe.Pointer((uintptr(r.pData)+uintptr(i)))))) if response[i] == 0 { break; } } }else { } return response } func (cmd command) Delete(db **C.Database) []byte { return cmd.Remove(db) } func (cmd command) Remove(db **C.Database) []byte { //锁并发 _,ok := mutex[**db]; if !ok { mutex[**db] = new(sync.RWMutex) } mutex[**db].Lock() defer mutex[**db].Unlock() response := []byte{} key,_ := convert.ParseUntil(cmd,0,7) r := C.Delete(&(*db).tIndex,(*C.char)(convert.Bytes2C(key))) for i := 0;;i++ { response = append(response,byte(r.msg[i])) if response[i] == 0 { break; } } return response } func (cmd command) Createdb(db **C.Database) []byte { response := []byte{} key,_ := convert.ParseUntil(cmd,0,9) d := C.CreateDB((*C.char)(convert.Bytes2C(key))) if d != nil { *db = d response = []byte("Already exist,switched\n") }else { response = []byte("Created\n") } return response } func (cmd command) Switchdb(db **C.Database) []byte { response := []byte{} key,_ := convert.ParseUntil(cmd,0,9) d := C.SwitchDB((*C.char)(convert.Bytes2C(key))) if d != nil { *db = d response = []byte("ok\n") }else { response = []byte("fail\n") } return response } func (cmd command) Dropdb(db **C.Database) []byte { //锁并发 _,ok := mutex[**db]; if !ok { mutex[**db] = new(sync.RWMutex) } mutex[**db].Lock() defer mutex[**db].Unlock() response := []byte{} key,_ := convert.ParseUntil(cmd,0,7) *db = C.DropDB((*C.char)(convert.Bytes2C(key))) return response } func (cmd command) Listdb(db **C.Database) []byte { response := []byte{} r := C.ListDB() for i := 0;i < 1024;i++ { b := byte(*(*C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(r))+uintptr(i)))) response = append(response,b) if(b == 0){ break; } } C.free(unsafe.Pointer(r)) return response } func (cmd command) Push(db **C.Database) []byte { var response []byte key,next := convert.ParseUntil(cmd,' ',5) key2,next := convert.ParseUntil(cmd,' ',next+1) value,_ := convert.ParseUntil(cmd,0,next+1) var i int btnode := C.BTree_search((*db).tIndex,(C.KeyType)(C.GetHash((*C.char)(convert.Bytes2C(key)))),(*C.int)(unsafe.Pointer(&i))) if uintptr(unsafe.Pointer(btnode)) != uintptr(0) { //B树上有此堆 heap := (*minheap.MinHeap)(btnode.pRecord[i]) //BUG GO GC 会回收掉上次的内存 k,_ := strconv.Atoi(string(key2)) v,_ := strconv.Atoi(string(value)) heap.Push(minheap.Pair{Key:uint32(k),Value:uint32(v)}) response = []byte("Pushed") }else { //建一个新的堆 heap := minheap.New() C.BTree_insert(&(*db).tIndex, (C.KeyType)(C.GetHash((*C.char)(convert.Bytes2C(key)))), unsafe.Pointer(&heap)) k,_ := strconv.Atoi(string(key2)) v,_ := strconv.Atoi(string(value)) heap.Push(minheap.Pair{Key:uint32(k),Value:uint32(v)}) response = []byte("Created and pushed") } return response } ////////////////////////////////////////////////////////////////////////////////////////////////////////////// func initDB() { //初始化数据库 str0 := "monkey" C.CreateDB((*C.char)(convert.String2C(str0))) //创建基础数据库 str0 = "config" //配置数据库 C.CreateDB((*C.char)(convert.String2C(str0))) key := "passwd" //初始密码 data := "monkey" db := C.SwitchDB((*C.char)(convert.String2C(str0))) C.Set(&(db.tIndex),(*C.char)(convert.String2C(key)),(convert.String2C(data))) } func listen() { servicePort := ":"+*port tcpAddr,err := net.ResolveTCPAddr("tcp4",servicePort) if err != nil { panic(err) } l,err := net.ListenTCP("tcp",tcpAddr) //侦听TCP if err != nil { panic(err) } fmt.Println("Server Started on "+*port+"!") for{ conn,err := l.AcceptTCP() conn.SetKeepAlive(true) conn.SetNoDelay(true) if err != nil { panic(err) } s := tcp.TCPSession{Conn:conn} s.Init() go Handler(&s) } } func main() { flag.Parse() //初始化锁 mutex = make(map[C.Database]*sync.RWMutex) initDB() listen() } func auth(s *tcp.TCPSession) bool { buff := s.ReadMessage() params := strings.Split(string(buff)," ") str0 := "config" db := C.SwitchDB((*C.char)(convert.String2C(str0))) if params[0] != "auth" { s.SendMessage([]byte("Please auth first!")) return false } r := C.Get(&(db.tIndex),(*C.char)(convert.String2C("passwd"))) if int(r.code) == 0 { passwd := []byte{} for i := 0;;i++ { passwd = append(passwd,byte(*(*C.char)(unsafe.Pointer((uintptr(r.pData)+uintptr(i)))))) if passwd[i] == 0 { break } } if convert.Equal(passwd,[]byte(params[1])) { s.SendMessage([]byte("Auth success")) return true } else { s.SendMessage([]byte("Auth fail")) return false } }else { s.SendMessage([]byte("Auth success")) return true } } func Handler(s *tcp.TCPSession) { for !auth(s){ } str := "monkey" db := C.SwitchDB((*C.char)(convert.String2C(str)))//环境变量-当前数据库 for { if s.Closed { return } buff := s.ReadMessage() // if err != nil { // conn.Close() // break // } if len(buff) == 0 { return } //commands := bytes.Split(buff,[]byte{0}) //for _,cmd := range commands { TranslateMessage2(s,&db,buff) //} //解析消息 } } func TranslateMessage2(s *tcp.TCPSession,db **C.Database,message []byte) { com := command(message) //fmt.Println("处理:",string(message)) response := []byte{} for _,cmd := range acceptedCmd { if convert.StartBy(message,cmd) { result := reflect.ValueOf(com).MethodByName(convert.UpperHead(cmd)).Call([]reflect.Value{reflect.ValueOf(db)}) response = result[0].Interface().([]byte) break } } s.SendMessage(response) } ////////////////////////////////////////Dumplated////////////////////////////////////////////////////////////////////////// func TranslateMessage(s *tcp.TCPSession,db **C.Database,message []byte) { command := string(message) params := strings.Split(command," ") //fmt.Println(params) response := []byte{} if params[0] == "set" { r := C.Set(&(*db).tIndex,(*C.char)(convert.String2C(params[1])),(convert.String2C(params[2]))) for i := 0;;i++ { response = append(response,byte(r.msg[i])) if response[i] == 0 { break; } } }else if params[0] == "get" { r := C.Get(&(*db).tIndex,(*C.char)(convert.String2C(params[1]))) // for i := 0;;i++ { // response = append(response,byte(r.msg[i])) // if response[i] == 0 { break; } // } if int(r.code) == 0 { for i := 0;;i++ { response = append(response,byte(*(*C.char)(unsafe.Pointer((uintptr(r.pData)+uintptr(i)))))) if response[i] == 0 { break; } } }else { // for i := 0;;i++ { // response = append(response,byte(r.msg[i])) // if response[i] == 0 { break; } // } } }else if params[0] == "delete" || params[0] == "remove" { r := C.Delete(&(*db).tIndex,(*C.char)(convert.String2C(params[1]))) for i := 0;;i++ { response = append(response,byte(r.msg[i])) if response[i] == 0 { break; } } }else if params[0] == "createdb" { d := C.CreateDB((*C.char)(convert.String2C(params[1]))) if d != nil { *db = d response = []byte("Already exist,switched\n") }else { response = []byte("Created\n") } }else if params[0] == "switchdb" { d := C.SwitchDB((*C.char)(convert.String2C(params[1]))) if d != nil { *db = d response = []byte("ok\n") }else { response = []byte("fail\n") } }else if params[0] == "dropdb" { *db = C.DropDB((*C.char)(convert.String2C(params[1]))) }else if strings.EqualFold("listdb",params[0]) { r := C.ListDB() for i := 0;i < 1024;i++ { b := byte(*(*C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(r))+uintptr(i)))) response = append(response,b) if(b == 0){ break; } } C.free(unsafe.Pointer(r)) }else { //fmt.Println("unkown command:",params[0]) } s.SendMessage(response) }
相关文章推荐
- sql server 存储过程,事务
- ora-14400插入的分区关键字未映射到任何分区---oracle数据库表过期问题
- PL/SQL编辑数据"这些查询结果不可更新,请包括ROWID或使用SELECT...FOR UPDATE获得可更新结果"处理
- mssql的"[]" 标识符的问题
- MongoDB--Getting Started with Java Driver
- MySQL中优化常用的查询sql语
- MySQL中优化常用的查询sql语
- Oracle spool 用法小结
- oracle查询非数字字段的数据
- Linux_Ubuntu_安装mongoDB
- How to use STA(sql tuning advisor)
- mysql join on 联表
- java解决对US7ASCII编码的ORACLE数据库数据解码
- 对事务的理解
- 安全测试之SQL注入攻击
- Mysql手动安装
- 2016.1.23 nosql
- 为什么如果数据库导入data.sql,则zabbix_proxy服务无法启动?
- MySQL DBA 成长之路
- bboss持久层多数据源配置及多数据库事务控制使用方法