1 snmpGetDevicesTask方法

 public static Map<String,CapEbu> snmpGetDevicesTask(List<String> ipList, List<String> oidList) {if(ipList==null||ipList.size()==0 || oidList==null || oidList.size()==0) return null;Map<String,CapEbu> ebuMap = new HashMap<String,CapEbu>();List<CapEbu> ebulist = new Vector<CapEbu>();int ipsSize = ipList.size();//ip段大小//指定循环次数,如果不指定循环次数且IP段大小是65536,那么线程池的一个任务就包含600多个ip,那么在极短时间内,当前任务会有一个监听器线程,该线程可能会响应不过来,因为要处理600多个响应。//现在指定一次循环,线程池就只处理1000个ip,同时该循环中只向线程池发布100个任务,平均一个任务只处理10个ip。int cicleNumber = (ipsSize%ScanDeviceContants.DEAL_IPNUMBER == 0)?(ipsSize/ScanDeviceContants.DEAL_IPNUMBER):(ipsSize/ScanDeviceContants.DEAL_IPNUMBER+1);//创建线程执行器ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(ScanDeviceContants.THREADPOOL_SIZE);List<Future<List<CapEbu>>> resultList = new ArrayList<Future<List<CapEbu>>>();for(int cicle=0; cicle<cicleNumber; cicle++){List<String> iptemp = new ArrayList<String>();iptemp.addAll(ipList.subList(cicle*ScanDeviceContants.DEAL_IPNUMBER, cicle==cicleNumber-1?ipList.size():(cicle+1)*ScanDeviceContants.DEAL_IPNUMBER));//创建ip分配器AssignIps assignIps = new AssignIps(iptemp);for(int i=0; i<assignIps.getTaskSize(); i++){SnmpEbuTask snmpEbuTask = new SnmpEbuTask(assignIps.getAssignedIps(i),oidList);Future<List<CapEbu>> result = executor.submit(snmpEbuTask);resultList.add(result);snmpEbuTask = null;}System.out.println("CorePoolSize:"+executor.getCorePoolSize()+"---"+"MaximumPoolSize:"+executor.getMaximumPoolSize()+"-----LargestPoolSize:"+executor.getLargestPoolSize()+"-------PoolSize:"+executor.getPoolSize());executor.allowCoreThreadTimeOut(false);System.out.println("executor.getKeepAliveTime(TimeUnit.MILLISECONDS):"+executor.getKeepAliveTime(TimeUnit.NANOSECONDS));//直到所有任务都已完成,则停止循环do {try {TimeUnit.MILLISECONDS.sleep(500);System.out.println("线程活跃数量:"+executor.getActiveCount());} catch (InterruptedException e) {e.printStackTrace();}}
//          while (executor.getCompletedTaskCount()<resultList.size()*(cicle+1)); while (executor.getActiveCount()!=0); System.out.println("线程活跃数量:"+executor.getActiveCount()+",执行器执行完的数量:"+executor.getCompletedTaskCount());System.out.println("执行器"+(cicle+1)+"是否已终止:"+executor.isTerminated());}executor.shutdown();//关闭线程执行器//所有任务已经完成,开始取出任务for (int i=0; i<resultList.size(); i++) {Future<List<CapEbu>> result=resultList.get(i);if(result.isDone()){try {List<CapEbu> ebus = result.get();if(ebus!=null&&ebus.size()>0){for(int j = 0 ; j < ebus.size() ; j++){if(!ebuMap.containsKey(ebus.get(j).getEbuIp())){ebuMap.put(ebus.get(j).getEbuIp(), ebus.get(j));System.out.println("放入map中,测试结果为 : ip = " + ebus.get(j).getEbuIp());}}}
//                  ebulist.removeAll(ebus);
//                  ebulist.addAll(ebus);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}} return ebuMap;}

2 SnmpEbuTask.java

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;import org.snmp4j.CommunityTarget;
import org.snmp4j.PDU;
import org.snmp4j.Snmp;
import org.snmp4j.event.ResponseEvent;
import org.snmp4j.event.ResponseListener;
import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.smi.Address;
import org.snmp4j.smi.GenericAddress;
import org.snmp4j.smi.OID;
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.VariableBinding;
import org.snmp4j.transport.DefaultUdpTransportMapping;import com.nufront.euht.model.CapEbu;
import com.nufront.euht.scanDevice.util.OidTranslatorUtil;
import com.nufront.euht.util.Beans;
import com.nufront.euht.util.StringUtil;public class SnmpEbuTask implements Callable<List<CapEbu>> {private int version = SnmpConstants.version2c;private String protocol = "udp";private int port = 161;private String community = "euhtpub";private List<String> ipsAddress;private List<String> oids;private List<CapEbu> capEbus = new Vector<CapEbu>();private Object lock = new Object();//同步锁private int responseCounter = 0;//响应计数器private ConcurrentHashMap<String,Object> distinctIp = new ConcurrentHashMap<String,Object>();//ip响应去重器public SnmpEbuTask(List<String> ipsAddress, List<String> oids) {this.ipsAddress = ipsAddress;this.oids = oids;}private CommunityTarget createCommunityTarget(String address,String community, int version, long timeOut, int retry) {Address targetAddress = GenericAddress.parse(address);CommunityTarget target = new CommunityTarget();target.setCommunity(new OctetString(community));target.setAddress(targetAddress);target.setVersion(version);target.setTimeout(timeOut); // millisecondstarget.setRetries(retry);return target;}@Overridepublic List<CapEbu> call() throws Exception {String address = null;CommunityTarget target = null;DefaultUdpTransportMapping transport = null;Snmp snmp = null;try {transport = new DefaultUdpTransportMapping();transport.listen();snmp = new Snmp(transport);PDU pdu = new PDU();pdu.setType(PDU.GET);for (String oid : oids) {pdu.add(new VariableBinding(new OID(oid)));}ResponseListener listener = new ResponseListener() {public void onResponse(ResponseEvent event) {((Snmp) event.getSource()).cancel(event.getRequest(), this);PDU response = event.getResponse();PDU request = event.getRequest();System.out.println("[request]:" + request);String ip = null;if (Beans.isNotEmpty(event.getPeerAddress())) {ip  = event.getPeerAddress().toString(); // 获取IPif(!StringUtil.isNullOrBlank(ip)){ip = ip.split("/")[0];System.out.println("registerListener, ip=" + ip);}}if (response == null) {System.out.println("[ERROR]: response is null, ip is "+ip);} else if (response.getErrorStatus() != 0) {System.out.println("[ERROR]: response status"+ response.getErrorStatus() + " Text:"+ response.getErrorStatusText()+ " ip is "+ip);} else {if(ip.equals("192.168.22.226")){System.out.println();}System.out.println("Received response Success!!!"+"       ip:"+ip);CapEbu capEbu = new CapEbu();capEbu.setEbuIp(ip);List<OidData> oidDatas = new ArrayList<OidData>();for (int i = 0; i < response.size(); i++) {VariableBinding vb = response.get(i);OidData oidData = new OidData();oidData.setOid(vb.getOid().toString());oidData.setValue(vb.getVariable().toString());oidDatas.add(oidData);}if(Beans.isNotEmpty(oidDatas) && oidDatas.size()>0){for(OidData oidData : oidDatas){OidTranslatorUtil.oidvalueConvertToName(oidData, capEbu);}}if(!distinctIp.containsKey(ip)){distinctIp.put(ip, new Object());capEbus.add(capEbu);System.out.println("capEbus.size():"+capEbus.size());}}synchronized (lock) {responseCounter++;System.out.println("responseCounter++:"+responseCounter);}}};for(int i=0; i<ipsAddress.size(); i++){address = protocol + ":" + ipsAddress.get(i) + "/" + port;target = createCommunityTarget(address, community, version, 1000L, 5);snmp.send(pdu, target, null, listener);System.out.println("asynchronous send pdu wait for response...");}long baselineTime = System.currentTimeMillis();while(responseCounter < ipsAddress.size()){long sendedTime = System.currentTimeMillis();//如果逗留时间超过5秒,有可能是网络原因或者其他原因,导致响应不会到达监听器,为避免死循环,则应马上结束循环。if(sendedTime-baselineTime>5000){break;}TimeUnit.MILLISECONDS.sleep(1000);}return capEbus;} catch (Exception e) {System.out.println("SNMP GetNext Exception:" + e);}finally{snmp.close();//扫描结束后,得立刻关掉snmp,否则监听器的线程将一直处于wait状态,任务数过多的话,线程创建也将越来越多,最终导致系统崩溃transport.close();}return null;}}

一段java并发编程代码相关推荐

  1. java并发编程代码示例_java并发编程之同步器代码示例

    java并发编程之同步器代码示例 发布时间:2020-09-08 16:53:41 来源:脚本之家 阅读:58 作者:Blessing_H 同步器是一些使线程能够等待另一个线程的对象,允许它们协调动作 ...

  2. 【Java并发编程】之十六:深入Java内存模型——happen-before规则及其对DCL的分析(含代码)...

    Java并发编程系列 版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/mmc_maodun/article/details/17348313 转载请注 ...

  3. Java并发编程71道面试题及答案

    Java并发编程71道面试题及答案 1.在java中守护线程和本地线程区别? java中的线程分为两种:守护线程(Daemon)和用户线程(User). 任何线程都可以设置为守护线程和用户线程,通过方 ...

  4. Java并发编程:JMM和volatile关键字

    Java内存模型 随着计算机的CPU的飞速发展,CPU的运算能力已经远远超出了从主内存(运行内存)中读取的数据的能力,为了解决这个问题,CPU厂商设计出了CPU内置高速缓存区.高速缓存区的加入使得CP ...

  5. Java并发编程:线程封闭和ThreadLocal详解

    什么是线程封闭 当访问共享变量时,往往需要加锁来保证数据同步.一种避免使用同步的方式就是不共享数据.如果仅在单线程中访问数据,就不需要同步了.这种技术称为线程封闭.在Java语言中,提供了一些类库和机 ...

  6. 《Java并发编程实践》学习笔记之一:基础知识

    <Java并发编程实践>学习笔记之一:基础知识 1.程序与进程 1.1 程序与进程的概念 (1)程序:一组有序的静态指令,是一种静态概念:  (2)进程:是一种活动,它是由一个动作序列组成 ...

  7. Java并发编程:Thread类的使用

    为什么80%的码农都做不了架构师?>>>    Java并发编程:Thread类的使用 在前面2篇文章分别讲到了线程和进程的由来.以及如何在Java中怎么创建线程和进程.今天我们来学 ...

  8. Java并发编程(五)JVM指令重排

    我是不是学了一门假的java...... 引言:在Java中看似顺序的代码在JVM中,可能会出现编译器或者CPU对这些操作指令进行了重新排序:在特定情况下,指令重排将会给我们的程序带来不确定的结果.. ...

  9. Java并发编程,无锁CAS与Unsafe类及其并发包Atomic

    为什么80%的码农都做不了架构师?>>>    我们曾经详谈过有锁并发的典型代表synchronized关键字,通过该关键字可以控制并发执行过程中有且只有一个线程可以访问共享资源,其 ...

最新文章

  1. PyTorch教程(八):常见激活函数与Loss的梯度
  2. 2016年Web前端面试题
  3. 功能强大的TCGA再分析平台
  4. TCP/IP入门(1) --链路层
  5. git 源代码自动检查_Visual Studio中Git的简单使用
  6. Javascript ES6 Promise异步链式读取文件解决回调地狱
  7. 高等院校计算机考试等级,全国高等院校计算机等级试考试大纲.doc
  8. _CentOS「linux」学习笔记11:crontab定时任务常用参数和基本语法
  9. 计算机网络系统集成实验指导,系统集成实验指导.docx
  10. qt在linux下编译资源文件,linux下Qt qrc文件的编写与应用
  11. 笔记本电脑建立Wifi热点多种方法
  12. 反思-我们真的初老了么?
  13. abs绝对位置指令 三菱plc_三菱FX系列PLC方便指令的使用方法
  14. gitlab-ce更新后reconfigure报错
  15. 视频编辑工具:添加水印、特效、音乐、导出视频、视频转gif
  16. 合格硕士学位论文的工作量
  17. 网站备案必须要云服务器,备案必须要云服务器吗
  18. 研究生复试发邮件注意事项
  19. WPS office根目录在哪?_wps和office的区别是什么
  20. 如何让我们的人生,拥有更多的可能性?

热门文章

  1. Eviews学习笔记
  2. 【Java 数据库】Connections.getTables() 方法 获取数据库的元数据
  3. 【Java网络编程(二)】UDP案例——在线咨询
  4. ACM练习 校赛183F:公平的游戏(TLE)【set的使用,给迭代器增加指定偏移量】
  5. 数据结构 - 字符串 - 最长公共子序列 + 最长公共子字符串 - 动态规划
  6. python输入hello输出olleh_leetcode上的python练习(6)
  7. 04.local_gateway和network相关设置
  8. google怎么做(2.相似网页算法)
  9. python怎么实现类似#define宏定义_Python系列学习笔记
  10. 1032 挖掘机技术哪家强 (20分)——15行代码AC