[Java]分布式自平衡多文件云传输

  • 概述
  • 基本思想
  • 节点
  • Receiver接收方
  • 资源分配及节点选择策略类
  • 接收服务器端口池
  • 资源请求类
  • 短连接资源请求接口
  • 资源信息类
  • 资源节点关系表(资源管理中心需要维护的Map)
  • 字节发送接收类
  • 文件片段类
  • 断点续传的基础类
  • 资源提供者
  • 文件接收过程
  • 总结

概述

云计算(cloud computing),分布式计算技术的一种,其最基本的概念,是透过网络将庞大的计算处理程序自动分拆成无数个较小的子程序,再交由多部服务器所组成的庞大系统经搜寻、计算分析之后将处理结果回传给用户。透过这项技术,网络服务提供者可以在数秒之内,达成处理数以千万计甚至亿计的信息,达到和“超级计算机”同样强大效能的网络服务。

分布式自平衡多文件云传输:当资源消费者需要资源时,资源管理中心将筛选出多个资源提供者来对其进行发送,当该消费者得到该资源后也将成为该资源的提供者,当资源下载的越来越多,那么提供者也将越来越多,这样将大大减少资源根服务器的压力。

基本思想

  • 资源管理中心负责维护一张以资源唯一标识为键,以资源提供者列表为值的Map(ResourceNodeRelation资源节点关系类),资源管理中心提供资源的注册及注销;当资源消费者向资源管理中心申请资源时,资源管理中心将拥有该资源的节点全部发送给消费者;在所有过程中资源管理中心不断通过心跳检测来将异常的资源提供者节点进行剔除。
  • 资源提供者上线后先对本地的资源进行扫描,并将自己的资源向资源管理中心进行注册,当资源消费者通过RPC请求某资源时,资源提供者与消费者建立临时的长连接并向其发送所申请的资源。
  • 资源消费者上线后先从资源管理中心获取全部的资源信息列表,并扫描自己的资源与获取的列表进行对比生成所需要的资源基本信息(ResourceBaseInfo),然后再向资源管理中心申请资源提供者列表,然后根据自己消费者端的资源分配策略对所需要的资源进行分配,向选择的节点申请资源并建立接受服务器,一片段一片段的对资源进行接受,当所有资源接收完成后,那么该资源消费者也成为该资源的提供者,向资源管理中心注册自己的资源。
    所有的资源消费者都可以成为资源提供者,这样做将大大减少资源服务器的压力,也能够大大增加文件的传输效率

节点

package com.wh.mfct.node;/*** 节点接口* @author 闹闹小丫头**/
public interface INetNode {// 服务器int SERVER = 1;// 客户端int CLIENT = 0;// 一些基本方法String getIp();int getPort();int getRmiSendPort();int getReceivePort();int getType();int getSendTime();void increaseSendTime();
}
package com.wh.mfct.node;/*** 节点信息, 实现INetNode接口* @author 闹闹小丫头**/
public class NetNode implements INetNode{// IP地址private String ip;// 服务端口private int port;// 长连接接收端口private int receivePort;// 短链接端口private int rmiSendPort;// 节点类型private int type;// 发送资源次数private int sendTime;/*** 无参构造*/public NetNode() {}/*** 三参构造* @param ip ip地址* @param port 端口* @param type 接口类型(服务器还是客户端)*/public NetNode(String ip, int port, int type) {this.ip = ip;this.port = port;this.type = type;}public void setRmiSendPort(int rmiSendPort) {this.rmiSendPort = rmiSendPort;}public void setReceivePort(int receivePort) {this.receivePort = receivePort;}@Overridepublic String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}@Overridepublic int getPort() {return port;}public void setPort(int port) {this.port = port;}public void setType(int type) {this.type = type;}@Overridepublic int getType() {return type;}public void setSendTime(int sendtime) {this.sendTime = sendtime;}@Overridepublic int getSendTime() {return sendTime;}@Overridepublic int getRmiSendPort() {return rmiSendPort;}@Overridepublic void increaseSendTime() {++this.sendTime;}@Overridepublic int getReceivePort() {return receivePort;}@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + ((ip == null) ? 0 : ip.hashCode());result = prime * result + port;return result;}@Overridepublic boolean equals(Object obj) {if (this == obj)return true;if (obj == null)return false;if (getClass() != obj.getClass())return false;NetNode other = (NetNode) obj;if (ip == null) {if (other.ip != null)return false;} else if (!ip.equals(other.ip))return false;if (port != other.port)return false;return true;}@Overridepublic String toString() {return ip + "(" + port + ") " + (type == INetNode.SERVER ? "服务器" : "客户机");}}
package com.wh.mfct.node;/*** 节点适配器* @author 闹闹小丫头**/
public class NetNodeAdapter implements INetNode{@Overridepublic String getIp() {// TODO Auto-generated method stubreturn null;}@Overridepublic int getPort() {// TODO Auto-generated method stubreturn 0;}@Overridepublic int getType() {// TODO Auto-generated method stubreturn 0;}@Overridepublic int getSendTime() {// TODO Auto-generated method stubreturn 0;}@Overridepublic int getRmiSendPort() {// TODO Auto-generated method stubreturn 0;}@Overridepublic int getReceivePort() {// TODO Auto-generated method stubreturn 0;}@Overridepublic void increaseSendTime() {// TODO Auto-generated method stub}}

Receiver接收方

package com.wh.mfct.receiver;import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** 接收文件的RandomAccessFile池, 避免因不断地打开关闭造成系统的额外负担* @author 闹闹小丫头**/
public class RandAccessFilePool {private Map<String, RandomAccessFile> rafPool;/*** 无参构造,生成新的 RandomAccessFile池*/public RandAccessFilePool() {rafPool = new ConcurrentHashMap<>();}/*** 通过文件的绝对路径得到文件的RandomAccessFile,若没有该文件的RandomAccessFile,则* 生成一个新的RandomAccessFile,并把其放入RandomAccessFile池中,最后返回该RandomAccessFile* @param filePath  文件的绝对路径* @return*/RandomAccessFile getRaf(String filePath) {RandomAccessFile raf = rafPool.get(filePath);if(raf == null) {try {raf = new RandomAccessFile(filePath, "rw");rafPool.put(filePath, raf);} catch (FileNotFoundException e) {e.printStackTrace();}}return raf;}}
package com.wh.mfct.receiver;import java.io.DataInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import com.wh.mfct.resource.ResourceBaseInfo;
import com.wh.mfct.resource.ResourceStructInfo;
import com.wh.mfct.section.FileSection;
import com.wh.mfct.section.SectionInfo;
import com.wh.mfct.section.UnreceivedFileSection;
import com.wh.mfct.view.IReceiveViewAction;/*** 接收线程类,用于接收文件* @author 闹闹小丫头**/
public class Receiver implements Runnable{// 发送端private Socket socket;// 接收信道private DataInputStream dis;// 接收界面private IReceiveViewAction receiveView;// 文件基本信息private ResourceBaseInfo rbi;// 文件结构信息Mapprivate Map<Integer, ResourceStructInfo> rsiMap;// 未接受文件片段Mapprivate Map<Integer, UnreceivedFileSection> ufsMap;/*** 直接产生一个线程是不合理的,尤其无法处理整个文件接收结束的判断。<br>* 可以用线程池,且,每一个接收端服务器独立开辟一个线程池;<br>* 或者可以考虑线程池中的group功能。<br>* 无论使用上述哪种方式,核心目的是,在最后一个接受线程结束前,若发现接收线程* 只剩下自己,那么,意味着发送过程全部结束。<br>* 这个结束是必须判断的,不然,无法终止接收进度条、无法进行断点续传的判断等等工作!* @param socket 文件发送方* @param rbi 文件基本信息* @param receiveView 接收界面* @param ufsMap 未接收片段Map*/public Receiver(Socket socket, ResourceBaseInfo rbi, IReceiveViewAction receiveView,Map<Integer, UnreceivedFileSection> ufsMap) {this.socket = socket;this.receiveView = receiveView;this.rbi = rbi;this.rsiMap = new HashMap<Integer, ResourceStructInfo>();this.ufsMap = ufsMap;List<ResourceStructInfo> rsiList = rbi.getRsiList();for(ResourceStructInfo rsi : rsiList) {int fileHandle = rsi.getFileHandle();rsiMap.put(fileHandle, rsi);}// TODO 必须改进之处!new Thread(this).start();}/*** 设置文件结构信息Map* @param rsiMap 文件结构信息Map*/public void setRsiMap(Map<Integer, ResourceStructInfo> rsiMap) {this.rsiMap = rsiMap;}@Overridepublic void run() {String absoluteRoot = rbi.getAbsoluteRoot();RandAccessFilePool rafp = new RandAccessFilePool();try {dis = new DataInputStream(socket.getInputStream());} catch (IOException e) {// 告知View, 无法建立通信信道,这个接收线程失败!return;}try {while(true) {// 一个文件片段一个的接收FileSection fileSection = new FileSection();fileSection.getFileSection(dis);SectionInfo section  = fileSection.getSection();byte[] value = fileSection.getValue();int fileHandle = section.getFileHandle();ResourceStructInfo rsi = rsiMap.get(fileHandle);String filePath = absoluteRoot + rsi.getFilePath();RandomAccessFile raf = rafp.getRaf(filePath);raf.seek(section.getOffset());raf.write(value);UnreceivedFileSection ufs = ufsMap.get(fileHandle);ufs.afterReceiveSection(section);if(receiveView != null) {// TODO 更改文件接收进度条}}} catch (IOException e) {// TODO 本线程的接收操作结束!}}}
package com.wh.mfct.receiver;import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import com.wh.mfct.resource.ResourceBaseInfo;
import com.wh.mfct.resource.ResourceStructInfo;
import com.wh.mfct.section.UnreceivedFileSection;
import com.wh.mfct.view.IReceiveViewAction;/*** 接收服务器类,用于与发送端建立连接,并生成新的接收线程* @author 闹闹小丫头**/
public class ReceiveServer implements Runnable{// 接收端ip地址static String ip;static {try {InetAddress inetAddress = InetAddress.getLocalHost();ip = inetAddress.getHostAddress();} catch (UnknownHostException e) {e.printStackTrace();}}// 文件基本信息private ResourceBaseInfo resourceBaseInfo;// 服务器端private ServerSocket server;// 接收端口private int receivePort;// 发送端个数private int senderCount;private volatile boolean ok;// 未接收文件片段Mapprivate Map<Integer, UnreceivedFileSection> ufsMap;// 接收界面private IReceiveViewAction receiveView;/*** 无参构造*/public ReceiveServer() {}/*** 设置文件基本信息,并初始化 ufsMap、 遍历文件基本信息内的文件基本结构Map文件并对其进行填充* @param resourceBaseInfo 文件基本信息* @return*/public ReceiveServer setResourceBaseInfo(ResourceBaseInfo resourceBaseInfo) {this.resourceBaseInfo = resourceBaseInfo;ufsMap = new HashMap<Integer, UnreceivedFileSection>();List<ResourceStructInfo> rsiList = resourceBaseInfo.getRsiList();for(ResourceStructInfo rsi : rsiList) {int fileHandle = rsi.getFileHandle();int size = (int) rsi.getFileSize();UnreceivedFileSection ufs = new UnreceivedFileSection(fileHandle, size);ufsMap.put(fileHandle, ufs);}return this;}/*** 设置接收界面* @param receiveView 接收界面* @return 这个类*/public ReceiveServer setReceiveView(IReceiveViewAction receiveView) {this.receiveView = receiveView;return this;}/*** 设置接收端口* @param receivePort 接收端口* @return*/public ReceiveServer setReceivePort(int receivePort) {this.receivePort = receivePort;return this;}/*** 设置发送端个数* @param senderCount 发送端个数* @return*/public ReceiveServer setSenderCount(int senderCount) {this.senderCount = senderCount;// 选择性显示View, 告知接收端个数。if(receiveView != null) {receiveView.getSenderCount(senderCount);}return this;}/*** 启动接收服务器*/public void startup() {try {this.server = new ServerSocket(receivePort);this.ok = true;new Thread(this).start();} catch (IOException e) {e.printStackTrace();}}/*** 关闭接收服务器*/public void close() {if(this.server != null && !this.server.isClosed()) {try {this.server.close();} catch (IOException e) {e.printStackTrace();} finally {this.server = null;}}}@Overridepublic void run() {int count = 0;while(ok && count < senderCount) {try {Socket sender = server.accept();// 选择性告知View,连接到一个发送端。if(receiveView != null) {receiveView.linkedOneSender(sender);}// 启动一个接收线程,完成具体接受过程!new Receiver(sender, resourceBaseInfo, receiveView, ufsMap);} catch (IOException e) {e.printStackTrace();}}}}

资源分配及节点选择策略类

这是博主自己的策略,即为系统默认的分配策略,还可以有更多的策略实现,但都必须实现对应的接口;这就解决了自平衡的问题

package com.wh.mfct.receiver;import java.util.ArrayList;
import java.util.List;import com.wh.mfct.node.INetNode;
import com.wh.mfct.resource.ResourceBaseInfo;
import com.wh.mfct.resource.SendSectionInfo;
import com.wh.mfct.section.SectionInfo;
import com.wh.mfct.strategy.IResourceAllocationStrategy;
import com.wh.mfct.strategy.ResourceAllocationStrategy;/*** 资源分配类* @author 闹闹小丫头**/
public class ResourceAllocation {private IResourceAllocationStrategy resourceAllocationStrategy;/*** 无参构造,生成默认的资源分配策略*/public ResourceAllocation() {this.resourceAllocationStrategy = new ResourceAllocationStrategy();}/*** 单参构造,生成指定资源分配策略* @param resourceAllocationStrategy 指定的资源分配策略*/public ResourceAllocation(IResourceAllocationStrategy resourceAllocationStrategy) {this.resourceAllocationStrategy = resourceAllocationStrategy;}/*** 根据指定的接收端和发送端列表将指定的资源基本信息进行拆分,生成发送端个数个的发送片段信息* @param rbi 指定资源基本信息* @param receiveNode 接收端* @param sendNodeList 发送端列表* @return 发送片段信息列表*/public List<SendSectionInfo> allocationResource(ResourceBaseInfo rbi,INetNode receiveNode, List<INetNode> sendNodeList) {int sendCount = sendNodeList.size();List<SectionInfo> orgResource = rbi.getSiList();List<List<SectionInfo>> sectionListList = resourceAllocationStrategy.allocationSection(orgResource, sendCount);List<SendSectionInfo> result = new ArrayList<>();for(int index = 0; index < sendCount; index++) {SendSectionInfo sendSectionInfo = new SendSectionInfo();sendSectionInfo.setReceiveNode(receiveNode);sendSectionInfo.setSendNode(sendNodeList.get(index));ResourceBaseInfo sendRbi = new ResourceBaseInfo(rbi);sendRbi.setSiList(sectionListList.get(index));sendSectionInfo.setRbi(sendRbi);result.add(sendSectionInfo);}return result;}}
package com.wh.mfct.strategy;import java.util.List;import com.wh.mfct.node.INetNode;/*** 节点选择策略需要实现的接口* @author 闹闹小丫头**/
public interface INodeSelectStrategy {// 默认最大发送端个数int DEFAULT_MAX_SENDER_COUNT = 5;// 最小发送端个数int MIN_SENDER_COUNT = 1;/*** 根据指定的原始节点列表,选择出适合的节点* @param orgSendNodeList 原始节点列表* @return 返回选择出的适合的节点*/List<INetNode> selectNodeList(List<INetNode> orgSendNodeList);/*** 是指最大发送节点个数* @param maxSenderCount 最大发送端个数*/void setMaxSenderCount(int maxSenderCount);/*** 设置服务器是否发送* @param serverDoSend 服务器是否发送boolean值*/void setServerDoSend(boolean serverDoSend);
}
package com.wh.mfct.strategy;import java.util.List;import com.wh.mfct.section.SectionInfo;/*** 资源分发策略需要实现的接口* @author 闹闹小丫头**/
public interface IResourceAllocationStrategy {// 默认最大片段长度int DEFAULT_MAX_SECTION_LENGTH = 1 << 22;// 最小片段长度int MIN_SECTION_LENGTH = 1 << 20;/*** 设置最大片段长度* @param maxSectionLength 最大片段长度*/void setMaxsectionLength(int maxSectionLength);/*** 分发指定的原始片段信息列表成指定的发送端个数份* @param orgResource 原始片段信息列表 * @param sendCount 发送端个数* @return 分配好的片段信息列表的列表*/List<List<SectionInfo>> allocationSection(List<SectionInfo> orgResource, int sendCount);
}
package com.wh.mfct.strategy;import java.util.ArrayList;
import java.util.List;import com.wh.mfct.node.INetNode;/*** 节点选择策略类,实现了INodeSelectStrategy* @author 闹闹小丫头**/
public class NodeSelectStrategy implements INodeSelectStrategy {// 最大发送端个数private int maxSenderCount;// 服务器是否参与发送private boolean serverDoSend;/*** 无参构造,最大发送个数为默认值,服务器参与发送*/public NodeSelectStrategy() {this.maxSenderCount = DEFAULT_MAX_SENDER_COUNT;this.serverDoSend = true;}/*** 根据指定发送节点列表选择出适合的最大发送节点列表* @param nodeList 指定发送节点列表* @return 选择出的发送节点列表*/private List<INetNode> selectMinSendCount(List<INetNode> nodeList) {List<INetNode> resultNodeList = new ArrayList<>();INetNode maxNode = nodeList.get(0);// 得到这些节点中最大发送次数for(int index = 0; index < nodeList.size(); index++) {INetNode node = nodeList.get(index);if(maxNode.getSendTime() < node.getSendTime()) {maxNode = node;}}// 生成以最大发送次数为下标的数组,下标为发送次数int[] sendCount = new int[maxNode.getSendTime()];// 遍历这些节点,找出各个发送次数的节点个数for(INetNode node : nodeList) {++sendCount[node.getSendTime()];}int maxSenderCount = this.maxSenderCount;// 从前向后遍历整个数组,根据最大发送端个数,选择各个发送次数节点的个数for(int index = 0; index < sendCount.length; index++) {// 说明节点已经选择完成,则后面的全为0if(maxSenderCount < 0) {sendCount[index] = 0;} else {// 说明节点选择未完成,获得该下标的数组的节点个数maxSenderCount -= sendCount[index];// 说明节点选择过多,从该下标数组选择一部分节点if(maxSenderCount < 0) {sendCount[index] += maxSenderCount;}}}// 根据数组得到合适的节点,数组下标即为发送次数for(INetNode node : nodeList) {int index = node.getSendTime();if(sendCount[index] <= 0) {continue;}resultNodeList.add(node);sendCount[index]--;}return resultNodeList;}@Overridepublic List<INetNode> selectNodeList(List<INetNode> orgSendNodeList) {List<INetNode> nodeList = orgSendNodeList;int senderCount = nodeList.size();if(senderCount <= 1) {return nodeList;}// 说明服务器不参与发送,从这些节点中剔除服务器if(!serverDoSend) {List<INetNode> serverDontSendNodeList = new ArrayList<INetNode>();for(INetNode node : nodeList) {if(node.getType() == INetNode.SERVER) {continue;}serverDontSendNodeList.add(node);}nodeList = serverDontSendNodeList;}senderCount = nodeList.size();// 说明选择的节点多于最大发送端个数,进行选择if(senderCount > maxSenderCount) {nodeList = selectMinSendCount(nodeList);}return nodeList;}@Overridepublic void setMaxSenderCount(int maxSenderCount) {this.maxSenderCount = maxSenderCount < MIN_SENDER_COUNT? MIN_SENDER_COUNT : maxSenderCount;}@Overridepublic void setServerDoSend(boolean serverDoSend) {this.serverDoSend = serverDoSend;}}
package com.wh.mfct.strategy;import java.util.ArrayList;
import java.util.List;import com.wh.mfct.section.SectionInfo;/*** 资源分发策略类,实现了IResourceAllocationStrategy* @author 闹闹小丫头**/
public class ResourceAllocationStrategy implements IResourceAllocationStrategy {// 最大片段长度private int maxSectionLength;public ResourceAllocationStrategy() {}@Overridepublic void setMaxsectionLength(int maxSectionLength) {this.maxSectionLength = maxSectionLength;}@Overridepublic List<List<SectionInfo>> allocationSection(List<SectionInfo> orgResource, int sendCount) {if(sendCount <= 0) {return null;}List<List<SectionInfo>> sectionListList = new ArrayList<List<SectionInfo>>();for(int index = 0; index < sendCount; index++) {List<SectionInfo> sectionList = new ArrayList<>();sectionListList.add(sectionList);}int index = 0;// 遍历整个原始片段信息列表,将其进行较平均分配for(SectionInfo section : orgResource) {int sectionSize = section.getSize();// 说明这个片段信息大小小于最大片段长度,对其进行直接分配if(sectionSize <= maxSectionLength) {List<SectionInfo> sectionList = sectionListList.get(index);sectionList.add(new SectionInfo(section));// 一个列表接着一个列表的循环分配index = (index + 1) % sendCount;continue;}// 说明这个片段大小大于最大片段大小,将其拆分分配long offset = 0L;int restLen = sectionSize;int len;while(restLen > 0) {len = restLen > maxSectionLength ? maxSectionLength : restLen;List<SectionInfo> sectionList = sectionListList.get(index);sectionList.add(new SectionInfo(section.getFileHandle(),offset + section.getOffset(), len));offset += len;restLen -= len;index = (index + 1) % sendCount;}}return sectionListList;}}

接收服务器端口池

因为一个电脑的端口是有限的,而且我们所能使用的端口是有一定范围性的,所以博主对端口的使用提出了以下观点,在对文件接收时使用了一个端口池类

package com.wh.mfct.receiver;/*** 接收服务器端口池类,用于接收时生成端口* @author 闹闹小丫头**/
public class ReceiveServerPortPool {private static final int DEFAULT_MIN_PORT = 55166;private static final int DEFAULT_MAX_PORT = 55277;private static int minPort = 55166;/*** 关于port池,首先有如下基本设定:<br>* prot池的取值范围必须有限,且设置最大和最小值范围;<br>* 其中的port,可以反复使用。<br>* 实现手段有简单、普通、可回收三种:<br>* 简单模式:定义一个整型量,初值为minPort;每次申请后自增;当增加为maxPort后,回绕重新分配<br>* 普通模式:定义一个线程安全的队列,并用从minPort到maxPort的整型量初始化;<br>*         设定如下几个方法:* <ul>*       <li>boolean hasNext();只要队列非空,则,返回真;</li>*      <li>int next();总是返回队首port,并"出队列";</li>*        <li>void returnPort(int port);归还port到队尾。</li>* </ul>* 可回收模式:定义两个线程安全的队列,分别为:已分配port队列和未分配port队列;<br>* 且,其中的已分配port队列的泛型类,包括:<br>* int port;<br>* long time;<br>* ReceiveServer server;<br>* 其中的time是分配时间,以System.currentMilliTime()为值;<br>* server是用port建立的接收服务器;<br>* 并启用DidaDida时钟,将超过30分钟未归还的port,通过server强制关闭,并回收port。<br>* 超时时间可配置;port范围可配置。* @author 闹闹小丫头**/public static int nextPort() {if(minPort > DEFAULT_MAX_PORT) {minPort = DEFAULT_MIN_PORT;return minPort++;}return minPort++;}}

资源请求类

package com.wh.mfct.receiver;import java.util.List;import com.wh.client.core.ClientProxy;
import com.wh.client.core.RPCClient;
import com.wh.mfct.node.INetNode;
import com.wh.mfct.node.NetNodeAdapter;
import com.wh.mfct.resource.IRMIResource;
import com.wh.mfct.resource.ResourceBaseInfo;
import com.wh.mfct.section.SectionInfo;
import com.wh.mfct.sender.IResourceSender;
import com.wh.mfct.strategy.IResourceAllocationStrategy;
import com.wh.mfct.strategy.ResourceAllocationStrategy;
import com.wh.mfct.view.IReceiveViewAction;/*** 资源请求类,像资源中心进行资源请求,启动接收服务器,并告知各个发送端发送资源* @author 闹闹小丫头**/
public class ResourceRequestor {// 资源中心默认ip地址public static final String DEFAULT_RMI_IP = "192.168.1.29";// 资源中心默认短连接端口public static final int DEFAULT_RMI_PORT = 51778;// 资源中心ip地址private String resourceCenterRmiIp;// 资源中心短连接端口private int resourceCenterRmiPort;// 接收界面动作实现类private IReceiveViewAction receiveViewAction;// 资源分配策略private IResourceAllocationStrategy resourceAllocationStrategy;private static volatile ClientProxy clientProxy;public ResourceRequestor() {this.resourceCenterRmiIp = DEFAULT_RMI_IP;this.resourceCenterRmiPort = DEFAULT_RMI_PORT;this.resourceAllocationStrategy = new ResourceAllocationStrategy();}public void setResourceCenterRmiIp(String resourceCenterRmiIp) {this.resourceCenterRmiIp = resourceCenterRmiIp;}public void setResourceCenterRmiPort(int resourceCenterRmiPort) {this.resourceCenterRmiPort = resourceCenterRmiPort;}public void setReceiveViewAction(IReceiveViewAction receiveViewAction) {this.receiveViewAction = receiveViewAction;}public void setResourceAllocationStrategy(IResourceAllocationStrategy resourceAllocationStrategy) {this.resourceAllocationStrategy = resourceAllocationStrategy;}/*** 短连接生成代理类, 生成资源请求的代理* @return*/private ClientProxy prepareRMIProxy() {if(clientProxy == null) {synchronized (ResourceRequestor.class) {if(clientProxy == null) {RPCClient rpcClient = new RPCClient();rpcClient.setServerIp(resourceCenterRmiIp);rpcClient.setServerPort(resourceCenterRmiPort);clientProxy = new ClientProxy(rpcClient);}}}return clientProxy;}/*** 资源请求类, 向资源中心请求发送端,并根据发送端分配需要接收的资源,并对其向各个发送端进行分发* @param resourceBaseInfo 需要接收的资源基本信息*/public void requestResource(ResourceBaseInfo resourceBaseInfo) {// 像资源中心请求发送端ClientProxy proxy = prepareRMIProxy();IRMIResource rmiResource = proxy.jdkProxy(IRMIResource.class);List<INetNode> senderList = rmiResource.requestResource(resourceBaseInfo);if(senderList.isEmpty()) {if(receiveViewAction != null) {receiveViewAction.hasNoSender();}return;}int senderCount = senderList.size();// 得到一个端口用作接收资源int port = ReceiveServerPortPool.nextPort();// 生成接收服务器ReceiveServer receiveServer = new ReceiveServer().setReceiveView(receiveViewAction).setReceivePort(port).setSenderCount(senderCount);// 启动接收服务器receiveServer.startup();// 生成接收节点信息INetNode me = new ReceiveNetNode(ReceiveServer.ip, port);// 发送端个数将资源基本信息进行分配生成信息列表的列表List<List<SectionInfo>> sectionMatrix = resourceAllocationStrategy.allocationSection(resourceBaseInfo.getSiList(), senderCount);// 将上一步生成信息列表的列表向这些发送端进行分发int count = distributeSendSection(me, senderList, resourceBaseInfo, sectionMatrix);if(count < senderCount) {receiveServer.setSenderCount(count);} }/*** 分发资源类,遍历发送端个数,在其中对指定的接收方和片段信息列表的列表进行组装,* 通过代理机制短连接调用发送端的发送方法,对信息进行分发* @param receiver 接收方节点信息* @param senderList 发送端节点列表* @param orgResourceBaseInfo 原始资源基本信息* @param sectionMatrix 片段信息列表的列表* @return 返回真正发送端的个数*/private int distributeSendSection(INetNode receiver, List<INetNode> senderList,ResourceBaseInfo orgResourceBaseInfo, List<List<SectionInfo>> sectionMatrix) {int senderCount = senderList.size();// 生成客户端代理模板ClientProxy rpcClientProxy = new ClientProxy();RPCClient rpcClient = new RPCClient();int reallySendCount = 0;// 遍历发送端个数for(int index = 0; index < senderCount; index++) {INetNode sender = senderList.get(index);// 完成客户端代理参数rpcClient.setServerIp(sender.getIp());rpcClient.setServerPort(sender.getRmiSendPort());rpcClientProxy.setRPCClient(rpcClient);IResourceSender resourceSender = rpcClientProxy.jdkProxy(IResourceSender.class);// 组装分发信息ResourceBaseInfo targetResource = new ResourceBaseInfo(orgResourceBaseInfo);targetResource.setSiList(sectionMatrix.get(index));try {// 短连接调用发送端方法,并对信息进行分发resourceSender.sendSectionInfo(receiver, targetResource);reallySendCount++;} catch (Exception e) {// 如果短连接调用失败,将未发送的内容需要扔给下一个节点发送}}return reallySendCount;}/*** 接收方节点信息,继承适配器NetNodeAdapter* @author 闹闹小丫头**/class ReceiveNetNode extends NetNodeAdapter{private String ip;private int port;public ReceiveNetNode(String ip, int port) {this.ip = ip;this.port = port;}@Overridepublic String getIp() {return ip;}@Overridepublic int getReceivePort() {return port;}}}

短连接资源请求接口

package com.wh.mfct.resource;import java.util.List;import com.wh.mfct.node.INetNode;/*** 短连接资源请求接口,用于代理机制调用* @author 闹闹小丫头**/
public interface IRMIResource {/*** 注册指定节点及其资源基本信息列表* @param node 指定节点* @param rbiList 指定资源基本信息列表*/void registryNode(INetNode node, List<ResourceBaseInfo> rbiList);/*** 注册指定节点及其资源基本信息* @param node 指定节点* @param rbi 指定资源基本信息*/void registryNode(INetNode node, ResourceBaseInfo rbi);/*** 注销指定节点* @param node 指定节点*/void logoutNode(INetNode node);/*** 增加指定节点的发送次数* @param node*/void increaseSendTime(INetNode node);/*** 向资源中心请求资源* @param rbi 指定资源基本信息* @return 发送端的节点信息列表*/List<INetNode> requestResource(ResourceBaseInfo rbi);
}

资源信息类

package com.wh.mfct.resource;import java.io.File;
import java.util.ArrayList;
import java.util.List;import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactoryConfigurationError;import com.wh.mfct.section.SectionInfo;
import com.wh.util.XMLEditor;/*** 资源基本信息类,包括appName、资源编号、资源版本、资源绝对路径、资源结构信息列表和资源片段信息列表* @author 闹闹小丫头**/
public class ResourceBaseInfo {// app配置文件绝对路径public static final String RESOURCE_XML_PATH = "C:\\.mecResource\\resource.xml"; // app名private String appName;// 资源编号private String id;// 资源版本private String version;// 资源绝对路径private String absoluteRoot;// 资源结构信息列表private List<ResourceStructInfo> rsiList;// 片段信息列表,需要请求的片段信息列表private List<SectionInfo> siList;// 用于APP配置文件private XMLEditor editor;public ResourceBaseInfo() {}/*** 单参构造,根据指定的资源基本信息构造* @param rbi 指定资源基本信息*/public ResourceBaseInfo(ResourceBaseInfo rbi) {this.appName = rbi.appName;this.id = rbi.id;this.version = rbi.version;this.absoluteRoot = rbi.absoluteRoot;}/*** 根据指定绝对路径和该资源基本信息构造资源基本信息* @param absoluteRoot 指定绝对路径* @return 新的资源基本信息*/public ResourceBaseInfo getRequestBaseInfo(String absoluteRoot) {ResourceBaseInfo rbi = new ResourceBaseInfo();rbi.setAppName(this.appName);rbi.setId(this.id);rbi.setVersion(this.version);rbi.setAbsoluteRoot(absoluteRoot);rbi.getRequestSectionList(this.rsiList);return rbi;}/*** 根据指定结构信息列表和自己的绝对路径得到片段信息列表设置自己的片段信息列表* 即为需要请求的片段信息列表* @param rsiList 指定结构信息列表*/public void getRequestSectionList(List<ResourceStructInfo> rsiList) {this.siList = new ArrayList<SectionInfo>();for(ResourceStructInfo rsi : rsiList) {String filePath = absoluteRoot + rsi.getFilePath();if(isRight(rsi, filePath)) {continue;}SectionInfo sectionInfo = new SectionInfo(rsi.getFileHandle(), 0L, (int)rsi.getFileSize());this.siList.add(sectionInfo);}}/*** 判断是否需要请求资源,即为片段信息列表是否为空* @return*/public boolean neededResource() {return !this.siList.isEmpty();}/*** 将指定资源结构信息和指定绝对路径的文件进行比较,看是否正确* @param resourceStructInfo 指定资源结构信息* @param filePath 指定绝对路径* @return */private boolean isRight(ResourceStructInfo resourceStructInfo, String filePath) {File file = new File(filePath);if(!file.exists()) {return false;}if(file.length() != resourceStructInfo.getFileSize()) {return false;}// TODO 还可以进行其他验证return true;}/*** 初始化editor*/private void createXmlEditor() {try {editor = new XMLEditor();} catch (TransformerConfigurationException e) {e.printStackTrace();} catch (ParserConfigurationException e) {e.printStackTrace();} catch (TransformerFactoryConfigurationError e) {e.printStackTrace();}}/*** 将该资源基本信息中除了资源结构信息列表和片段信息列表保存在app默认的配置文件中*/public  void saveResource() {if(editor == null) {createXmlEditor();}List<ResourceStructInfo> tempStructList = rsiList;List<SectionInfo> tempSectionList = siList;rsiList = null;siList = null;editor.insert(RESOURCE_XML_PATH, this);rsiList = tempStructList;siList = tempSectionList;}/*** 得到默认配置文件中生成的资源基本信息* @return*/public ResourceBaseInfo getResourceBaseInfo() {if(editor == null) {createXmlEditor();}return editor.get(RESOURCE_XML_PATH, getClass(),"appName", this.appName);}/*** 扫描该资源基本信息的绝对路径,并生成资源结构信息列表*/public void exploreResource() {exploreResource(null);}/*** 扫描指定app绝对路径扫描该绝对路径下的文件,并生成资源结构信息列表* @param appRoot*/public void exploreResource(String appRoot) {appRoot = appRoot == null ? absoluteRoot : appRoot;File file = new File(appRoot);this.rsiList = new ArrayList<ResourceStructInfo>();// 扫描这个文件目录及所有子目录下的文件,并由此构成<资源结构信息>列表scanResourceRoot(rsiList, 1, appRoot, file);}/*** 根据指定的fileHandle、绝对路径和指定文件生成新的结构信息放入指定的结构信息列表内* @param rsiList 指定的结构信息列表* @param fileHandle 指定文件句柄* @param absoluteRoot 指定绝对路径* @param curFile 指定文件* @return*/private int createResourceStructInfo(List<ResourceStructInfo> rsiList, int fileHandle,String absoluteRoot, File curFile) {ResourceStructInfo rsi = new ResourceStructInfo();rsi.setFileHandle(fileHandle++);rsi.setFileSize(curFile.length());rsi.setFilePath(curFile.getAbsolutePath().replace(absoluteRoot, ""));rsiList.add(rsi);return fileHandle;}/*** 扫描指定目录下的所有文件并放入指定结构信息列表* 根据指定fileHandle、指定绝对路径和指定文件目录(或者文件)生成新的结构信息放入指定结构信息列表内* @param rsiList 指定结构信息列表* @param firstFileHandle 指定文件句柄* @param absoluteRoot 指定绝对路径* @param file 指定文件目录(或文件)* @return 最后一个文件句柄*/private int scanResourceRoot(List<ResourceStructInfo> rsiList,int firstFileHandle,String absoluteRoot, File file) {// 判断是否为文件if(file.isFile()) {// 对文件进行处理return createResourceStructInfo(rsiList, firstFileHandle, absoluteRoot, file);} // 说明这是个目录,得到该目录下所有文件File[] fileList = file.listFiles();// 遍历所有文件for(File f : fileList) {// 判断该文件是否为目录if(f.isDirectory()) {// 说明该文件还是个目录,循环调用这个方法扫描该目录firstFileHandle = scanResourceRoot(rsiList, firstFileHandle, absoluteRoot, f);} else {// 说明这是个文件,进行文件处理firstFileHandle = createResourceStructInfo(rsiList, firstFileHandle, absoluteRoot, f);}}return firstFileHandle;}public String getAppName() {return appName;}public void setAppName(String appName) {this.appName = appName;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getVersion() {return version;}public void setVersion(String version) {this.version = version;}public List<ResourceStructInfo> getRsiList() {return rsiList;}public void setRsiList(List<ResourceStructInfo> rsiList) {this.rsiList = rsiList;}public List<SectionInfo> getSiList() {return siList;}public void setSiList(List<SectionInfo> siList) {this.siList = siList;}public String getAbsoluteRoot() {return absoluteRoot;}public void setAbsoluteRoot(String absoluteRoot) {this.absoluteRoot = absoluteRoot;}@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + ((appName == null) ? 0 : appName.hashCode());return result;}@Overridepublic boolean equals(Object obj) {if (this == obj)return true;if (obj == null)return false;if (getClass() != obj.getClass())return false;ResourceBaseInfo other = (ResourceBaseInfo) obj;if (appName == null) {if (other.appName != null)return false;} else if (!appName.equals(other.appName)) return false;return true;}@Overridepublic String toString() {StringBuffer result = new StringBuffer();result.append("AppName=").append(appName).append('\n').append("id=").append(id).append('\n').append("version=").append(version).append('\n').append("absoluteRoot=").append(absoluteRoot);if (rsiList != null) {result.append("\nstruct list:");for (ResourceStructInfo rsi : rsiList) {result.append("\n\t").append(rsi);}}if (siList != null) {result.append("\nrequest list:");for (SectionInfo si : siList) {result.append("\n\t").append(si);}}return result.toString();}}
package com.wh.mfct.resource;/*** 资源结构信息类,包含文件句柄、文件相对路径和文件大小* @author 闹闹小丫头**/
public class ResourceStructInfo {private int fileHandle;private String filePath;private long fileSize;public ResourceStructInfo() {}public int getFileHandle() {return fileHandle;}public void setFileHandle(int fileHandle) {this.fileHandle = fileHandle;}public String getFilePath() {return filePath;}public void setFilePath(String filePath) {this.filePath = filePath;}public long getFileSize() {return fileSize;}public void setFileSize(long fileSize) {this.fileSize = fileSize;}@Overridepublic String toString() {return "fileHandle=" + fileHandle + ", filePath=" + filePath + ", fileSize=" + fileSize;}}
package com.wh.mfct.resource;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** 资源池类,存储各个服务器以及客户端向其注册的资源* 以app名称为键,资源基本信息为值* @author 闹闹小丫头**/
public class ResourcePool {public static final Map<String, ResourceBaseInfo> rbiPool = new ConcurrentHashMap<String, ResourceBaseInfo>();public ResourcePool() {}/*** 接收从APP服务器发送的资源基本信息和框架信息;<br>* 1、本地第一次接收到该资源信息:<br>*        将rbi存储到rbiPool中。<br>* 2、本地已经拥有该资源,需要比较版本信息;<br>*      若版本不存在冲突,则,结束。<br>*         若版本存在冲突,则,按第一种情况处理。* @param rbi*/public boolean addResource(ResourceBaseInfo rbi) {String appName = rbi.getAppName();ResourceBaseInfo oldRbi = rbiPool.get(appName);if(oldRbi != null) {if(rbi.getVersion().equals(oldRbi.getVersion())) {return false;}}rbiPool.put(appName, rbi);return true;}/*** 得到指定appName的资源* @param appName* @return*/public static ResourceBaseInfo getResourceBaseInfo(String appName) {return rbiPool.get(appName);}}

资源节点关系表(资源管理中心需要维护的Map)

新版本中直接将分配策略放给消费者自己完成,即消费者自己选择发送的节点,这大大减少了资源管理中心的压力。

package com.wh.mfct.resource;import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;import com.wh.mfct.node.INetNode;
import com.wh.mfct.strategy.INodeSelectStrategy;
import com.wh.mfct.strategy.NodeSelectStrategy;
import com.wh.properties.tool.PropertiesParser;/*** 资源节点关系类,实现了IRMIResource接口* relationMap是资源编号——节点编号列表关系Map* nodePool是节点编号——节点信息关系Map* resourcePool是资源编号——资源基本信息关系Map* @author 闹闹小丫头**/
public class ResourceNodeRelation implements IRMIResource{// 默认阈值public static final double DEFAULT_THRESHOLD = 1.0;// 节点分配策略,新版本这个将不在使用private static volatile INodeSelectStrategy nodeSelectStrategy;private static final Map<Integer, List<Integer>> relationMap = new ConcurrentHashMap<Integer, List<Integer>>();private static final Map<Integer, INetNode> nodePool = new ConcurrentHashMap<Integer, INetNode>();private static final Map<Integer, ResourceBaseInfo> resourcePool = new ConcurrentHashMap<Integer, ResourceBaseInfo>();// 需要删除的节点个数private static int removeNodeCount;// 阈值,用于紧缩资源——节点关系Mapprivate static double validityThreshold = DEFAULT_THRESHOLD;/*** 无参构造,若节点分配策略为空,则使用默认的节点分配策略,新版本这个将不在使用*/public ResourceNodeRelation() {if(nodeSelectStrategy == null) {synchronized (ResourceNodeRelation.class) {if(nodeSelectStrategy == null) {nodeSelectStrategy = new NodeSelectStrategy();}}}}/*** 获得nodePool* @return nodePool*/public static Map<Integer, INetNode> getNodePool() {return nodePool;}/*** 加载配置文件中的节点分配策略,新版本不使用*/private void loadNodeSelectStrategy() {String nssString = PropertiesParser.value("NodeSelectStrategy");if(nssString != null) {try {Class<?> klass = Class.forName(nssString);nodeSelectStrategy = (INodeSelectStrategy)klass.newInstance();} catch (ClassNotFoundException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();}}String strMaxSenderCount = PropertiesParser.value("maxSenderCount");if(strMaxSenderCount != null) {int maxSenderCount = 0;maxSenderCount = Integer.valueOf(strMaxSenderCount);nodeSelectStrategy.setMaxSenderCount(maxSenderCount);}String strServerDoSend = PropertiesParser.value("serverDoSend");if(strServerDoSend != null) {boolean serverDoSend = true;try {serverDoSend = Boolean.valueOf(strServerDoSend);nodeSelectStrategy.setServerDoSend(serverDoSend);} catch (Exception e) {}}String strValidityThreshold = PropertiesParser.value("validityThreshold");if(strValidityThreshold != null) {try {double threshold = Double.valueOf(strValidityThreshold);validityThreshold = threshold;} catch (Exception e) {}}}/*** 加载指定路径的节点分配策略配置文件,新版本不使用* @param resConfigPath*/public void loadNetNodeStrategyConfig(String resConfigPath) {PropertiesParser.loadProperties(resConfigPath);loadNodeSelectStrategy();}@Overridepublic void registryNode(INetNode node, List<ResourceBaseInfo> rbiList) {int nodeHashcode = node.hashCode();nodePool.put(nodeHashcode, node);for(ResourceBaseInfo rbi : rbiList) {int rbiHashCode = rbi.hashCode();resourcePool.put(rbiHashCode, rbi);synchronized (relationMap) {if(!relationMap.containsKey(rbiHashCode)) {List<Integer> nodeList = new LinkedList<Integer>();relationMap.put(rbiHashCode, nodeList);}List<Integer> nodeList = relationMap.get(rbiHashCode);nodeList.add(nodeHashcode);}}}@Overridepublic void registryNode(INetNode node, ResourceBaseInfo rbi) {ArrayList<ResourceBaseInfo> rbiList = new ArrayList<ResourceBaseInfo>();rbiList.add(rbi);registryNode(node, rbiList);}@Overridepublic void logoutNode(INetNode node) {int nodeHashcode = node.hashCode();nodePool.remove(nodeHashcode);removeNodeCount++;if(nodePool.size() <= 0) {return;}if(removeNodeCount / nodePool.size() >= validityThreshold) {// 紧缩资源-节点关系图cleanRelationMap();}}/*** 紧缩资源——节点关系Map*/private void cleanRelationMap() {synchronized (relationMap) {for(Integer resourceId : relationMap.keySet()) {List<Integer> nodeIdList = new ArrayList<Integer>();List<Integer> orgNodeIdList = relationMap.get(resourceId);for(Integer nodeId : orgNodeIdList) {INetNode node = nodePool.get(nodeId);if(node == null) {continue;}nodeIdList.add(nodeId);}relationMap.put(resourceId, nodeIdList);}removeNodeCount = 0;}}@Overridepublic void increaseSendTime(INetNode node) {int nodeId = node.hashCode();INetNode targetNode = ResourceNodeRelation.getNodePool().get(nodeId);if(targetNode == null) {return;}targetNode.increaseSendTime();}@Overridepublic List<INetNode> requestResource(ResourceBaseInfo rbi) {int rbiHashcode = rbi.hashCode();List<INetNode> nodeList = new ArrayList<>();synchronized (relationMap) {List<Integer> nodeIdList = relationMap.get(rbiHashcode);for(int index = 0; index < nodeIdList.size(); index++) {int nodeId = nodeIdList.get(index);INetNode node = nodePool.get(nodeId);if(node != null) {nodeList.add(node);}}}// 新版本不在使用,直接将得到的列表发送给消费者,消费者自己进行选择return nodeSelectStrategy.selectNodeList(nodeList);}
}

字节发送接收类

package com.wh.mfct.section;import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;/*** 字节发送* @author 闹闹小丫头**/
public class ByteSendReceive implements IBytesSendRceive {// 接收数据每次处理的字节数private int bufferSize;/*** 无参构造,使用默认处理字节数*/public ByteSendReceive() {this(DEFAULT_BUFFER_SIZE);}/*** 单参构造,指定每次处理的字节数* @param bufferSize 每次处理的字节数*/public ByteSendReceive(int bufferSize) {this.bufferSize = bufferSize;}@Overridepublic void send(DataOutputStream dos, byte[] data) throws IOException {dos.write(data);}@Overridepublic byte[] receive(DataInputStream dis, int size) throws IOException {byte[] data = new byte[size];int restLen = size;int readLen;int len;int offset = 0;while(restLen > 0) {len = restLen > this.bufferSize ? bufferSize : restLen;readLen = dis.read(data, offset, len);offset += readLen;restLen -= readLen;}return data;}}
package com.wh.mfct.section;import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;/*** 字节发送接收接口* @author 闹闹小丫头**/
public interface IBytesSendRceive {// 默认处理字节数int DEFAULT_BUFFER_SIZE = 1 << 16;/*** 通过指定的通信信道发送指定字节数据* @param dos 指定通信信道* @param data 指定字节数据* @throws IOException*/void send(DataOutputStream dos, byte[] data) throws IOException ;/*** 通过指定通信信道接收指定大小的字节数据并返回* @param dis 指定通信信道* @param size 指定大小* @return 接收到的指定数据* @throws IOException*/byte[] receive(DataInputStream dis, int size) throws IOException;
}

文件片段类

package com.wh.mfct.section;import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;/*** 文件片段类,包含片段信息和文件数据* @author 闹闹小丫头**/
public class FileSection {private SectionInfo section;  // 片段信息private byte[] value;    // 片段内容(数据)private IBytesSendRceive bytesSendRceive;public FileSection() {bytesSendRceive = new ByteSendReceive();}public byte[] getValue() {return value;}public void setValue(byte[] value) {this.value = value;}public void setSection(SectionInfo section) {this.section = section;}public void setBytesSendRceive(IBytesSendRceive bytesSendRceive) {this.bytesSendRceive = bytesSendRceive;}public SectionInfo getSection() {return section;}/*** 改进提示:<br>* 为了能明确发送方已完成发送,可以令发送方在发送完成后,单独发送一个fileHandle值为-1<br>* 的sectionHead;接收时,若fileHandle为-1,可以通过返回fileHandle告知方法调用方接收结束。* @param dis 文件接收信道* @throws IOException * @throws Exception*/public void getFileSection(DataInputStream dis) throws IOException {byte[] sectionHead = bytesSendRceive.receive(dis, SectionInfo.SECTION_INFO_LENGTH);this.section = new SectionInfo(sectionHead);this.value = bytesSendRceive.receive(dis, this.section.getSize());}/*** 通过指定通信信道发送这个文件片段,分两次发送* 第一次发送的是片段信息,第二次发送片段片段数据* @param dos 指定通信信道* @throws IOException*/public void sendSection(DataOutputStream dos) throws IOException {bytesSendRceive.send(dos, section.toBytes());bytesSendRceive.send(dos, value);}}
package com.wh.mfct.section;import com.wh.util.ByteToString;/*** 片段信息类,包含文件的文件句柄、该片段偏移量和该片段的数据大小* @author 闹闹小丫头**/
public class SectionInfo {public static final int SECTION_INFO_LENGTH = 16;private int fileHandle; // 文件句柄private long offset; // 偏移量private int size; // 数据大小/*** 无参构造*/public SectionInfo() {}/*** 单参构造,根据指定的片段信息进行构造* @param section 指定的片段信息*/public SectionInfo(SectionInfo section) {this.fileHandle = section.fileHandle;this.offset = section.offset;this.size = section.size;}/*** 单参构造,根据指定字节值(必须为16B)构造片段信息* @param value*/public SectionInfo(byte[] value) {if(value.length != SECTION_INFO_LENGTH) {System.out.println("长度不为16B!");return;}byte[] bFileHandle = ByteToString.getBytesAt(value, 0, 4);byte[] bOffset = ByteToString.getBytesAt(value, 4, 8);byte[] bSize = ByteToString.getBytesAt(value, 12, 4);this.fileHandle = ByteToString.bytesToint(bFileHandle);this.offset = ByteToString.bytesToLong(bOffset);this.size = ByteToString.bytesToint(bSize);}/*** 三参构造* @param fileHandle 指定文件句柄* @param offset 指定偏移量* @param size 指定大小*/public SectionInfo(int fileHandle, long offset, int size) {this.fileHandle = fileHandle;this.offset = offset;this.size = size;}/*** 判断指定偏移量和大小是否和该片段信息匹配* @param offset 指定偏移量* @param size 指定大小* @return*/public boolean isRightSection(long offset, int size) {return this.offset <= offset && this.offset + this.size >= offset + size;}public int getFileHandle() {return fileHandle;}public void setFileHandle(int fileHandle) {this.fileHandle = fileHandle;}public long getOffset() {return offset;}public void setOffset(long offset) {this.offset = offset;}public int getSize() {return size;}public void setSize(int size) {this.size = size;}/*** 根据该片段信息转化为16B的字节* @return 转化后的16B字节*/public byte[] toBytes() {byte[] bFileHandle = ByteToString.intToBytes(fileHandle);byte[] bOffset = ByteToString.longToBytes(offset);byte[] bSize = ByteToString.intToBytes(size);byte[] result = new byte[16];ByteToString.setByteAt(result, 0, bFileHandle);ByteToString.setByteAt(result, 4, bOffset);ByteToString.setByteAt(result, 12,  bSize);return result;}@Overridepublic String toString() {StringBuffer res = new StringBuffer("fileHandle:");res.append(this.fileHandle).append(",").append("offset:").append(this.offset).append(",").append("size:").append(this.size);return res.toString();}}

断点续传的基础类

package com.wh.mfct.section;import java.util.LinkedList;
import java.util.List;/*** 未接收文件片段类,包含文件句柄和片段信息列表* @author 闹闹小丫头**/
public class UnreceivedFileSection {private int fileHandle;private List<SectionInfo> sections;/*** 双参构造* @param fileHandle 指定文件句柄* @param size 指定大小*/public UnreceivedFileSection(int fileHandle, int size) {this.fileHandle = fileHandle;sections = new LinkedList<SectionInfo>();SectionInfo section = new SectionInfo(fileHandle, 0, size);sections.add(section);}/*** 用于判断接收是否完成* @return 片段信息列表为空,返回true; 片段信息列表不为空,返回false*/public boolean isReceiveCompleted() {return sections.isEmpty();}/*** 根据指定片段信息,从片段信息列表中得到正确的片段信息下标* @param section 指定片段信息* @return 返回得到的正确片段信息的下标* @throws Exception 若找不到指定片段信息,抛出异常*/public int getRightSection(SectionInfo section) throws Exception {long offset = section.getOffset();int size = section.getSize();int index;for(index = 0; index < sections.size(); index++) {SectionInfo orgSection = sections.get(index);if(orgSection.isRightSection(offset, size)) {return index;}}throw new Exception("片段" + section + "异常!");}/*** 接收完指定片段信息后的动作,从片段信息列表中剔除这个接收过的片段* @param section*/public synchronized void afterReceiveSection(SectionInfo section){try {int   index = getRightSection(section);SectionInfo org = sections.get(index);long orgOffset = org.getOffset();int orgSize = org.getSize();long curOffset = section.getOffset();int curSize = section.getSize();long leftOffset = orgOffset;int leftSize = (int)(curOffset - orgOffset);long rightOffset = curOffset + curSize;int rightSize = (int) (orgOffset + orgSize - rightOffset);sections.remove(index);if(leftSize > 0) {sections.add(new SectionInfo(fileHandle, leftOffset, leftSize));}if(rightSize > 0) {sections.add(new SectionInfo(fileHandle, rightOffset, rightSize));}} catch (Exception e) {e.printStackTrace();}}}

资源提供者

package com.wh.mfct.sender;import com.wh.mfct.node.INetNode;
import com.wh.mfct.resource.ResourceBaseInfo;/*** 资源发送方需要实现的接口* @author 闹闹小丫头**/
public interface IResourceSender {/*** 向指定接收方发送根据指定的资源基本信息得到的文件* @param receiver 指定的接收方* @param rbi 指定的资源基本信息*/void sendSectionInfo(INetNode receiver, ResourceBaseInfo rbi);
}
package com.wh.mfct.sender;import com.wh.annotation.Scanning;
import com.wh.mfct.node.INetNode;
import com.wh.mfct.resource.ResourceBaseInfo;/*** 资源发送类,实现IResourceSender接口,用于被远程调用* @author 闹闹小丫头**/
@Scanning(klass = {IResourceSender.class})
public class ResourceSender implements IResourceSender {public ResourceSender() {}@Overridepublic void sendSectionInfo(INetNode receiver, ResourceBaseInfo rbi) {SectionSender sectionSender = new SectionSender().setRbi(rbi).setReceiver(receiver);new Thread(sectionSender).start();}}
package com.wh.mfct.sender;import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import com.wh.mfct.node.INetNode;
import com.wh.mfct.resource.ResourceBaseInfo;
import com.wh.mfct.resource.ResourcePool;
import com.wh.mfct.resource.ResourceStructInfo;
import com.wh.mfct.section.FileSection;
import com.wh.mfct.section.SectionInfo;/*** 片段发送类,用于连接到接收端并发送相关信息* 包含文件绝对路径——RandomAccessFile Map、接收节点信息、从接收端得到的资源基本信息* 、发送服务器、发送通信信道* @author 闹闹小丫头**/
public class SectionSender implements Runnable{private Map<String, RandomAccessFile> rafBuffer;private INetNode receiver;private ResourceBaseInfo rbiFromServer;private Socket socket;private DataOutputStream dos;public SectionSender() {}SectionSender setReceiver(INetNode receiver) {this.receiver = receiver;return this;}SectionSender setRbi(ResourceBaseInfo rbi) {this.rbiFromServer = rbi;return this;}/*** 连接到接收服务器端* @throws UnknownHostException* @throws IOException*/void connectToReceiver() throws UnknownHostException, IOException {this.socket = new Socket(receiver.getIp(), receiver.getPort());this.dos = new DataOutputStream(this.socket.getOutputStream());}/*** 根据指定的fileHandle得到指定的 List<ResourceStructInfo>内的资源结构信息* @param fileHandle 指定的文件句柄* @param rsiList 指定的资源结构信息列表* @return 得到的资源结构信息,如果没有返回null*/private ResourceStructInfo getRsiByFileHandle(int fileHandle, List<ResourceStructInfo> rsiList) {for(ResourceStructInfo rsi : rsiList) {int handle = rsi.getFileHandle();if(handle == fileHandle) {return rsi;}}return null;}/*** 从Map中得到指定绝对路径文件的RandomAccessFile* @param filePath 指定绝对路径* @return 返回指定绝对路径文件的RandomAccessFile* @throws FileNotFoundException*/private RandomAccessFile getRaf(String filePath) throws FileNotFoundException {RandomAccessFile raf = rafBuffer.get(filePath);if(raf != null) {return raf;}raf = new RandomAccessFile(filePath, "r");rafBuffer.put(filePath, raf);return raf;}/*** 关闭Map中所有的RandomAccessFile*/private void closeFile() {for(RandomAccessFile raf : rafBuffer.values()) {try {raf.close();} catch (IOException e) {e.printStackTrace();}}}/*** 向接收端发送文件*/void sendSection() {// 得到默认配置文件中的资源基本信息,从而得到这个App的绝对路径ResourceBaseInfo rbi = new ResourceBaseInfo();String absoluteRoot = rbi.getResourceBaseInfo().getAbsoluteRoot();// 扫描这个绝对路径下的文件,生成结构信息列表rbi.exploreResource(absoluteRoot);List<ResourceStructInfo> rsiList = rbi.getRsiList();List<SectionInfo> sectionList = rbiFromServer.getSiList();rafBuffer = new HashMap<>();// 遍历整个所需要发送的片段信息列表for(SectionInfo section : sectionList) {int fileHandle = section.getFileHandle();// 从这个App的资源结构列表中得到指定fileHandle的资源结构信息ResourceStructInfo rsi = getRsiByFileHandle(fileHandle, rsiList);String filePath = absoluteRoot + rsi.getFileHandle();long offset = section.getOffset();int size = section.getSize();// 获得这个App的指定资源片段,组装并进行发送try {RandomAccessFile raf = getRaf(filePath);raf.seek(offset);byte[] buffer = new byte[size];raf.read(buffer);FileSection fSection = new FileSection();fSection.setSection(section);fSection.setValue(buffer);fSection.sendSection(dos);} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}closeFile();// TODO 告知资源分发服务器,本地完成一次资源发送}private void close() {try {if(dos != null) {dos.close();}} catch (IOException e) {e.printStackTrace();} finally {dos = null;}try {if(socket != null && !socket.isClosed()) {socket.close();}} catch (IOException e) {e.printStackTrace();} finally {socket = null;}}@Overridepublic void run() {try {// 连接到接收服务器connectToReceiver();// 发送文件sendSection();close();} catch (UnknownHostException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}}

文件接收过程

总结

在先前版本当中博主把节点的选择以及资源的分配放在资源管理中心中完成,在慢慢的重复改善优化过程中考虑到资源管理中心的负载压力问题,就对其进行了优化,把整个节点选择及其资源的分配放在了资源消费者端,这样用户还可以通过使用自己的分配策略对节点进行选择,既减少了服务器压力,有具备了更多的人性化。这个实现方式是多种多样的,有更好的意见也可以和博主进行交流,代码本身就是不断优化的过程

[Java]分布式自平衡多文件云传输相关推荐

