ETCD for java_etcd-java使用
etcd-java使用
简介
项目地址
特性
Accepts/exposes protobuf-generated types directly to minimize overhead(请求头)
Simpler fluent versions of KV primitives for making async or sync requests(fluent api的方式方便制定sync还是async)
Automatic retries and (re)authentication
Resilient(可快速恢复的) and scalable Lease and Watch abstractions with semantics aligned to most typical usage
Underlying stream RPCs only kept open for as long as they are needed
Comes with some helpful utility classes with local cache, persistent key and leader election abstractions
Simple declarative client configuration (optional) - encapsulate per-cluster connection details in a
JSON document
Support for connecting to multi-endpoint IBM Compose etcd deployments over TLS
Currently doesn't cover all parts of the etcd-server API, in particular those related to cluster administration(集群管理) such as maintenance, cluster and auth management
kv操作
获取client
KvStoreClient client = EtcdClient.forEndpoint("localhost", 2379).withPlainText().build();
//KvStoreClient是一个接口,定义了如下三个client方法
KvClient kvClient = client.getKvClient();
LeaseClient leaseClient = client.getLeaseClient();
LockClient lockClient = client.getLockClient();
EtcdClient实现了Closeable接口的close方法,所以可以使用try-resource的语法,自动释放资源。示例代码见ClientBuilderTest
单个endpoint
try (KvStoreClient client = EtcdClient.forEndpoint("localhost", 2379).withPlainText().build()) {}
多个endpoint
try (KvStoreClient client = EtcdClient.forEndpoints(
"localhost:2379,http://localhost:2379,https://localhost:2379,dns:///localhost:2379")
.withPlainText().build()) {}
put操作
basic put
kvc.put(a, v2).sync();
batch put
kvc.batch().put(pr1).put(pr2).sync();
put with lease
//先创建lease
LeaseGrantResponse lgr = leaseClient.grant(2).sync();
//put的时候指定lease id
//如果lease不存在会抛出异常
kvc.put(bs("hello"), bs("world"), lgr.getID()).sync();
put后获取信息
获取上一次的值
如果key没有对应的值,返回的是KeyValue.getDefaultInstance()
如果之前key有对应的指,返回之前key对应的值
kvc.put(a, v2).prevKv().sync().getPrevKv();
获取header信息
kvc.put(a, v2).sync().getHeader();
get操作
basic get
同步get
RangeResponse rr = kvc.get(bs("a")).sync();
异步get
ListenableFuture rrFut1 = kvc.get(bs("new")).async();
rrFut1.get(1000, TimeUnit.SECONDS); //指定超时时间
batch get
TxnResponse txnResponse = kvc.batch().get(kvc.get(a).asRequest()).get(kvc.get(b).asRequest()).sync();
assertEquals(v1, txnResponse.getResponses(0).getResponseRange().getKvs(0).getValue());
assertEquals(v2, txnResponse.getResponses(1).getResponseRange().getKvs(0).getValue());
指定重试策略
ListenableFuture rrFut2 = kvc.get(bs("new"))
.deadline(Deadline.after(20, TimeUnit.SECONDS))
.backoffRetry().async(); // should work
delete操作
basic delete
获取删除的个数
kvc.delete(bs("notthere")).sync().getDeleted()
获取删除的key对应的value
kvc.delete(a).prevKv().sync().getPrevKvs(0).getValue()
batch delete
kvc.batch().delete(kvc.delete(a).asRequest())
.delete(kvc.delete(b).asRequest()).sync();
transaction
ListenableFuture tresp = kvc.txnIf().cmpEqual(a).value(v1)
.and().cmpNotEqual(b).version(10)
.then().put(kvc.put(bs("new"), bs("newval")).asRequest()).async();
lease操作
普通lease
grant
LeaseGrantResponse lgr = leaseClient.grant(5L).sync(); //不指定leaseId
lgr.getID(); //获取id
lgr.getTTL(); //存活时间
leaseClient.grant(10L).leaseId(456L).sync(); //指定leaseId
keep-alive
leaseClient.keepAliveOnce(id).get(1, SECONDS)
revoke
leaseClient.revoke(id).get(1, SECONDS);
assertEquals(-1L, leaseClient.ttl(id).get().getTTL());//撤销后ttl变成-1
persistent lease
basic
PersistentLease pl2 = lc.maintain().minTtl(minTtl).keepAliveFreq(kaFreq).start();
long newLeaseId = pl2.get(20, SECONDS);
with StreamObserver
PersistentLease pl = lc.maintain().minTtl(minTtl).keepAliveFreq(kaFreq)
.start(new StreamObserver() {
@Override
public void onNext(LeaseState value) {
System.out.println(t(start) + "PL state change: " + value);
observerEvents.add(value);
}
@Override
public void onError(Throwable t) {
System.out.println(t(start) + "PL error: " + t);
observerEvents.add(t);
}
@Override
public void onCompleted() {
System.out.println(t(start) + "PL completed");
observerEvents.add(COMPLETED);
}
});
list client lease
需要注意的是persistent lease是基于client的会话的,如果client挂了的话,lease是会自动释放的
Set lidsFound = client2.getLeaseClient().list().get()
.getLeasesList().stream().map(LeaseStatus::getID).collect(Collectors.toSet());
lock操作
同一个客户端lock不同的lease会报错,详细见LockTest.testWithContention
普通lock
LockResponse lr = lockClient.lock(KeyUtils.bs("mylock"))
.deadline(Deadline.after(500, TimeUnit.MILLISECONDS)).sync(); //获取锁
assertTrue(kvClient.txnIf().exists(lr.getKey()).sync().getSucceeded());
assertNotNull(lockClient.unlock(lockKey).sync()); //释放锁
assertFalse(kvClient.txnIf().exists(lockKey).sync().getSucceeded());
lock使用lease
LockResponse lr = lockClient.lock(KeyUtils.bs("mylock2")).withLease(lgr.getID()).sync();
lock使用persistent lease
PersistentLease pl = lec.maintain().start();
LockResponse lr = lockClient.lock(KeyUtils.bs("mylock3")).withLease(pl).sync();
//如果pl.close后会自动释放
watch操作
简单watch
WatchIterator watchIterator = kvc.watch(bs("/watchtest")).asPrefix().start();
watch指定observer
//创建一个observer
final StreamObserver observer = new StreamObserver() {
@Override
public void onNext(WatchUpdate value) {
System.out.println(t(start) + "watch event: " + value);
watchEvents.add(value);
}
@Override
public void onError(Throwable t) {
System.out.println(t(start) + "watch error: " + t);
watchEvents.add(t);
}
@Override
public void onCompleted() {
System.out.println(t(start) + "watch completed");
watchEvents.add(COMPLETED);
}
};
//启动watcher,并且绑定observer
Watch watch = kvc.watch(bs("/watchtest")).asPrefix().start(observer);
获取watch的事件
WatchUpdate wu = getNextSkipEmpty(watchEvents);
assertNotNull(wu);
assertEquals("event: " + wu, 1, wu.getEvents().size()); //事件个数
assertEquals(EventType.PUT, wu.getEvents().get(0).getType()); //事件类型
assertEquals(bs("a value"), wu.getEvents().get(0).getKv().getValue()); //值
assertEquals(bs("/watchtest/a"), wu.getEvents().get(0).getKv().getKey()); //键
election
简单的case参见示例代码LeaderElectionTest
源码浅析
EtcdClient
KvStoreClient.png
重点方法是line397行,EtcdClient的构造方法,初始化了internalExecutor,channel,grpc,kvClient等
EtcdKvClient
etcd kv操作和watch操作的核心类,实现了KvClient接口,其他的leaseClient和lockClient类似
EtcdKvClient.png
重点关注AbstractFluentRequest,底层的client通过grpc进行远程rpc调用。
所有的操作在底层抽象为sync和async的操作,sync操作其实也是async操作,只是在client层做了get操作。
@Override
public final ListenableFuture async() {
return async(null);
}
@Override
public final RespT sync() {
return client.waitForCall(this::async);
}
GrpcClient
grpc调用的封装类, 底层调用方法
ClientCalls.futureUnaryCall(channel.newCall(method, callOptions), request)
ETCD for java_etcd-java使用相关推荐
- java etcd api_在java中如何使用etcd的v2 和v3 api获取配置,并且对配置的变化进行监控和监听...
etcd 和zookeeper 很像,都可以用来做配置管理.并且etcd可以在目前流行的Kubernetes中使用. 但是etcd 提供了v2版本合v3的版本的两种api.我们现在分别来介绍一下这两个 ...
- 用java实现etcd分布式锁_etcd分布式锁及事务
前言 分布式锁是控制分布式系统之间同步访问共享资源的一种方式.在分布式系统中,常常需要协调他们的动作.如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互 ...
- Java开发高级工程师面试,etcd:一款比Redis更骚的分布式锁的实现方式
Watch 机制支持 Watch 某个固定的 key,也支持 Watch 一个范围(前缀机制). 当被 Watch 的 key 或范围发生变化,客户端将收到通知:在实现分布式锁时,如果抢锁失败, 可通 ...
- 【Java】用etcd做服务注册和发现
最近尝试了一下etcd来做服务的注册发现 [etcd服务] 从etcd官网下载二进制文件即可,分配了三台机器做集群 10.0.1.98 etcd-001 10.0.1.205 etcd-002 ...
- etcd学习和实战:4、Java使用etcd实现服务发现和管理
etcd学习和实战:4.Java使用etcd实现服务发现和管理 文章目录 etcd学习和实战:4.Java使用etcd实现服务发现和管理 1. 前言 2. 代码 2.1 服务注册 2.2 服务发现 2 ...
- java如何通过grpc连接etcd_grpc通过 etcd 实现服务发现与注册-源码分析
介绍 下面介绍 jupiter-0.2.7 版本中 grpc 通过 etcd 实现服务发现与注册. 服务发现与注册的实现解析 服务注册 服务注册的流程图: etcd的服务注册代码模块在 jupiter ...
- 用java实现etcd分布式锁_etcdsync 一个golang的ectd分布式锁实现
etcdsync 介绍 etcdsync is a distributed lock library in Go using etcd. It easy to use like sync.Mutex. ...
- etcd 笔记(08)— 基于 etcd 实现分布式锁
1. 为什么需要分布式锁? 在分布式环境下,数据一致性问题一直是个难点.分布式与单机环境最大的不同在于它不是多线程而是多进程.由于多线程可以共享堆内存,因此可以简单地采取内存作为标记存储位置.而多进程 ...
- 服务发现对比:Zookeeper vs. etcd vs. Consul
欢迎关注方志朋的博客,回复"666"获面试宝典 服务发现工具 手动配置 ZooKeeper ETCD Consul 结论 我们拥有的服务越多,如果我们使用预定义的端口,就会发生冲突 ...
- 5年以上的Java程序员,千万别忽略这一点
做了 3~5 年编程开发,你已经积累了不少项目经验,扩宽了技术广度,也许已发力成为团队管理者.到了这个阶段,大家却常有这种感受:感觉自己卡在瓶颈进步缓慢,技术水平很难像早期一样实现大幅突破? 其实大家 ...
最新文章
- spring 注解@PathVariable
- mybatis一对多关联查询_一对一,一对多,多对多查询及延迟加载(N+1问题)分析
- 生产者消费者模型(条件变量)
- Leetcode PHP题解--D84 371. Sum of Two Integers
- matlab2c使用c++实现matlab函数系列教程-cosh函数
- 【JavaScript】JavaScript闭包
- [渝粤教育] 西南科技大学 企业生产运作管理 在线考试复习资料2021版
- xmpp java_XMPP: Android基于Xmpp的即时通讯
- php本地解密,PHPDecode 在线解密工具
- python爬取双色球2003-2022年所有数据
- 尝试导出Cities Skyline完整城市模型遇到的问题与解决
- ot permission denied while trying to connect to the Docker daemon socket at
- 西门子官网下载Eplan部件库
- idea常用护眼色的RGB值调整
- R语言开发之输出散点图
- 中国人都应记住的“中国”一百句名言
- 05.Laravel post提交失败 返419 | Page Expired
- Jenkins 构建CI/CD(一看就会)
- win11 ms-settings;display(该文件没有与之关联的应用来执行该操
- SCOM2012升级授权License
热门文章
- 做消息推送 8 年的极光,为何做物联网 JIoT 平台?
- 机器智能芯片 10 大新秀!华为抢占一席,Google 占比最多!
- Python 连续三年夺冠、PHP 受排挤,揭晓 IEEE Spectrum 2019 年度编程语言排行榜
- 终于!这本Python书彻底玩大了!
- 大话云上“分布式实践”,理解 B、A、C 并不难!
- 拥抱开源未来 百度Doris进入顶级开源社区Apache
- 即将发布的 JDK 11 包含了什么?
- 帝国php数据库备份,解决帝国cms帝国数据库备份王php5.3下500错误
- day08【后台】权限控制-上
- java glassfish_java - Tomcat,JBoss和Glassfish有什么区别?