redis分布式锁学习总结
目录
一、简介
二、基于redis的分布式锁
三、示例
四、Redisson源码追踪
五、总结
一、简介
- 概念:分布式锁,指的是在分布式部署环境下,通过加锁来控制共享资源在多个客户端之间的互斥访问,即同一时刻只能有一个客户端对共享资源进行操作,保证数据一致性。
- 特点:
- 互斥性:同一时刻多个客户端对共享资源的访问存在互斥性;
- 防死锁:对锁设置超时时间,防止客户端一直占用着锁,即防止死锁;
- 可重入性:一个客户端上的同一个线程如果获取锁之后还可以再次获取这个锁(客户端封装,可以使用threadlocal存储持有锁的信息);
- 持锁人解锁: 客户端自己加的锁自己解除,不能将别人加的锁给删掉了;
- 为什么需要分布式锁?
- 现在很少系统是单体架构了,基本上都是部署在多台服务器上,也就是分布式部署,那么在分布式环境中,必须要解决的一个问题就是:数据一致性的问题。在某个资源在多个系统之间共享的时候,如何保证只有一个客户端对其进行处理,不能并发地执行,否则一个客户端写一个客户端读,就会造成数据一致性的问题。在分布式系统中,传统线程之间的锁机制,如synchronized等就没作用了,系统会有多份并且部署在不同的服务器中,这些资源已经不是在线程之间共享了,而是属于进程(服务器)之间共享的资源。为了解决这个互斥访问的问题,我们就需要引入分布式锁。
二、基于redis的分布式锁
因为redis是单线程模型,保证了指令的顺序执行,redis分布式锁是基于setnx命令来实现的。
- setnx lockKey wsh
#将key设置值为value,如果key不存在,这种情况下等同SET命令。 当key存在时,什么也不做。SETNX是”SET if Not eXists”的简写
#返回1表示key设置成功
#返回0表示key没有设置成功
setnx key value
在获取锁之后,需要设置一下锁的超时时间,防止出现宕机的时候,还没来得及释放锁导致出现死锁现象:
- expire lockKey 30
接着执行业务逻辑,在执行完业务逻辑之后,释放锁,也就是删掉lockKey:
- del lockKey
上面存在一个问题:如果获取锁之后,在设置锁超时时间之前,服务器宕机了,完了,锁永远没法释放了,这怎么办。
出现上面的问题是因为,获取锁和设置超时时间不是原子操作,redis提供了setex命令,能在设置值的时候同时设置超时时间。
- setex key seconds value
如:这样就保证了设置值和超时时间是原子操作:
- setex lockKey 30 wsh
三、示例
下面通过一个简单的商品超卖现象说明redis分布式锁的实现方式以及优化方案。
【a】原始代码
@RequestMapping("/test01")
public String test() {//假设有两个线程同一时刻执行完这一句代码,查询出来的库存都为30String productStock = stringRedisTemplate.opsForValue().get("product_stock");if (StringUtils.isBlank(productStock)) {return "productStock is empty";}int stock = Integer.parseInt(productStock);if (stock > 0) {int currentStock = stock - 1;//然后两个线程都用各自拿到的剩余库存30件减掉1件,也就是两个线程减完都还剩29件,然后将29件更新回redis中//显然这就存在资源共享(超卖的情况)问题了,两个线程正常减库存,应该剩余28件才对。stringRedisTemplate.opsForValue().set("product_stock", Integer.toString(currentStock));System.out.println("减库存成功,当前库存剩余" + currentStock);} else {System.out.println("库存不足...");}return "success";
}
原始代码存在的问题:多个线程同时减库存,但是库存只减掉一件,线程不安全;
【b】单体架构中的锁
如果实在单体架构中,要确保多个线程中只有一个线程访问,可以引入synchronized锁,具体代码如下:注意效率可能会大大降低。
@RequestMapping("/test02")
public String test02() {//锁住减库存这一块代码,使得多个线程进来只能有一个线程进去,其他线程必须在外面等待。//必须注意的是: 这种解决方案只能使用在单体架构中,但是现在公司基本不会使用单体架构,不可能只有一个实例,//一般都会集群分布式部署多个实例,如果同步块使用到分布式部署环境下,那还是一样会存在之前的问题,原因是://synchronized是JVM进程内部的锁,集群部署肯定是有多个JVM进程。synchronized (this) {String productStock = stringRedisTemplate.opsForValue().get("product_stock");if (StringUtils.isBlank(productStock)) {return "productStock is empty";}int stock = Integer.parseInt(productStock);if (stock > 0) {int currentStock = stock - 1;stringRedisTemplate.opsForValue().set("product_stock", Integer.toString(currentStock));System.out.println("减库存成功,当前库存剩余" + currentStock);} else {System.out.println("库存不足...");}}return "success";
}
如果上面的代码放在分布式环境下,又存在其他问题:synchronized在分布式环境下没效果;
【c】分布式锁 - 版本1
对上面的代码进行改进,引入redis分布式锁:
@RequestMapping("/test03")public String test03() {//对每个商品对应上一把锁String lockKey = "product_001";try {//setnx key valueBoolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wsh");//如果redis中已经存在lockKey中,直接返回,只有一个线程能够执行下面的减库存操作。if (!result) {return "当前人数过多,请稍后重试!";}String productStock = stringRedisTemplate.opsForValue().get("product_stock");if (StringUtils.isBlank(productStock)) {return "productStock is empty";}int stock = Integer.parseInt(productStock);if (stock > 0) {int currentStock = stock - 1;stringRedisTemplate.opsForValue().set("product_stock", Integer.toString(currentStock));System.out.println("减库存成功,当前库存剩余" + currentStock);} else {System.out.println("库存不足...");}} finally {//解锁、释放锁stringRedisTemplate.delete(lockKey);}return "success";}
正常情况下,上面的代码能够实现一个比较简单的分布式锁,但是存在的问题还是很多的:
- 锁并没有设置超时时间,假如在获取锁之后,处理业务逻辑之前,服务器宕机,那么不会执行finally里面释放锁的逻辑,会出现死锁现象;
【d】分布式锁 - 版本2
根据上面的死锁问题,我们继续改进:增加锁的超时时间设置
@RequestMapping("/test03")public String test03() {//对每个商品对应上一把锁String lockKey = "product_001";try {//下面两条语句之间如果发生宕机,会导致超时时间没有设置,也会存在问题Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wsh");stringRedisTemplate.expire(lockKey,30, TimeUnit.SECONDS);//如果redis中已经存在lockKey中,直接返回,只有一个线程能够执行下面的减库存操作。if (!result) {return "当前人数过多,请稍后重试!";}String productStock = stringRedisTemplate.opsForValue().get("product_stock");if (StringUtils.isBlank(productStock)) {return "productStock is empty";}int stock = Integer.parseInt(productStock);if (stock > 0) {int currentStock = stock - 1;stringRedisTemplate.opsForValue().set("product_stock", Integer.toString(currentStock));System.out.println("减库存成功,当前库存剩余" + currentStock);} else {System.out.println("库存不足...");}} finally {//解锁、释放锁stringRedisTemplate.delete(lockKey);}return "success";}
这里设置了锁的超时时间为30秒,但是仔细想一下,如果在获取锁之后,在设置超时时间之前,服务器发生宕机,完蛋,超时时间又没设置,又有问题了。
出现这种问题是因为获取锁代码和设置锁超时时间的代码不是原子操作,redis提供了一个设置值的同时设置超时时间的命令:setex对应到stringRedisTemplate中就是setIfAbsent()方法。
【d】分布式锁 - 版本3
继续改进上面存在的问题,如下:
@RequestMapping("/test03")public String test03() {//对每个商品对应上一把锁String lockKey = "product_001";try {//同时设置值以及超时时间,这样两者之间就是原子操作Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wsh", 30, TimeUnit.SECONDS);//如果redis中已经存在lockKey中,直接返回,只有一个线程能够执行下面的减库存操作。if (!result) {return "当前人数过多,请稍后重试!";}String productStock = stringRedisTemplate.opsForValue().get("product_stock");if (StringUtils.isBlank(productStock)) {return "productStock is empty";}int stock = Integer.parseInt(productStock);if (stock > 0) {int currentStock = stock - 1;stringRedisTemplate.opsForValue().set("product_stock", Integer.toString(currentStock));System.out.println("减库存成功,当前库存剩余" + currentStock);} else {System.out.println("库存不足...");}} finally {//解锁、释放锁stringRedisTemplate.delete(lockKey);}return "success";}
再仔细分析上面的代码,假如有两个线程同时抢购同一个商品,线程1获取了锁,线程2没有获取到锁,假设线程1执行业务逻辑的时间很长,超过了锁的超时时间30秒,这样在线程1处理业务逻辑中间的时候,锁到期了,这时候线程2获取到锁,继续执行线程2的业务逻辑,假设这个时候线程1执行完了它的业务,执行释放锁的逻辑,删掉了lockKey,仔细想想,这个锁现在被线程2占用着,你线程1居然把我的锁给删掉了,导致线程2也没法抢购商品。这就是锁失效的问题。
【e】分布式锁 - 版本4
针对上面的问题,我们可以这样处理:设置当前客户端对应锁的唯一标识,如当前线程ID、UUID等都可以。
@RequestMapping("/test04")
public String test04() {//对每个商品对应上一把锁String lockKey = "product_001";String clientId = UUID.randomUUID().toString();try {//同时设置值以及超时时间,这样两者之间就是原子操作Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);//如果redis中已经存在lockKey中,直接返回,只有一个线程能够执行下面的减库存操作。if (!result) {return "当前人数过多,请稍后重试!";}String productStock = stringRedisTemplate.opsForValue().get("product_stock");if (StringUtils.isBlank(productStock)) {return "productStock is empty";}int stock = Integer.parseInt(productStock);if (stock > 0) {int currentStock = stock - 1;stringRedisTemplate.opsForValue().set("product_stock", Integer.toString(currentStock));System.out.println("减库存成功,当前库存剩余" + currentStock);} else {System.out.println("库存不足...");}} finally {//解锁、释放锁//只释放自己加的锁if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {stringRedisTemplate.delete(lockKey);}}return "success";
}
注意,finally里面的代码,只解锁自己加的锁,不能将别的线程的锁误删除。
通过几个步骤的改进,我们的分布式锁已经比较好一点了,存在的问题也相对少了一点,但是还是存在问题的:
- 如何保证业务时间小于锁的超时时间?
这里有一个比较通用的解决方案:
获取锁之后开启一个后台线程,使用定时器或者while(true)循环,判断当前锁是否为当前线程所有,如果是则刷新redis中lockKey的过期时间,重新设置为30秒,这样就能保证业务时间一定是小于锁的超时时间的。
注意,在解锁的时候,需要关闭定时器。
其实redis还提供了一个更强大的客户端redisson用于实现分布式锁,我们在开发中也可以使用这个锁,它自动帮我们处理了很多事情,我们只需要简单地加锁解锁就能实现一个分布式锁了。下面是redisson实现分布式锁的代码:
@RequestMapping("/test05")
public String test05() {String lockKey = "product_001";RLock lock = redisson.getLock(lockKey);try {lock.lock(30, TimeUnit.SECONDS);String productStock = stringRedisTemplate.opsForValue().get("product_stock");if (StringUtils.isBlank(productStock)) {return "productStock is empty";}int stock = Integer.parseInt(productStock);if (stock > 0) {int currentStock = stock - 1;stringRedisTemplate.opsForValue().set("product_stock", Integer.toString(currentStock));System.out.println("减库存成功,当前库存剩余" + currentStock);} else {System.out.println("库存不足...");}} finally {lock.unlock();}return "success";
}
优化到这里,可能对于一些并发不大的已经够用了,当然还是存在一些问题的:
- 主从复制架构中,master宕机,在同步锁到slave从节点的时候master发生宕机,导致锁失效的问题(如果不能容忍偶尔一次的锁失效问题,可以使用RedLock进行优化或者使用Zookeeper实现分布式锁,这个有时间的话后面再去研究研究);
四、Redisson源码追踪
【a】获取锁:RLock lock = redisson.getLock(lockKey);
我们追踪一下getLock的源码:
public RLock getLock(String name) {return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}//构造方法
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;//UUID字符串this.id = commandExecutor.getConnectionManager().getId(); (实际上是调用UUID getId();)//锁过期时间this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
【b】加锁:lock.lock(30, TimeUnit.SECONDS);
继续看一下其中关键部分的源码:
//RLock.class
void lock(long var1, TimeUnit var3);//RedissonLock.class
public void lock(long leaseTime, TimeUnit unit) {try {this.lockInterruptibly(leaseTime, unit);} catch (InterruptedException var5) {Thread.currentThread().interrupt();}
}public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {//当前线程IDlong threadId = Thread.currentThread().getId();//尝试获取锁 Long ttl = this.tryAcquire(leaseTime, unit, threadId);//ttl不为空,表示获取锁失败if (ttl != null) {//PUBSUB.subscribe(this.getEntryName(), this.getChannelName(), this.commandExecutor.getConnectionManager().getSubscribeService());//订阅该锁的频道,等待锁释放的消息RFuture<RedissonLockEntry> future = this.subscribe(threadId);this.commandExecutor.syncSubscription(future);try {//开启死循环监听while(true) {//尝试获取锁对象ttl = this.tryAcquire(leaseTime, unit, threadId);//ttl为空,获取锁成功,直接返回if (ttl == null) {return;}//ttl大于0, 则等待ttl时间后继续尝试获取锁if (ttl >= 0L) {this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {this.getEntry(threadId).getLatch().acquire();}}} finally {//取消订阅this.unsubscribe(future, threadId);}}
}
【c】获取锁
获取锁的逻辑主要在tryAcquire()方法中,下面是tryAcquireAsync()方法的源码:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {//如果是带有过期时间的锁,则按照普通方式获取锁if (leaseTime != -1L) {return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//this.lockWatchdogTimeout = 30000L;//先按照30秒的过期时间来执行获取锁的方法RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);//定时刷新锁的过期时间,自动延时,确保业务逻辑时间小于锁的过期时间ttlRemainingFuture.addListener(new FutureListener<Long>() {public void operationComplete(Future<Long> future) throws Exception {if (future.isSuccess()) {Long ttlRemaining = (Long)future.getNow();if (ttlRemaining == null) {RedissonLock.this.scheduleExpirationRenewal(threadId);}}}});return ttlRemainingFuture;}
}
接着往下看,tryLockInnerAsync方法是真正执行获取锁的逻辑:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {//过期时间this.internalLockLeaseTime = unit.toMillis(leaseTime);//return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
主要理解下面三个判断:
- 通过exists判断,如果锁不存在,则设置值和过期时间,加锁成功;
- 通过hexists判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功;
- 如果锁已存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间,加锁失败;
【d】解锁:lock.unlock();
下面我们追踪一下解锁的关键代码:
//Lock.class
void unlock();//RedissonLock.class
public void unlock() {Boolean opStatus = (Boolean)this.get(this.unlockInnerAsync(Thread.currentThread().getId()));//如果返回空,说明解锁的线程和当前锁不是同一个线程,抛出异常//这里防止误删除别人持有的锁对象if (opStatus == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + Thread.currentThread().getId());} else {if (opStatus) {//解锁成功,取消定时刷新锁过期时间this.cancelExpirationRenewal();}}
}
然后我们看一下unlockInnerAsync()方法:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}
跟获取锁一样,主要理解三个判断:
- 如果锁已经不存在,通过publish发布锁释放的消息,解锁成功;
- 如果解锁的线程和当前锁的线程不是同一个,解锁失败,抛出异常;
- 通过hincrby递减1,先释放一次锁。若剩余次数还大于0,则证明当前锁是重入锁,刷新过期时间;若剩余次数小于0,删除key并发布锁释放的消息,解锁成功;
五、总结
以上就是关于redis分布式锁的一些学习总结,不能说理解得很透彻,有时间还得继续深入学习一下RedLock和ZK实现分布式锁的方式,上文如果有不对之处,还请小伙伴们指正,相互学习,一起进步嘛。
参考资料:
- https://www.cnblogs.com/fixzd/p/9479970.html
- https://github.com/redisson/redisson/wiki/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8
- https://www.jianshu.com/p/47fd7f86c848
redis分布式锁学习总结相关推荐
- 快来学习Redis 分布式锁的背后原理
以前在学校做小项目的时候,用到Redis,基本也只是用来当作缓存.可阿粉在工作中发现,Redis在生产中并不只是当作缓存这么简单.在阿粉接触到的项目中,Redis起到了一个分布式锁的作用,具体情况是这 ...
- 还不知道 Redis 分布式锁的背后原理?还不赶快学习一下
前言 以前在学校做小项目的时候,用到Redis,基本也只是用来当作缓存.可阿粉在工作中发现,Redis在生产中并不只是当作缓存这么简单.在阿粉接触到的项目中,Redis起到了一个分布式锁的作用,具体情 ...
- 关于分布式锁原理的一些学习与思考:redis分布式锁,zookeeper分布式锁
点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:牛人 20000 字的 Spring Cloud 总结,太硬核了~ 作者:队长给我球. 出处:https://w ...
- zookeeper 分布式锁_关于redis分布式锁,zookeeper分布式锁原理的一些学习与思考
编辑:业余草来源:https://www.xttblog.com/?p=4946 首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法 ...
- redis cluster 分布式锁_关于分布式锁原理的一些学习与思考redis分布式锁,zookeeper分布式锁...
首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法.变量. 在一个进程中,也就是一个jvm 或者说应用中,我们很容易去处理控制,在j ...
- 【使用Redis分布式锁实现优惠券秒杀功能】-Redis学习笔记05
前言 本章节主要实现限时.限量优惠券秒杀功能,并利用分布式锁解决<超卖问题>.<一人一单问题>. 一.优惠券下单基本功能实现 1.功能介绍及流程图 2.代码实现 @Resour ...
- 十步学习 Redis 分布式锁
文章目录 1. 单机版没有加锁 2. 单机版加锁 3. 引入 Redis 分布式锁 4. 加锁 解锁,lock/unlock 必须同时出现并保证调用 5. 加入锁过期时间 6. 加锁且携带锁过期时间 ...
- 关于redis分布式锁,zookeeper分布式锁原理的一些学习与思考
编辑:业余草 来源:https://www.xttblog.com/?p=4946 首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方 ...
- redis分布式锁 在集群模式下如何实现_收藏慢慢看系列:简洁实用的Redis分布式锁用法...
在微服务中很多情况下需要使用到分布式锁功能,而目前比较常见的方案是通过Redis来实现分布式锁,网上关于分布式锁的实现方式有很多,早期主要是基于Redisson等客户端,但在Spring Boot2. ...
- Redis分布式锁使用不当,酿成一个重大事故,超卖了100瓶飞天茅台!!!
点击关注公众号,Java干货及时送达 来源:juejin.cn/post/6854573212831842311 基于Redis使用分布式锁在当今已经不是什么新鲜事了. 本篇文章主要是基于我们实际项目 ...
最新文章
- ASP.NET MVC4中@model使用多个类型实例的方法
- 一种另类的解决URL中文乱码问题--对中文进行加密、解密处理
- 信息检索Information Retrieval评价指标
- 存储过程提示data truncation_手机DATA重新分区教程(超详细)
- DIY Virtual Wall for Roomba – Part One
- 中国风喜庆传统新年元旦海报PSD分层模板
- IOS开发-我的第一个IOS程序
- Zend Framework 入门(1)—快速上手
- LCD驱动芯片/LCD段式液晶显示驱动芯片-VK0192M/VK0256/B/C技术资料简介
- arduiono电子音乐代码_使用Word2003的EQ域代码制作音乐简谱
- SPSS独立样本t检验结果分析
- 芝麻信用获世界级安全认证
- [VOA美国人物] Jackie Robinson: The First Black Player in Modern Major League
- linux中rcf命名管道,RCF的简单使用教程以及什么是回调函数
- SDK第一课(Windows SDK编程入门)
- c语言编程文章排版,一种简单英文词典排版系统的实现 C语言编程
- 大局已定,应届生三面京东成功拿下20K的Offer。
- 文件服务器fuse,FUSE 扩展
- 【记录】kali制作绕过火绒检测的木马(仅通过MSF的方式)
- linux启动redis进程,Linux安装Redis实现过程及报错解决方案
热门文章
- 锚框 anchor box bounding box 动手学深度学习v2 pytorch
- TensorFlow by Google CNN卷积神经网络 Machine Learning Foundations: Ep #3 - Convolutions and pooling
- Jupyter Notebook从入门到精通
- git pull/git fetch更新分支
- 利用反射机制,多个请求对应一个Servlet!附源代码
- html页面显示代码插件,客户端显示web网页支持html5的第三方内核插件
- python的字典合并有相同的_将列表中的重复项合并到python字典中
- tcpip路由技术卷一_计算机网络题库考(2020.9.10晚18.320.30 北京卷)
- 编写爬虫遇到的问题总结
- java使用io上传文件_文件传输基础——Java IO流