利用etcd事务机制可以实现分布式锁,利用分布式锁可以做主备切换功能。
Etcd分布式锁实现方式如下:利用etcd事务机制向etcd创建key,若key不存在则创建key,获取锁成功,若key存在则获取锁失败.
主备切换功能实现思路如下:各程序向etcd抢锁,抢到锁即为主,没抢到为从,从程序会监视锁是否释放,若锁释放重新抢锁。考虑到在网络不稳定的情况下,可能出现已经成为主的程序失去了锁的拥有,某一个从程序抢到了锁,但是已经成为主的程序并不知道自己失去了锁,因此成为主的程序需要抢到锁后不断查询目前锁是否为自己拥有,若已经失去则标记自己为从并且重新抢锁。
具体实现如下:
获取锁
func (l *Locker) Lock(pfx string, endpoints []string) error {
//新建etcd客户端
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
logs.Error(err)
return err
}
// defer cli.Close()
cliCtx := cli.Ctx()
//设置租约时间1秒
lgResp, err := cli.Grant(cliCtx, leaseTTL)
if err != nil {
logs.Error(err)
return err
}
leaseID := lgResp.ID
ctx, cancel := context.WithCancel(cliCtx)
//自动续租
keepAlive, err := cli.KeepAlive(ctx, leaseID)
if err != nil || keepAlive == nil {
cancel()
logs.Error(err)
return err
}
//消费掉租约信息
go func() {
for range keepAlive {
}
}()
for {
cmp := clientv3.Compare(clientv3.CreateRevision(pfx), "=", 0)
put := clientv3.OpPut(pfx, "", clientv3.WithLease(leaseID))
get := clientv3.OpGet(pfx)
//创建事务并提交,查看key是否存在,若key不存在则创建key,否则查询key
resp, err := cli.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
if err != nil {
cancel()
logs.Error(err)
return err
}
//判断是否获取到锁
if !resp.Succeeded {
logs.Info("acquired lock for failed, waiting delete")
//若获取锁失败则监视key删除事件,若key删除,则表示释放了锁,程序重新抢锁
err := waitDelete(ctx, cli, pfx)
if err != nil {
cancel()
logs.Error(err)
return err
}
} else {
logs.Info("acquired lock Succeed")
l.cli = cli
l.ctx = ctx
l.pfx = pfx
l.rev = resp.Header.Revision
l.id = leaseID
return nil
}
}
}
监视是否拥有锁
func (l *Locker) UnLock(timeout time.Duration) {
var err error
var resp *clientv3.GetResponse
for {
//设置超时时间,0表示不超时
if timeout == 0 {
resp, err = l.cli.Get(l.ctx, l.pfx)
} else {
ctxTimeout, cancel := context.WithTimeout(l.ctx, timeout)
//查询锁key
resp, err = l.cli.Get(ctxTimeout, l.pfx)
cancel()
}
if err != nil {
logs.Debug(err)
// l.cli.Revoke(l.ctx, l.id)
l.cli.Close()
return
}
// logs.Debug(resp)
//判断key是否存在
if len(resp.Kvs) == 0 {
// l.cli.Revoke(l.ctx, l.id)
l.cli.Close()
return
}
//判断key是否为自己创建的
if l.rev != resp.Kvs[0].CreateRevision {
// l.cli.Revoke(l.ctx, l.id)
l.cli.Close()
return
}
time.Sleep(leaseTTL * time.Second)
}
}
使用方式如下:
var isMaster chan bool
func main() {
isMaster = make(chan bool)
go func() {
for {
// l := &lock.OrderLocker{}
l := &lock.Locker{}
// //获取分布式锁,若key已经存在,该函数会一直阻塞,直到获取锁
// err := l.OrderLock("/lock", []string{"localhost:2379"})
err := l.Lock("/lock/", []string{"localhost:2379"})
if err != nil {
panic(err)
}
isMaster <- true
// 监视锁是否为自己的,若锁没有被意外释放,该函数会一直阻塞,参数为查询的超时时间,若为0则不超时
// l.OrderUnLock(time.Second * 5)
l.UnLock(time.Second * 5)
// l.UnLock(0)
isMaster <- false
}
}()
for {
select {
case b := <-isMaster:
fmt.Println("Become master", b)
// c := make(chan bool)
// <-c
}
}
}
参考
https://studygolang.com/articles/16307?fr=sidebar
https://www.jianshu.com/p/d3068d0ac7c1
https://yq.aliyun.com/articles/70546
"go.etcd.io/etcd/clientv3/concurrency"包的example_mutex_test.go/session.go/mutex.go/key.go
网友评论