您的位置:首页 > 编程语言 > Go语言

Zookeeper Golang客户端:go-zookeeper的基本使用

2017-08-25 00:00 591 查看

Zookeeper Golang客户端:go-zookeeper的基本使用

1.连接到ZK server端

var hosts = []string{"localhost:8000"}//server端host
conn, _, err := zk.Connect(hosts, time.Second*5)
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()


2.增删改查

增加节点

var path="/test"
var data=[]byte("hello zk")
var flags=0
//flags有4种取值:
//0:永久,除非手动删除
//zk.FlagEphemeral = 1:短暂,session断开则改节点也被删除
//zk.FlagSequence  = 2:会自动在节点后面添加序号
//3:Ephemeral和Sequence,即,短暂且自动添加序号
var acls=zk.WorldACL(zk.PermAll)//控制访问权限模式

p,err_create:=conn.Create(path,data,flags,acls)
if err_create != nil {
fmt.Println(err_create)
return
}
fmt.Println("create:",p)

//输出 create:/test


删改与增不同在于其函数中的version参数,其中version是用于 CAS支持

关于CAS可以看这里

func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error)
func (c *Conn) Delete(path string, version int32) error


3.watch机制

Java API中是通过Watcher实现的,在go-zookeeper中则是通过Event。道理都是一样的

全局监听:

1.调用zk.WithEventCallback(callback)设置回调

2.调用conn.ExistsW(path) 或GetW(path)为对应节点设置监听,该监听只生效一次

//如下:
package main

import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"time"
)

var hosts = []string{"localhost:8000"}

var path1 = "/whatzk"

var flags int32 = zk.FlagEphemeral
var data1 = []byte("hello,this is a zk go test demo!!!")
var acls = zk.WorldACL(zk.PermAll)

func main() {
option := zk.WithEventCallback(callback)

conn, _, err := zk.Connect(hosts, time.Second*5, option)
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()

_, _, _, err = conn.ExistsW(path1)
if err != nil {
fmt.Println(err)
return
}

create(conn, path1, data1)

time.Sleep(time.Second * 2)

_, _, _, err = conn.ExistsW(path1)
if err != nil {
fmt.Println(err)
return
}
delete(conn, path1)

}

func callback(event zk.Event) {
fmt.Println("*******************")
fmt.Println("path:", event.Path)
fmt.Println("type:", event.Type.String())
fmt.Println("state:", event.State.String())
fmt.Println("-------------------")
}

func create(conn *zk.Conn, path string, data []byte) {
_, err_create := conn.Create(path, data, flags, acls)
if err_create != nil {
fmt.Println(err_create)
return
}

}

//输出:
*******************
path:
type: EventSession
state: StateConnecting
-------------------
*******************
path:
type: EventSession
state: StateConnected
-------------------
*******************
path:
type: EventSession
state: StateHasSession
-------------------
*******************
path: /whatzk
type: EventNodeCreated
state: Unknown
-------------------
*******************
path: /whatzk
type: EventNodeDeleted
state: Unknown
-------------------


部分监听:

1.调用conn.ExistsW(path) 或GetW(path)为对应节点设置监听,该监听只生效一次

2.开启一个协程处理chanel中传来的event事件

(注意:watchCreataNode一定要放在一个协程中,不能直接在main中调用,不然会阻塞main)

//部分代码如下:
func main() {
conn, _, err := zk.Connect(hosts, time.Second*5, option)
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()

_, _, ech, err := conn.ExistsW(path1)
if err != nil {
fmt.Println(err)
return
}

create(conn, path1, data1)

go watchCreataNode(ech)
}

func watchCreataNode(ech <-chan zk.Event){
event:=<-ech
fmt.Println("*******************")
fmt.Println("path:", event.Path)
fmt.Println("type:", event.Type.String())
fmt.Println("state:", event.State.String())
fmt.Println("-------------------")
}

//输出如下:
*******************
path: /whatyy
type: EventNodeCreated
state: Unknown
-------------------


注意:

1.如果即设置了全局监听有设置了部分监听,那么最终是都会触发的,并且全局监听在先执行

2.如果设置了监听子节点,那么事件的触发是先子节点后父节点

4.客户端随机hostname支持

ZK Java client端,相关链接:

(http://www.jianshu.com/p/1068d0896e65)

最终就是形成一个Round Robin并顺序从中选取。

//使用步骤如下:(相关代码位于dnshostprovider.go中)

var hosts = []string{"host1:8000","host2:8000","host3:8000"}
hostPro:=new(zk.DNSHostProvider)
err:=hostPro.Init(hosts)//先初始化
if err != nil {
fmt.Println(err)
return
}
server,retryStart:=hostPro.Next()//获得host
...
hostPro.Connected()  //连接成功后会调用
}

//上面的一系列步骤都集成在func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error)中
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: