【使用zookpeer】模拟 hadoop的 datenode与namenode 的master-slaves的 关系
一:功能需求
总结
用到的知识点 :
1.序列化和反序列化
2.zookpeer监听机制
3.对象流完成 datanode的 信息的注册.
1.功能:
- 当 datanode上线时,namenode中可以感知到.
- 当datanode下线时,namenode中可以感知到.
2.分析: - 当datanode上线时,datanode在zk中创建的节点类型为 瞬时的顺序节点.
2. namenode临听节点的变化情况,一旦有一个datanode下线,则这个节点会删除,namenode会自动回调》 - 请用对象流完成 datanode的 信息的注册.
- datanode的节点信息: System中获取 properties.
二: 前提:
利用zk完成,请先在zk中创建一个 servers 节点.
三 .模拟代码实现
3.1 DataNodeServer类
DataNodeServer该类用于提供外界访问,我把他在maven中pom.xml中加入 指定的主类build 配置 ,这个配置 在jar包中 manidest 文件中加入Main-class指定,就可通过java -Defile.encoding = utf-8 -jar xxx.jar包执行了
public class DataNodeServer {
private String parentNode="/servers";//这是整个集群的根节点
private static ZkHelper zkHelper;
private static ZooKeeper zk;
private static Logger logger = Logger.getLogger(DataNodeServer.class);
private static Scanner sc = new Scanner(System.in);
private static String zkUrl;/*** 客户端与zk服务器连接方法* @throws IOException* @throws InterruptedException*/
private void getConnect() throws IOException, InterruptedException {if(zkUrl!=null){zkHelper.connectString = zkUrl;}zkHelper = new ZkHelper();zk = zkHelper.connect();
}
/*** datanode客户端要将自己的信息打包装成一个Object,将对象序列化成byte[],* 然后再传给 zk 保存到节点的内容中 转化为对象流** @param obj 必须实现java,io,Serilalizable** @return*/
private byte[] toByteArray(Object obj){byte[] bs = null;ByteArrayOutputStream bos = new ByteArrayOutputStream(); //将数据以byte[]写出(输出)到内存ObjectOutputStream oos = null;try{oos = new ObjectOutputStream(bos);//将可以序列化的对象序列化写出到bos中 obj必须实现java.io.Serializableoos.writeObject(obj);oos.flush();//obj中的数据变成 byte[] 存到内存bs = bos.toByteArray();}catch (IOException e){e.printStackTrace();}finally {try{if(oos!=null){oos.close();}if(bos!=null){bos.close();}}catch (IOException e){e.printStackTrace();}}return bs;
}/*** 将客户端的数据注册到zk中*/
private void registerServer(){Properties p = getLocalHostInfo();byte[] data = toByteArray(p); //将序列化的byte数组 赋给 data数组String ip = (String) p.get("ip");try {/*** brokers/idss/1 broker/ids/2 borkers/ids/3* myid : 1 myid :2* 带序号的临时节点 /server/server_192.168.176.200 _000000001 data:P序列化后的byte[] /servers/server_192.168.76.200_0000* acl :world:anyone*CreateMode类型分为4种* 1.PERSISTENT--持久型* 2.PERSISTENT_SEQUENTIAL--持久顺序型* 3.EPHEMERAL--临时型 下线既可以删除* 4.EPHEMERAL_SEQUENTIAL--临时顺序型*/String resultPath = zk.create(parentNode+"/server_"+ip+"_",data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);logger.info("在服务器创建节点成功,路径为:"+resultPath);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}
}/*** 获取当前系统的信息包装成一个Proerties** @return*/
private static Properties getLocalHostInfo() {Runtime r = Runtime.getRuntime(); //jvm信息 : jvm内存Properties props = System.getProperties(); //操作系统信息InetAddress addr = null;try {addr = InetAddress.getLocalHost();//IP地址} catch (UnknownHostException e) {e.printStackTrace();}String ip = addr.getHostAddress(); //取地址props.setProperty("ip",ip);props.setProperty("hostname", addr.getHostName());props.setProperty("totalMem",r.freeMemory()+"");props.setProperty("availableProcessors",r.availableProcessors()+"");System.out.println("ip: " + ip);System.out.println("hostname: "+addr.getHostName());System.out.println("jvm可以使用的总内存: "+ r.totalMemory());System.out.println("JVM可以使用的剩余内存: "+r.freeMemory());System.out.println("JVM可以使用的处理器个数: "+r.availableProcessors());System.out.println("Java的运行环境版本:"+props.getProperty("java.version"));System.out.println("Java的运行环境供应商:"+props.getProperty("java.vendor"));System.out.println("Java供应商的URL:"+props.getProperty("java.vendor.url"));System.out.println("Java的安装路径:"+props.getProperty("java.home"));System.out.println("Java的虚拟机规范版本:"+props.getProperty("java.vm.specification.version"));System.out.println("Java的虚拟机规范供应商:"+props.getProperty("java.vm.specification.vendor"));System.out.println("Java的虚拟机规范名称:"+props.getProperty("java.vm.specification.name"));System.out.println("Java的虚拟机实现版本:"+props.getProperty("java.vm.version"));System.out.println("Java的虚拟机实现供应商:"+props.getProperty("java.vm.vendor"));System.out.println("Java的虚拟机实现名称:"+props.getProperty("java.vm.name"));System.out.println("Java运行时环境规范版本:"+props.getProperty("java.specification.version"));System.out.println("Java运行时环境规范供应商:"+props.getProperty("java.specification.vender"));System.out.println("Java运行时环境规范名称:"+props.getProperty("java.specification.name"));System.out.println("Java的类格式版本号:"+props.getProperty("java.class.version"));System.out.println("Java的类路径:"+props.getProperty("java.class.path"));System.out.println("加载库时搜索的路径列表:"+props.getProperty("java.library.path"));System.out.println("默认的临时文件路径:"+props.getProperty("java.io.tmpdir"));System.out.println("一个或多个扩展目录的路径:"+props.getProperty("java.ext.dirs"));System.out.println("操作系统的名称:"+props.getProperty("os.name"));System.out.println("操作系统的构架:"+props.getProperty("os.arch"));System.out.println("操作系统的版本:"+props.getProperty("os.version"));System.out.println("文件分隔符:"+props.getProperty("file.separator")); //在 unix 系统中是"/"System.out.println("路径分隔符:"+props.getProperty("path.separator")); //在 unix 系统中是":"System.out.println("行分隔符:"+props.getProperty("line.separator")); //在 unix 系统中是"/n"System.out.println("用户的账户名称:"+props.getProperty("user.name"));System.out.println("用户的主目录:"+props.getProperty("user.home"));System.out.println("用户的当前工作目录:"+props.getProperty("user.dir"));return props;
}/*** 业务方法:用于耗时*/private void business(){System.out.println("请输入割圆次数: ");int n = sc.nextInt();double y = 1.0;for (int i = 0; i < n; i++) {double π = 3 * Math.pow( 2 , i) * y;System.out.println("第"+i+"次切割,为正"+(6 + 6*i) + "边形,圆周率π = " + π);y = Math.sqrt( 2 - Math.sqrt( 4 - y*y));}
}/*** 主程序* @param args* @throws InterruptedException* @throws IOException*/
public static void main(String[] args) throws InterruptedException, IOException {if(args != null && args.length >0){zkUrl = args[0];}DataNodeServer server = new DataNodeServer();server.getConnect();server.registerServer(); //第一次datanode后注册到zkboolean flag = false;int choice = 2;while (!flag){System.out.println("请输入你要的操作:\n1.执行计算任务 \n2.退出\n");choice = sc.nextInt();switch (choice){case 1:server.business();break;case 2:zkHelper.close();flag = true;sc.close();break;default:System.out.println("没有这个操作....");}}System.out.println("客户端退出");
}
}
3.2 NameNodeServer
该类用于监视datanodeserve的心跳,可以通过zookpeer的watch来监听上线节点数目,只需要实现Watcher接口再传入
public class NameNodeServer {private static ZkHelper zkHelper;private static ZooKeeper zk;private static Logger logger = Logger.getLogger(NameNodeServer.class);private String parentNode = "/servers";//初始化节点parentdate节点private void initMainNode()throws Exception{zkHelper = new ZkHelper();zk = zkHelper.connect();if(zk==null || zk.getState()!= ZooKeeper.States.CONNECTED){logger.error("没有建立起与zookpeer服务器"+zkHelper.getConnectString()+"的连接");throw new Exception("没有建立起与zookpeer服务器"+zkHelper.getConnectString()+"的连接");}try{zk.exists(parentNode,true);}catch (Exception e){zk.create(parentNode,"this is yc datanode cluster".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}/*** 获取服务器列表哦* @throws KeeperException* @throws InterruptedException*/private void getServerList() throws KeeperException,InterruptedException{CountDownLatch countDownLatch = new CountDownLatch(Integer.MAX_VALUE);MyMatch nm = new MyMatch(zk,parentNode,countDownLatch);List<String> servers = zk.getChildren(parentNode,nm);//*****第一次取/servers节点的列表后,绑定了监听...logger.info("主线程启动,得到当前datanode列表: ");for (String child : servers) {byte[] data = zk.getData(parentNode+"/"+child,false,null);//这个data就是客户端存入的 Properties对象logger.info(child);}countDownLatch.await();//阻塞}/*** 关闭服务器*/public void closeZookpeer(){logger.info("关闭zookpeer的连接....");if(zk!=null && zk.getState() == ZooKeeper.States.CONNECTED){try {zk.close();}catch (InterruptedException e){e.printStackTrace();}}}
//start-all.sh stop.all.shpublic static void main(String[] args) throws Exception {//1.实例化服务器NameNodeServer server = new NameNodeServer();//2.建立连接 NameNodeServer -> zk;//3.判断/servers是否存在 如果不存在 则创建 -》initserver.initMainNode();;//绑定监听 /servers下的/子节点变化// * ls path [watch] ->getChildren//*创建,删除子节点事件起作用//countdownloachserver.getServerList();//阻塞式//5.关闭服务器 释放资源server.closeZookpeer();}}/*** 监听子节点的列表发生变化*/
class MyMatch implements Watcher {private ZooKeeper zk;private String path;private CountDownLatch countDownLatch;private Logger logger = Logger.getLogger(MyMatch.class);private void showChildNodeInfo(List<String> children) {//1.循环子节点列表logger.info("当前在线的datanode有:" + children.size());logger.info("他们是:");logger.info(children); //[xx,xx,xxx]logger.info("各节点的详情:");for (String sonPath : children) {logger.info("*****" + sonPath + "*********");try {byte[] data = zk.getData(path + "/" + sonPath, false, null);//byte[]->反序列化Properties p = (Properties) deserilizable(data);logger.info(p);logger.info("**************");} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}//2.根据子节点 路径 显示信息}}
序列化转为byte数组private Object deserilizable(byte[] data) {Object obj = null;ByteArrayInputStream bis = null;ObjectInputStream ois = null;try {bis = new ByteArrayInputStream(data);ois = new ObjectInputStream(bis);obj = ois.readObject();} catch (ClassNotFoundException | IOException e) {e.printStackTrace();} finally {if (ois != null) {try {ois.close();} catch (IOException e) {e.printStackTrace();}}if (bis != null) {try {bis.close();} catch (IOException e) {e.printStackTrace();}}}return obj;}@Overridepublic void process(WatchedEvent event) {if (event.getType() == NodeChildrenChanged) {//实现子节点列表变化的代码logger.info("监听到了子节点的变化...");try {//重新绑定List<String> children = zk.getChildren(path,MyMatch.this);showChildNodeInfo(children);} catch (KeeperException | InterruptedException e) {e.printStackTrace();logger.error(e.getMessage());}} else if (event.getType() == NodeDataChanged) {try {Stat stat = new Stat();//画龙点睛之笔byte[] data = zk.getData(path, MyMatch.this, stat);//异常不能抛出String dataString = new String(data,"utf-8");logger.info("监听程序中获取节点:"+path+"更新后的数据为:"+dataString);logger.info("节点最新的信息stat为:");String info = YcZnodeUtil.printZnodeInfo(stat);logger.info(info);} catch (Exception e) {e.printStackTrace();}}countDownLatch.countDown(); //递减//重新设置countdown 的初始值 让他循环if (countDownLatch.getCount() == 1) {countDownLatch = new CountDownLatch(Integer.MAX_VALUE);}System.out.println("当前 connectedsignal为: " + countDownLatch.getCount());}public MyMatch(ZooKeeper zk,String path,CountDownLatch countDownLatch){super();this.zk = zk;this.path = path;this.countDownLatch = countDownLatch;}public MyMatch(){super();}public void setZk(ZooKeeper zk){this.zk = zk;}public void setPath(String path){this.path = path;}
}# pom文件配置
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin>
<!--借助assembly插件完成包含项目依赖的jar包--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><configuration><appendAssemblyId>false</appendAssemblyId><descriptorRefs>
<!-- 设置将所有的依赖的打进jar包中--><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest>
<!-- 此处main方法入口的class--><mainClass>com.yc.Zookpeek.program2.NameNodeServer</mainClass></manifest></archive>
<!-- //指定输出路径-->
<!-- <outputDirectory>D://a</outputDirectory>--></configuration><executions><execution><id>make-assembly</id>
<!-- 将assembly插件绑定了package中到时候只需要双击package指令即可--><phase>package</phase><goals><goal>assembly</goal></goals></execution></executions></plugin></plugins></build><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.6</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency></dependencies></project>
【使用zookpeer】模拟 hadoop的 datenode与namenode 的master-slaves的 关系相关推荐
- 日常问题——hadoop启动后发现namenode没有启动,但是排除了格式化过度的问题
hadoop启动后发现namenode没有启动,网上说的格式化过度的问题我是没有的,因为我只格式化过一次.之后查看日志 vim /opt/hadoop/logs/namenode对应的log文件 发现 ...
- Hadoop HA HDFS启动 NameNode启动失败解析
今天也要努力学习 作者大哥:me2xp https://www.linuxidc.com/Linux/2016-03/129437.htm 看日志真的很重要!!!!!!!!!! 一.问题描述 HA按 ...
- Hadoop集群的NameNode的备份
Hadoop集群中,NameNode节点存储着HDFS上所有文件和目录的元数据信息 如果NameNode挂了,也就意味着整个Hadoop集群也就完了 所以,NameNode节点的备份很重要,可以从以下 ...
- 在虚拟机环境下,电脑间拷贝配置好的伪分布式Hadoop环境,出现namenode不能启动的问题!...
原因:在原来的电脑上配置伪分布的时候,已经将hostname与IP绑定了,所以拷贝到别的电脑的时候,重新启动的时候就会失败,因为新的电脑的IP不见得就和原来的电脑的IP一样!因为在不同的网络中,在NA ...
- hadoop集群-格式化namenode节点后,从节点的datanode不能启动,或者是datanode启动到主节点上。
进行二次namenode格式化前,请注意: hadoop namenode -format 进行着一步前,要把以前格式化的内容删掉,如果,你的路径是在hadoop文件夹内,那么直接格式化即可,如果是在 ...
- hadoop启动DataNode和NameNode的神操作
前言 刚搭建好的hadoop或者启用hadoop时,相信大部分小伙伴都遇到过DataNode和NameNode很神奇地消失了,当然也相信大部分小伙伴为之恼怒过,在此我将讲一讲其中的某些原因以及一个神操 ...
- HDFS namenode 高可用(HA)搭建指南 QJM方式 ——本质是多个namenode选举master,用paxos实现一致性...
一.HDFS的高可用性 1.概述 本指南提供了一个HDFS的高可用性(HA)功能的概述,以及如何配置和管理HDFS高可用性(HA)集群.本文档假定读者具有对HDFS集群的组件和节点类型具有一定理解.有 ...
- 切换namenode报错:Unable to fence NameNode at master
PATH=$PATH:/sbin:/usr/sbin fuser -v -k -n tcp 9000 via ssh: bash: fuser: 未找到命令Unable to fence servic ...
- 今天给大家分享用scratch模拟物理电路电压,电阻值,电流的关系!
获取源码文件,请在公众号中回复消息:"模拟物理电路"
最新文章
- Java学习总结:6
- 使用SoapUI调用安全WCF SOAP服务–第1部分,该服务
- 信息学奥赛一本通 2045:【例5.13】蛇形填数
- Python_多项式拟合
- python之日期与时间处理模块及利用pandas处理时间序列数据
- python 是否可以一键修图_ps如何快速批量修图?
- TeamViewer 收不到邮件该怎么办?
- 中美线规线径对照表(详细版)
- 原生JS实现canvas移动端电子签名板/画板
- ubuntu 安装python mysqldb
- Android keyevent值中文表
- 如何提高信号发生器(信号源)测量时的幅度精度
- python idle背景设置为黑色_python IDLE颜色设置
- 数据可视化的实现技术和工具比较(HTML5 canvas(Echart)、SVG、webGL等等)
- c语言实现鼠标驱动,用C语言写的鼠标驱动程序.doc
- SPSS 下载-安装
- android-pdf阅读器(Android开源项目)
- 手机关机收不到微信消息_为什么手机休眠的时候收不到微信 解决方法
- 怎么设置uboot从u盘启动linux,rt5350使用uboot从u盘启动linux成功含从u盘加载镜像与rootfs...
- 【乐于折腾】黑苹果 - 修炼之路