您的位置:首页 > 数据库 > Redis

go读取超大文件内容并发写入到redis的功能

2017-09-02 00:00 501 查看
摘要: 简易版,将文件拆分成多份,启动多个goroutine并发写入到redis

连接redis使用的是 github.com/garyburd/redigo/redis 包

redis连接代码如下: library/redis.go

package library

import (
"github.com/garyburd/redigo/redis"
)

// ConnectRedis opens a TCP connection to the redis server described by config.
//
// config is a plain string map; the keys read here are "host", "port" and
// "auth". When "auth" is non-empty an AUTH command is issued on the new
// connection; if that command fails the connection is closed and the error
// is returned.
func ConnectRedis(config map[string]string) (redis.Conn, error) {
	addr := config["host"] + ":" + config["port"]

	c, dialErr := redis.Dial("tcp", addr)
	if dialErr != nil {
		return nil, dialErr
	}

	// No password configured: the connection is ready to use as-is.
	password := config["auth"]
	if password == "" {
		return c, nil
	}

	if _, authErr := c.Do("AUTH", password); authErr != nil {
		c.Close()
		return nil, authErr
	}
	return c, nil
}

main.go

package main

import (
"bufio"
"bytes"
"fmt"
"io"
"library"
"os"
"runtime"
"strings"
"sync"
"time"

"github.com/garyburd/redigo/redis"
)

// wg is incremented by main for every started goroutine; insertMidToRedis
// calls Done on it, and main waits on it before exiting.
var wg sync.WaitGroup

// midList holds one chunk of lines read from the input file.
type midList struct {
// mid is the list of trimmed lines belonging to this chunk.
mid []string
}

// insertMidToRedis writes every mid of the chunk into redis over its own
// connection and signals wg when it is finished.
//
// Entries whose length is not exactly 32 bytes are skipped. For each
// remaining mid it:
//  1. adds the mid to the set "<prefix>user_choose:set" if it is not already
//     a member, and
//  2. creates the string key "<prefix><mid>" with value 1 and a 10-day expiry
//     if that key does not exist yet.
//
// Any redis error is printed and stops processing of the rest of the chunk.
func (i midList) insertMidToRedis() {
	// Deferred so that main's wg.Wait() is released on every exit path.
	defer wg.Done()

	redisCon := map[string]string{
		"host":   "10.222.4.221",
		"port":   "16560",
		"auth":   "xxxxxxxxxxxxx",
		"prefix": "minihelper:dislike:",
	}

	// Obtain a dedicated redis connection for this goroutine.
	redisObj, err := library.ConnectRedis(redisCon)
	if err != nil {
		// redisObj is nil here; continuing would panic on Close/Do.
		fmt.Println("connect redis error", err)
		return
	}
	defer redisObj.Close()

	// The set key does not depend on the current mid, so build it once.
	allMidSet := redisCon["prefix"] + "user_choose:set"

	var buf bytes.Buffer
	for _, mid := range i.mid {
		if len(mid) != 32 {
			continue
		}

		// Record the mid in the set of all mids.
		member, err := redis.Int(redisObj.Do("SISMEMBER", allMidSet, mid))
		if err != nil {
			_, _, line, _ := runtime.Caller(0)
			fmt.Println("line: ", line, " redis execute SISMEMBER error", err)
			return
		}
		if member == 0 {
			if _, err := redisObj.Do("SADD", allMidSet, mid); err != nil {
				_, _, line, _ := runtime.Caller(0)
				fmt.Println("line: ", line, " redis execute SADD error", err)
				fmt.Println("mid is:", mid)
				return
			}
			fmt.Println("sadd mid: ", mid, " success")
		}

		// Create the per-mid string key "<prefix><mid>".
		buf.Reset()
		buf.WriteString(redisCon["prefix"])
		buf.WriteString(mid)
		key := buf.String()

		present, err := redis.Bool(redisObj.Do("EXISTS", key))
		if err != nil {
			_, _, line, _ := runtime.Caller(0)
			fmt.Println("line: ", line, " redis execute exists error", err)
			fmt.Println("previous mid is:", key)
			return
		}
		if present {
			continue
		}

		if _, err := redisObj.Do("SET", key, 1); err != nil {
			_, _, line, _ := runtime.Caller(0)
			fmt.Println("line: ", line, " redis execute set error", err)
			return
		}
		// A failed EXPIRE would leave the key without a TTL, so report it
		// instead of silently dropping the error.
		if _, err := redisObj.Do("EXPIRE", key, 10*24*3600); err != nil {
			_, _, line, _ := runtime.Caller(0)
			fmt.Println("line: ", line, " redis execute expire error", err)
			return
		}
		fmt.Println("insert mid: ", key, " success")
	}
}

func main() {
fmt.Println("script begin.... ", time.Now().Format("2006-01-0
3ff0
2 15:04:05"))
timeObj := time.Now()
yesTime := timeObj.AddDate(0, 0, -1)
yesterday := yesTime.Format("20060102")
mid_file := "/data/dislike/triggerdislike/" + yesterday + "/uniq"

fp, err := os.Open(mid_file)
defer fp.Close()
if err != nil {
fmt.Println(mid_file, err)
return
}

var contents []midList
var content midList
buf := bufio.NewReader(fp)
iterator := 0
for {
line, err := buf.ReadString('\n')

if err != nil {
if err == io.EOF {
//                fmt.Println("meet file ending")
} else {
fmt.Println(err)
}
break
}
iterator++

mid := strings.TrimSpace(line)
//将文件切割成每2000行一份
if iterator <= 2000 {
content.mid = append(content.mid, mid)
} else {
iterator = 1
contents = append(contents, content)
content.mid = nil
content.mid = append(content.mid, mid)
}

}
contents = append(contents, content)
for _, mid_list := range contents {
wg.Add(1)   //这里也可以直接使用chan来完成通信
go mid_list.insertMidToRedis()
}
wg.Wait()

fmt.Println("script end.... ", time.Now().Format("2006-01-02 15:04:05"))
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: