etcd-java使用

简介

项目地址

特性

Accepts/exposes protobuf-generated types directly to minimize overhead(请求头)

Simpler fluent versions of KV primitives for making async or sync requests(fluent api的方式方便制定sync还是async)

Automatic retries and (re)authentication

Resilient(可快速恢复的) and scalable Lease and Watch abstractions with semantics aligned to most typical usage

Underlying stream RPCs only kept open for as long as they are needed

Comes with some helpful utility classes with local cache, persistent key and leader election abstractions

Simple declarative client configuration (optional) - encapsulate per-cluster connection details in a

JSON document

Support for connecting to multi-endpoint IBM Compose etcd deployments over TLS

Currently doesn't cover all parts of the etcd-server API, in particular those related to cluster administration(集群管理) such as maintenance, cluster and auth management

kv操作

获取client

KvStoreClient client = EtcdClient.forEndpoint("localhost", 2379).withPlainText().build();

//KvStoreClient是一个接口,定义了如下三个client方法

KvClient kvClient = client.getKvClient();

LeaseClient leaseClient = client.getLeaseClient();

LockClient lockClient = client.getLockClient();

EtcdClient实现了Closeable接口的close方法,所以可以使用try-resource的语法,自动释放资源。示例代码见ClientBuilderTest

单个endpoint

try (KvStoreClient client = EtcdClient.forEndpoint("localhost", 2379).withPlainText().build()) {}

多个endpoint

try (KvStoreClient client = EtcdClient.forEndpoints(

"localhost:2379,http://localhost:2379,https://localhost:2379,dns:///localhost:2379")

.withPlainText().build()) {}

put操作

basic put

kvc.put(a, v2).sync();

batch put

kvc.batch().put(pr1).put(pr2).sync();

put with lease

//先创建lease

LeaseGrantResponse lgr = leaseClient.grant(2).sync();

//put的时候指定lease id

//如果lease不存在会抛出异常

kvc.put(bs("hello"), bs("world"), lgr.getID()).sync();

put后获取信息

获取上一次的值

如果key没有对应的值,返回的是KeyValue.getDefaultInstance()

如果之前key有对应的指,返回之前key对应的值

kvc.put(a, v2).prevKv().sync().getPrevKv();

获取header信息

kvc.put(a, v2).sync().getHeader();

get操作

basic get

同步get

RangeResponse rr = kvc.get(bs("a")).sync();

异步get

ListenableFuture rrFut1 = kvc.get(bs("new")).async();

rrFut1.get(1000, TimeUnit.SECONDS); //指定超时时间

batch get

TxnResponse txnResponse = kvc.batch().get(kvc.get(a).asRequest()).get(kvc.get(b).asRequest()).sync();

assertEquals(v1, txnResponse.getResponses(0).getResponseRange().getKvs(0).getValue());

assertEquals(v2, txnResponse.getResponses(1).getResponseRange().getKvs(0).getValue());

指定重试策略

ListenableFuture rrFut2 = kvc.get(bs("new"))

.deadline(Deadline.after(20, TimeUnit.SECONDS))

.backoffRetry().async(); // should work

delete操作

basic delete

获取删除的个数

kvc.delete(bs("notthere")).sync().getDeleted()

获取删除的key对应的value

kvc.delete(a).prevKv().sync().getPrevKvs(0).getValue()

batch delete

kvc.batch().delete(kvc.delete(a).asRequest())

.delete(kvc.delete(b).asRequest()).sync();

transaction

ListenableFuture tresp = kvc.txnIf().cmpEqual(a).value(v1)

.and().cmpNotEqual(b).version(10)

.then().put(kvc.put(bs("new"), bs("newval")).asRequest()).async();

lease操作

普通lease

grant

LeaseGrantResponse lgr = leaseClient.grant(5L).sync(); //不指定leaseId

lgr.getID(); //获取id

lgr.getTTL(); //存活时间

leaseClient.grant(10L).leaseId(456L).sync(); //指定leaseId

keep-alive

leaseClient.keepAliveOnce(id).get(1, SECONDS)

revoke

leaseClient.revoke(id).get(1, SECONDS);

assertEquals(-1L, leaseClient.ttl(id).get().getTTL());//撤销后ttl变成-1

persistent lease

basic

PersistentLease pl2 = lc.maintain().minTtl(minTtl).keepAliveFreq(kaFreq).start();

long newLeaseId = pl2.get(20, SECONDS);

with StreamObserver

PersistentLease pl = lc.maintain().minTtl(minTtl).keepAliveFreq(kaFreq)

.start(new StreamObserver() {

@Override

public void onNext(LeaseState value) {

System.out.println(t(start) + "PL state change: " + value);

observerEvents.add(value);

}

@Override

public void onError(Throwable t) {

System.out.println(t(start) + "PL error: " + t);

observerEvents.add(t);

}

@Override

public void onCompleted() {

System.out.println(t(start) + "PL completed");

observerEvents.add(COMPLETED);

}

});

list client lease

需要注意的是persistent lease是基于client的会话的,如果client挂了的话,lease是会自动释放的

Set lidsFound = client2.getLeaseClient().list().get()

.getLeasesList().stream().map(LeaseStatus::getID).collect(Collectors.toSet());

lock操作

同一个客户端lock不同的lease会报错,详细见LockTest.testWithContention

普通lock

LockResponse lr = lockClient.lock(KeyUtils.bs("mylock"))

.deadline(Deadline.after(500, TimeUnit.MILLISECONDS)).sync(); //获取锁

assertTrue(kvClient.txnIf().exists(lr.getKey()).sync().getSucceeded());

assertNotNull(lockClient.unlock(lockKey).sync()); //释放锁

assertFalse(kvClient.txnIf().exists(lockKey).sync().getSucceeded());

lock使用lease

LockResponse lr = lockClient.lock(KeyUtils.bs("mylock2")).withLease(lgr.getID()).sync();

lock使用persistent lease

PersistentLease pl = lec.maintain().start();

LockResponse lr = lockClient.lock(KeyUtils.bs("mylock3")).withLease(pl).sync();

//如果pl.close后会自动释放

watch操作

简单watch

WatchIterator watchIterator = kvc.watch(bs("/watchtest")).asPrefix().start();

watch指定observer

//创建一个observer

final StreamObserver observer = new StreamObserver() {

@Override

public void onNext(WatchUpdate value) {

System.out.println(t(start) + "watch event: " + value);

watchEvents.add(value);

}

@Override

public void onError(Throwable t) {

System.out.println(t(start) + "watch error: " + t);

watchEvents.add(t);

}

@Override

public void onCompleted() {

System.out.println(t(start) + "watch completed");

watchEvents.add(COMPLETED);

}

};

//启动watcher,并且绑定observer

Watch watch = kvc.watch(bs("/watchtest")).asPrefix().start(observer);

获取watch的事件

WatchUpdate wu = getNextSkipEmpty(watchEvents);

assertNotNull(wu);

assertEquals("event: " + wu, 1, wu.getEvents().size()); //事件个数

assertEquals(EventType.PUT, wu.getEvents().get(0).getType()); //事件类型

assertEquals(bs("a value"), wu.getEvents().get(0).getKv().getValue()); //值

assertEquals(bs("/watchtest/a"), wu.getEvents().get(0).getKv().getKey()); //键

election

简单的case参见示例代码LeaderElectionTest

源码浅析

EtcdClient

KvStoreClient.png

重点方法是line397行,EtcdClient的构造方法,初始化了internalExecutor,channel,grpc,kvClient等

EtcdKvClient

etcd kv操作和watch操作的核心类,实现了KvClient接口,其他的leaseClient和lockClient类似

EtcdKvClient.png

重点关注AbstractFluentRequest,底层的client通过grpc进行远程rpc调用。

所有的操作在底层抽象为sync和async的操作,sync操作其实也是async操作,只是在client层做了get操作。

@Override

public final ListenableFuture async() {

return async(null);

}

@Override

public final RespT sync() {

return client.waitForCall(this::async);

}

GrpcClient

grpc调用的封装类, 底层调用方法

ClientCalls.futureUnaryCall(channel.newCall(method, callOptions), request)

