一:功能需求

总结

用到的知识点 :
1.序列化和反序列化
2.zookpeer监听机制
3.对象流完成 datanode的 信息的注册.

1.功能:

  1. 当 datanode上线时,namenode中可以感知到.
  2. 当datanode下线时,namenode中可以感知到.
    2.分析:
  3. 当datanode上线时,datanode在zk中创建的节点类型为 瞬时的顺序节点. 
    2. namenode临听节点的变化情况,一旦有一个datanode下线,则这个节点会删除,namenode会自动回调》
  4. 请用对象流完成 datanode的 信息的注册.
  5. datanode的节点信息: System中获取 properties.

二: 前提:

利用zk完成,请先在zk中创建一个 servers 节点.

三 .模拟代码实现

3.1 DataNodeServer类

DataNodeServer该类用于提供外界访问,我把他在maven中pom.xml中加入 指定的主类build 配置 ,这个配置 在jar包中 manidest 文件中加入Main-class指定,就可通过java -Defile.encoding = utf-8 -jar xxx.jar包执行了

public class DataNodeServer {

private String parentNode="/servers";//这是整个集群的根节点
private static ZkHelper zkHelper;
private static ZooKeeper zk;
private static Logger logger = Logger.getLogger(DataNodeServer.class);
private static Scanner sc = new Scanner(System.in);
private static String zkUrl;/*** 客户端与zk服务器连接方法* @throws IOException* @throws InterruptedException*/
private void getConnect() throws IOException, InterruptedException {if(zkUrl!=null){zkHelper.connectString = zkUrl;}zkHelper = new ZkHelper();zk = zkHelper.connect();
}
/*** datanode客户端要将自己的信息打包装成一个Object,将对象序列化成byte[],* 然后再传给 zk 保存到节点的内容中 转化为对象流** @param obj  必须实现java,io,Serilalizable** @return*/
private byte[]  toByteArray(Object obj){byte[] bs = null;ByteArrayOutputStream bos = new ByteArrayOutputStream(); //将数据以byte[]写出(输出)到内存ObjectOutputStream oos = null;try{oos = new ObjectOutputStream(bos);//将可以序列化的对象序列化写出到bos中   obj必须实现java.io.Serializableoos.writeObject(obj);oos.flush();//obj中的数据变成 byte[] 存到内存bs = bos.toByteArray();}catch (IOException e){e.printStackTrace();}finally {try{if(oos!=null){oos.close();}if(bos!=null){bos.close();}}catch (IOException e){e.printStackTrace();}}return bs;
}/*** 将客户端的数据注册到zk中*/
private void registerServer(){Properties p = getLocalHostInfo();byte[] data = toByteArray(p);  //将序列化的byte数组 赋给 data数组String ip = (String) p.get("ip");try {/***    brokers/idss/1   broker/ids/2  borkers/ids/3*    myid : 1 myid :2*    带序号的临时节点 /server/server_192.168.176.200 _000000001   data:P序列化后的byte[]  /servers/server_192.168.76.200_0000*    acl :world:anyone*CreateMode类型分为4种* 1.PERSISTENT--持久型* 2.PERSISTENT_SEQUENTIAL--持久顺序型* 3.EPHEMERAL--临时型  下线既可以删除* 4.EPHEMERAL_SEQUENTIAL--临时顺序型*/String resultPath = zk.create(parentNode+"/server_"+ip+"_",data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);logger.info("在服务器创建节点成功,路径为:"+resultPath);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}
}/*** 获取当前系统的信息包装成一个Proerties** @return*/
private static Properties getLocalHostInfo() {Runtime r = Runtime.getRuntime(); //jvm信息   : jvm内存Properties props = System.getProperties();  //操作系统信息InetAddress addr = null;try {addr = InetAddress.getLocalHost();//IP地址} catch (UnknownHostException e) {e.printStackTrace();}String ip = addr.getHostAddress();   //取地址props.setProperty("ip",ip);props.setProperty("hostname", addr.getHostName());props.setProperty("totalMem",r.freeMemory()+"");props.setProperty("availableProcessors",r.availableProcessors()+"");System.out.println("ip: " + ip);System.out.println("hostname: "+addr.getHostName());System.out.println("jvm可以使用的总内存: "+ r.totalMemory());System.out.println("JVM可以使用的剩余内存: "+r.freeMemory());System.out.println("JVM可以使用的处理器个数: "+r.availableProcessors());System.out.println("Java的运行环境版本:"+props.getProperty("java.version"));System.out.println("Java的运行环境供应商:"+props.getProperty("java.vendor"));System.out.println("Java供应商的URL:"+props.getProperty("java.vendor.url"));System.out.println("Java的安装路径:"+props.getProperty("java.home"));System.out.println("Java的虚拟机规范版本:"+props.getProperty("java.vm.specification.version"));System.out.println("Java的虚拟机规范供应商:"+props.getProperty("java.vm.specification.vendor"));System.out.println("Java的虚拟机规范名称:"+props.getProperty("java.vm.specification.name"));System.out.println("Java的虚拟机实现版本:"+props.getProperty("java.vm.version"));System.out.println("Java的虚拟机实现供应商:"+props.getProperty("java.vm.vendor"));System.out.println("Java的虚拟机实现名称:"+props.getProperty("java.vm.name"));System.out.println("Java运行时环境规范版本:"+props.getProperty("java.specification.version"));System.out.println("Java运行时环境规范供应商:"+props.getProperty("java.specification.vender"));System.out.println("Java运行时环境规范名称:"+props.getProperty("java.specification.name"));System.out.println("Java的类格式版本号:"+props.getProperty("java.class.version"));System.out.println("Java的类路径:"+props.getProperty("java.class.path"));System.out.println("加载库时搜索的路径列表:"+props.getProperty("java.library.path"));System.out.println("默认的临时文件路径:"+props.getProperty("java.io.tmpdir"));System.out.println("一个或多个扩展目录的路径:"+props.getProperty("java.ext.dirs"));System.out.println("操作系统的名称:"+props.getProperty("os.name"));System.out.println("操作系统的构架:"+props.getProperty("os.arch"));System.out.println("操作系统的版本:"+props.getProperty("os.version"));System.out.println("文件分隔符:"+props.getProperty("file.separator")); //在 unix 系统中是"/"System.out.println("路径分隔符:"+props.getProperty("path.separator")); //在 unix 系统中是":"System.out.println("行分隔符:"+props.getProperty("line.separator")); //在 unix 系统中是"/n"System.out.println("用户的账户名称:"+props.getProperty("user.name"));System.out.println("用户的主目录:"+props.getProperty("user.home"));System.out.println("用户的当前工作目录:"+props.getProperty("user.dir"));return props;
}/*** 业务方法:用于耗时*/private void business(){System.out.println("请输入割圆次数: ");int n = sc.nextInt();double y = 1.0;for (int i = 0; i < n; i++) {double π   =  3 * Math.pow( 2 , i) * y;System.out.println("第"+i+"次切割,为正"+(6 + 6*i) + "边形,圆周率π = " + π);y = Math.sqrt( 2 - Math.sqrt( 4 - y*y));}
}/*** 主程序* @param args* @throws InterruptedException* @throws IOException*/
public static void main(String[] args) throws InterruptedException, IOException {if(args != null && args.length >0){zkUrl = args[0];}DataNodeServer server = new DataNodeServer();server.getConnect();server.registerServer(); //第一次datanode后注册到zkboolean flag = false;int choice = 2;while (!flag){System.out.println("请输入你要的操作:\n1.执行计算任务  \n2.退出\n");choice = sc.nextInt();switch (choice){case 1:server.business();break;case 2:zkHelper.close();flag = true;sc.close();break;default:System.out.println("没有这个操作....");}}System.out.println("客户端退出");
}

}