  1. java 分布式电子商务云平台b2b b2c o2o需要准备哪些技术??

    鸿鹄云商大型企业分布式互联网电子商务平台,推出PC+微信+APP+云服务的云商平台系统,其中包括B2B.B2C.C2C.O2O.新零售.直播电商等子平台. 分布式.微服务.云架构电子商务平台 java ...

  2. java用NIO实现文件传输_Java Nio 实现文件的传输

    使用Java Nio实现文件的传输 1.ServerSocket.java package ch2; import java.io.File; import java.io.FileNotFoundE ...

  3. Java利用TCP进行文件的传输

    采用TCP进行通讯,需要服务器和客户端两个部分,因此程序包含SendFileServer.java和SendFileClient.java两个部分. 两个文件的IP,端口都在程序中指定 传输的文件路径 ...

  4. [JAVA]递归实现客户端与服务端之间的文件与文件夹传输

    JAVA实现文件与文件夹传输 声明 其他方法 客户端: 服务端: 声明 本代码的文件夹传输并非完全由本人完成,本人只是在实现递归的基本思想上,稍微处理与改动了原作者的代码的结构,从而实现了文件与文件夹 ...

  5. java p2p 下载_java p2p文件传输(含服务器端与jsp源码)

    [实例简介] [实例截图] [核心代码] import java.net.*; import java.util.List; import java.awt.*; import javax.swing ...

  6. 在Java中实现SFTP协议文件传输的两种解决方案

    在Java中实现SFTP协议文件传输的两种解决方案 1.1 背景 1.2 关于 FTP /FTPS 1.3 关于SFTP 解决方案一:使用 JSch 库 解决方案二:使用sshj 库 这篇博文来聊聊在 ...

  7. java调用pscp_PuTTY 提供的文件传输工具PSCP (PuTTY Secure Copy client) 基本使用说明

    通过 SSH 连接,在两台机器之间安全的传输文件,可以用于任何 SSH(包括 SSH v1.SSH v2) 服务器. PSCP 的使用 在控制台直接执行 pscp 可以看到帮助 C:\>pscp ...

  8. JAVA检测文件是否传输完成

    /**      * 检测文件是否传输完成      * @param fileName      * @return      * @throws Exception      */     pub ...

  9. Java分布式篇6——RabbitMQ

    Java分布式篇6--RabbitMQ 1.MQ(Message Queue)消息队列 消息队列中间件,是分布式系统中的重要组件 主要解决,异步处理,应用解耦,流量削峰等问题 实现高性能,高可用,可伸 ...

最新文章

  1. 云炬Android开发笔记 19参考面包多商城优化“我的”页面
  2. oracle中noguarantee,关于undo guarantee
  3. SQL查询除了某一列的其他列
  4. PostgreSQL开放自由
  5. C++:编译实验之LR分析器
  6. bzoj1770: [Usaco2009 Nov]lights 燈(折半搜索)
  7. CryptoQuant CEO:比特币大规模从Coinbase流出是最强劲的看涨信号
  8. 使用GDI+绘制高质量图和字体(2)
  9. 江西理工大学c语言考试题库,江西理工大学C语言程序设计竞赛(初级组)(示例代码)...
  10. errortext为什么不显示?原来是rowtemplate的高度作怪要=20
  11. Linux中grep命令查找文件,Linux中使用grep命令搜索文件名及文件内容的方法
  12. android+图标自动排列,Android用RecyclerView实现图标拖拽排序以及增删管理
  13. linux系统添加任务栏蓝牙图标,深度系统中(deepin os)如何使用蓝牙适配器
  14. 基于javaSwing、MySQL的酒店客房管理系统(附源码)
  15. 内存的管理方式有哪些
  16. vue侧边栏菜单一二级模板
  17. 年薪60w的程序员与年薪6w的极品程序员,差距怎么这么大呢?
  18. word突然不能保存的解决方法
  19. Excel数据透视表排序
  20. 行波iq调制器_行波电光相位调制器输出响应的定量分析

热门文章

  1. 【转】程序员10大境界【走在路上,潜心修行】
  2. 迅雷服务器响应超时无法加速,迅雷无法加速敏感资源怎么办?迅雷解除无法加速敏感资源限制教程...
  3. NASA庆祝地球日:50年地球最精美图片亮相(转载)
  4. Secret Milking Machine POJ - 2455
  5. oracle数据库中spool的作用,Oracle中Spool命令如何使用 Oracle中Spool命令使用方法
  6. failed to create network error response from daemon filed to setup ip tables问题
  7. 软件测试的完整案例分析,软件测试案例分析完整版
  8. python查看文件行数_python如何获取打开文件的行数?
  9. 编程练习:编写一个函数,用于计算某长方形面积的函数
  10. 小学生图片_2020中秋节对家人的祝福语 送手抄报小学生图片大全简单又漂亮