ETCD for java_etcd-java使用相关推荐

  1. java etcd api_在java中如何使用etcd的v2 和v3 api获取配置,并且对配置的变化进行监控和监听...

    etcd 和zookeeper 很像,都可以用来做配置管理.并且etcd可以在目前流行的Kubernetes中使用. 但是etcd 提供了v2版本合v3的版本的两种api.我们现在分别来介绍一下这两个 ...

  2. 用java实现etcd分布式锁_etcd分布式锁及事务

    前言 分布式锁是控制分布式系统之间同步访问共享资源的一种方式.在分布式系统中,常常需要协调他们的动作.如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互 ...

  3. Java开发高级工程师面试,etcd:一款比Redis更骚的分布式锁的实现方式

    Watch 机制支持 Watch 某个固定的 key,也支持 Watch 一个范围(前缀机制). 当被 Watch 的 key 或范围发生变化,客户端将收到通知:在实现分布式锁时,如果抢锁失败, 可通 ...

  4. 【Java】用etcd做服务注册和发现

    最近尝试了一下etcd来做服务的注册发现 [etcd服务] 从etcd官网下载二进制文件即可,分配了三台机器做集群 10.0.1.98    etcd-001 10.0.1.205 etcd-002 ...

  5. etcd学习和实战:4、Java使用etcd实现服务发现和管理

    etcd学习和实战:4.Java使用etcd实现服务发现和管理 文章目录 etcd学习和实战:4.Java使用etcd实现服务发现和管理 1. 前言 2. 代码 2.1 服务注册 2.2 服务发现 2 ...

  6. java如何通过grpc连接etcd_grpc通过 etcd 实现服务发现与注册-源码分析

    介绍 下面介绍 jupiter-0.2.7 版本中 grpc 通过 etcd 实现服务发现与注册. 服务发现与注册的实现解析 服务注册 服务注册的流程图: etcd的服务注册代码模块在 jupiter ...

  7. 用java实现etcd分布式锁_etcdsync 一个golang的ectd分布式锁实现

    etcdsync 介绍 etcdsync is a distributed lock library in Go using etcd. It easy to use like sync.Mutex. ...

  8. etcd 笔记(08)— 基于 etcd 实现分布式锁

    1. 为什么需要分布式锁? 在分布式环境下,数据一致性问题一直是个难点.分布式与单机环境最大的不同在于它不是多线程而是多进程.由于多线程可以共享堆内存,因此可以简单地采取内存作为标记存储位置.而多进程 ...

  9. 服务发现对比:Zookeeper vs. etcd vs. Consul

    欢迎关注方志朋的博客,回复"666"获面试宝典 服务发现工具 手动配置 ZooKeeper ETCD Consul 结论 我们拥有的服务越多,如果我们使用预定义的端口,就会发生冲突 ...

  10. 5年以上的Java程序员,千万别忽略这一点

    做了 3~5 年编程开发,你已经积累了不少项目经验,扩宽了技术广度,也许已发力成为团队管理者.到了这个阶段,大家却常有这种感受:感觉自己卡在瓶颈进步缓慢,技术水平很难像早期一样实现大幅突破? 其实大家 ...

最新文章

  1. spring 注解@PathVariable
  2. mybatis一对多关联查询_一对一,一对多,多对多查询及延迟加载(N+1问题)分析
  3. 生产者消费者模型(条件变量)
  4. Leetcode PHP题解--D84 371. Sum of Two Integers
  5. matlab2c使用c++实现matlab函数系列教程-cosh函数
  6. 【JavaScript】JavaScript闭包
  7. [渝粤教育] 西南科技大学 企业生产运作管理 在线考试复习资料2021版
  8. xmpp java_XMPP: Android基于Xmpp的即时通讯
  9. php本地解密,PHPDecode 在线解密工具
  10. python爬取双色球2003-2022年所有数据
  11. 尝试导出Cities Skyline完整城市模型遇到的问题与解决
  12. ot permission denied while trying to connect to the Docker daemon socket at
  13. 西门子官网下载Eplan部件库
  14. idea常用护眼色的RGB值调整
  15. R语言开发之输出散点图
  16. 中国人都应记住的“中国”一百句名言
  17. 05.Laravel post提交失败 返419 | Page Expired
  18. Jenkins 构建CI/CD(一看就会)
  19. win11 ms-settings;display(该文件没有与之关联的应用来执行该操
  20. SCOM2012升级授权License

热门文章

  1. 做消息推送 8 年的极光,为何做物联网 JIoT 平台?
  2. 机器智能芯片 10 大新秀!华为抢占一席,Google 占比最多!
  3. Python 连续三年夺冠、PHP 受排挤,揭晓 IEEE Spectrum 2019 年度编程语言排行榜
  4. 终于!这本Python书彻底玩大了!
  5. 大话云上“分布式实践”,理解 B、A、C 并不难!
  6. 拥抱开源未来 百度Doris进入顶级开源社区Apache
  7. 即将发布的 JDK 11 包含了什么?
  8. 帝国php数据库备份,解决帝国cms帝国数据库备份王php5.3下500错误
  9. day08【后台】权限控制-上
  10. java glassfish_java - Tomcat,JBoss和Glassfish有什么区别?