3.2 NameNodeServer

该类用于监视datanodeserve的心跳,可以通过zookpeer的watch来监听上线节点数目,只需要实现Watcher接口再传入

public class NameNodeServer {private static ZkHelper zkHelper;private static ZooKeeper zk;private static Logger logger = Logger.getLogger(NameNodeServer.class);private String parentNode = "/servers";//初始化节点parentdate节点private void initMainNode()throws  Exception{zkHelper = new ZkHelper();zk = zkHelper.connect();if(zk==null || zk.getState()!= ZooKeeper.States.CONNECTED){logger.error("没有建立起与zookpeer服务器"+zkHelper.getConnectString()+"的连接");throw new Exception("没有建立起与zookpeer服务器"+zkHelper.getConnectString()+"的连接");}try{zk.exists(parentNode,true);}catch (Exception e){zk.create(parentNode,"this is yc datanode cluster".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}/*** 获取服务器列表哦* @throws KeeperException* @throws InterruptedException*/private void getServerList() throws KeeperException,InterruptedException{CountDownLatch countDownLatch = new CountDownLatch(Integer.MAX_VALUE);MyMatch nm = new MyMatch(zk,parentNode,countDownLatch);List<String> servers = zk.getChildren(parentNode,nm);//*****第一次取/servers节点的列表后,绑定了监听...logger.info("主线程启动,得到当前datanode列表: ");for (String child : servers) {byte[] data = zk.getData(parentNode+"/"+child,false,null);//这个data就是客户端存入的 Properties对象logger.info(child);}countDownLatch.await();//阻塞}/*** 关闭服务器*/public void closeZookpeer(){logger.info("关闭zookpeer的连接....");if(zk!=null && zk.getState() == ZooKeeper.States.CONNECTED){try {zk.close();}catch (InterruptedException e){e.printStackTrace();}}}
//start-all.sh   stop.all.shpublic static void main(String[] args) throws Exception {//1.实例化服务器NameNodeServer server = new NameNodeServer();//2.建立连接 NameNodeServer -> zk;//3.判断/servers是否存在 如果不存在 则创建 -》initserver.initMainNode();;//绑定监听 /servers下的/子节点变化// * ls path [watch] ->getChildren//*创建,删除子节点事件起作用//countdownloachserver.getServerList();//阻塞式//5.关闭服务器 释放资源server.closeZookpeer();}}/*** 监听子节点的列表发生变化*/
class MyMatch implements Watcher {private ZooKeeper zk;private String path;private CountDownLatch countDownLatch;private Logger logger = Logger.getLogger(MyMatch.class);private void showChildNodeInfo(List<String> children) {//1.循环子节点列表logger.info("当前在线的datanode有:" + children.size());logger.info("他们是:");logger.info(children); //[xx,xx,xxx]logger.info("各节点的详情:");for (String sonPath : children) {logger.info("*****" + sonPath + "*********");try {byte[] data = zk.getData(path + "/" + sonPath, false, null);//byte[]->反序列化Properties p = (Properties) deserilizable(data);logger.info(p);logger.info("**************");} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}//2.根据子节点 路径 显示信息}}
序列化转为byte数组private Object deserilizable(byte[] data) {Object obj = null;ByteArrayInputStream bis = null;ObjectInputStream ois = null;try {bis = new ByteArrayInputStream(data);ois = new ObjectInputStream(bis);obj = ois.readObject();} catch (ClassNotFoundException | IOException e) {e.printStackTrace();} finally {if (ois != null) {try {ois.close();} catch (IOException e) {e.printStackTrace();}}if (bis != null) {try {bis.close();} catch (IOException e) {e.printStackTrace();}}}return obj;}@Overridepublic void process(WatchedEvent event) {if (event.getType() == NodeChildrenChanged) {//实现子节点列表变化的代码logger.info("监听到了子节点的变化...");try {//重新绑定List<String> children = zk.getChildren(path,MyMatch.this);showChildNodeInfo(children);} catch (KeeperException | InterruptedException e) {e.printStackTrace();logger.error(e.getMessage());}} else if (event.getType() == NodeDataChanged) {try {Stat stat = new Stat();//画龙点睛之笔byte[] data = zk.getData(path, MyMatch.this, stat);//异常不能抛出String dataString = new String(data,"utf-8");logger.info("监听程序中获取节点:"+path+"更新后的数据为:"+dataString);logger.info("节点最新的信息stat为:");String info = YcZnodeUtil.printZnodeInfo(stat);logger.info(info);} catch (Exception e) {e.printStackTrace();}}countDownLatch.countDown();  //递减//重新设置countdown 的初始值 让他循环if (countDownLatch.getCount() == 1) {countDownLatch = new CountDownLatch(Integer.MAX_VALUE);}System.out.println("当前  connectedsignal为:   " + countDownLatch.getCount());}public MyMatch(ZooKeeper zk,String path,CountDownLatch countDownLatch){super();this.zk = zk;this.path = path;this.countDownLatch = countDownLatch;}public MyMatch(){super();}public void setZk(ZooKeeper zk){this.zk = zk;}public void setPath(String path){this.path = path;}
}# pom文件配置
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin>
<!--借助assembly插件完成包含项目依赖的jar包--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><configuration><appendAssemblyId>false</appendAssemblyId><descriptorRefs>
<!--                        设置将所有的依赖的打进jar包中--><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest>
<!--                          此处main方法入口的class--><mainClass>com.yc.Zookpeek.program2.NameNodeServer</mainClass></manifest></archive>
<!--                    //指定输出路径-->
<!--                    <outputDirectory>D://a</outputDirectory>--></configuration><executions><execution><id>make-assembly</id>
<!--                        将assembly插件绑定了package中到时候只需要双击package指令即可--><phase>package</phase><goals><goal>assembly</goal></goals></execution></executions></plugin></plugins></build><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.6</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency></dependencies></project>

【使用zookpeer】模拟 hadoop的 datenode与namenode 的master-slaves的 关系相关推荐

  1. 日常问题——hadoop启动后发现namenode没有启动,但是排除了格式化过度的问题

    hadoop启动后发现namenode没有启动,网上说的格式化过度的问题我是没有的,因为我只格式化过一次.之后查看日志 vim /opt/hadoop/logs/namenode对应的log文件 发现 ...

  2. Hadoop HA HDFS启动 NameNode启动失败解析

    今天也要努力学习 作者大哥:me2xp  https://www.linuxidc.com/Linux/2016-03/129437.htm 看日志真的很重要!!!!!!!!!! 一.问题描述 HA按 ...

  3. Hadoop集群的NameNode的备份

    Hadoop集群中,NameNode节点存储着HDFS上所有文件和目录的元数据信息 如果NameNode挂了,也就意味着整个Hadoop集群也就完了 所以,NameNode节点的备份很重要,可以从以下 ...

  4. 在虚拟机环境下,电脑间拷贝配置好的伪分布式Hadoop环境,出现namenode不能启动的问题!...

    原因:在原来的电脑上配置伪分布的时候,已经将hostname与IP绑定了,所以拷贝到别的电脑的时候,重新启动的时候就会失败,因为新的电脑的IP不见得就和原来的电脑的IP一样!因为在不同的网络中,在NA ...

  5. hadoop集群-格式化namenode节点后,从节点的datanode不能启动,或者是datanode启动到主节点上。

    进行二次namenode格式化前,请注意: hadoop namenode -format 进行着一步前,要把以前格式化的内容删掉,如果,你的路径是在hadoop文件夹内,那么直接格式化即可,如果是在 ...

  6. hadoop启动DataNode和NameNode的神操作

    前言 刚搭建好的hadoop或者启用hadoop时,相信大部分小伙伴都遇到过DataNode和NameNode很神奇地消失了,当然也相信大部分小伙伴为之恼怒过,在此我将讲一讲其中的某些原因以及一个神操 ...

  7. HDFS namenode 高可用(HA)搭建指南 QJM方式 ——本质是多个namenode选举master,用paxos实现一致性...

    一.HDFS的高可用性 1.概述 本指南提供了一个HDFS的高可用性(HA)功能的概述,以及如何配置和管理HDFS高可用性(HA)集群.本文档假定读者具有对HDFS集群的组件和节点类型具有一定理解.有 ...

  8. 切换namenode报错:Unable to fence NameNode at master

    PATH=$PATH:/sbin:/usr/sbin fuser -v -k -n tcp 9000 via ssh: bash: fuser: 未找到命令Unable to fence servic ...

  9. 今天给大家分享用scratch模拟物理电路电压,电阻值,电流的关系!

    获取源码文件,请在公众号中回复消息:"模拟物理电路"

最新文章

  1. Java学习总结:6
  2. 使用SoapUI调用安全WCF SOAP服务–第1部分,该服务
  3. 信息学奥赛一本通 2045:【例5.13】蛇形填数
  4. Python_多项式拟合
  5. python之日期与时间处理模块及利用pandas处理时间序列数据
  6. python 是否可以一键修图_ps如何快速批量修图?
  7. TeamViewer 收不到邮件该怎么办?
  8. 中美线规线径对照表(详细版)
  9. 原生JS实现canvas移动端电子签名板/画板
  10. ubuntu 安装python mysqldb
  11. Android keyevent值中文表
  12. 如何提高信号发生器(信号源)测量时的幅度精度
  13. python idle背景设置为黑色_python IDLE颜色设置
  14. 数据可视化的实现技术和工具比较(HTML5 canvas(Echart)、SVG、webGL等等)
  15. c语言实现鼠标驱动,用C语言写的鼠标驱动程序.doc
  16. SPSS 下载-安装
  17. android-pdf阅读器(Android开源项目)
  18. 手机关机收不到微信消息_为什么手机休眠的时候收不到微信 解决方法
  19. 怎么设置uboot从u盘启动linux,rt5350使用uboot从u盘启动linux成功含从u盘加载镜像与rootfs...
  20. 【乐于折腾】黑苹果 - 修炼之路

热门文章

  1. 英雄联盟里,为什么要打一下,走一下
  2. UG数控编程3种螺旋刀路,可用于各种2d和3d加工过程
  3. matlab图像的恢复
  4. 【报错总结】无法连接Hive的MetaStore数据库
  5. 使用国产KT148A语音芯片sop8封装,用户可以自己更换声音,低成本,高秒数
  6. 打开运行PS、AI等软件时卡在启动窗口的解决办法
  7. 华为手机怎么用云歌_华为手机功能之语音助手小艺,带你了解小艺的使用方式...
  8. A站复兴?B站说NO
  9. I have the Dream
  10. 淘宝数据库OceanBase SQL编译器部分 源码阅读--解析SQL语法树