This article analyzes the state and methods of the BdbFrontier object.

The BdbFrontier class extends WorkQueueFrontier, which in turn extends AbstractFrontier.

BdbFrontier inherits its void start() method from its parent class, WorkQueueFrontier (org.archive.crawler.frontier.BdbFrontier → org.archive.crawler.frontier.WorkQueueFrontier):

```java
public void start() {
    if (isRunning()) {
        return;
    }
    uriUniqFilter.setDestination(this);
    super.start();
    try {
        initInternalQueues();
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
}
```
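The start() chain follows the template-method pattern. Each level guards with isRunning(), and WorkQueueFrontier runs its parent's start() before building its queues. BdbFrontier supplies only the storage-specific queue hooks.

The minimal sketch below reproduces just that call order. All `*Sketch` classes are hypothetical stand-ins, not Heritrix classes. Where the running flag gets set is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class FrontierStartSketch {

    // Hypothetical stand-in for AbstractFrontier: guard, then pause and start the manager thread.
    static abstract class AbstractFrontierSketch {
        final List<String> calls = new ArrayList<>();
        private boolean running = false;

        boolean isRunning() {
            return running;
        }

        public void start() {
            if (isRunning()) {
                return;
            }
            running = true; // assumption: where the real class flips its running state is not shown
            calls.add("pause");
            calls.add("startManagerThread");
        }
    }

    // Hypothetical stand-in for WorkQueueFrontier: same guard, then super.start(), then queues.
    static abstract class WorkQueueFrontierSketch extends AbstractFrontierSketch {
        @Override
        public void start() {
            if (isRunning()) {
                return;
            }
            calls.add("uriUniqFilter.setDestination");
            super.start();
            initInternalQueues();
        }

        // Simplified: the real initInternalQueues() calls initAllQueues() only in its else branch.
        void initInternalQueues() {
            initOtherQueues();
            initAllQueues();
        }

        abstract void initOtherQueues();

        abstract void initAllQueues();
    }

    // Hypothetical stand-in for BdbFrontier: it only fills in the queue hooks.
    static class BdbFrontierSketch extends WorkQueueFrontierSketch {
        @Override
        void initOtherQueues() {
            calls.add("initOtherQueues");
        }

        @Override
        void initAllQueues() {
            calls.add("initAllQueues");
        }
    }

    public static List<String> demo() {
        BdbFrontierSketch frontier = new BdbFrontierSketch();
        frontier.start();
        frontier.start(); // returns immediately: isRunning() is already true
        return frontier.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```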

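After registering the frontier as the destination of uriUniqFilter, WorkQueueFrontier.start() calls the void start() method of its parent class, AbstractFrontier: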

```java
public void start() {
    if (isRunning()) {
        return;
    }
    if (getRecoveryLogEnabled()) try {
        initJournal(loggerModule.getPath().getFile().getAbsolutePath());
    } catch (IOException e) {
        throw new IllegalStateException(e);
    }
    pause();
    startManagerThread();
}
```

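AbstractFrontier.start() first initializes the recovery journal if the recovery log is enabled. It then calls pause(), which requests State.PAUSE as the target state of the current object (BdbFrontier). Finally, it calls void startManagerThread():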

```java
/**
 * Start the dedicated thread with an independent view of the frontier's
 * state.
 */
protected void startManagerThread() {
    managerThread = new Thread(this + ".managerThread") {
        public void run() {
            AbstractFrontier.this.managementTasks();
        }
    };
    managerThread.setPriority(Thread.NORM_PRIORITY + 1);
    managerThread.start();
}
```
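The same idiom can be tried in isolation. In this sketch, ManagerThreadSketch and its trivial managementTasks() are hypothetical. Only three things follow the original: the thread name, the `OuterClass.this` call from inside an anonymous Thread subclass, and the `NORM_PRIORITY + 1` priority.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ManagerThreadSketch {
    private final CountDownLatch ran = new CountDownLatch(1);
    private Thread managerThread;

    // Hypothetical stand-in for managementTasks(): it only signals that it ran.
    protected void managementTasks() {
        ran.countDown();
    }

    protected void startManagerThread() {
        // Anonymous Thread subclass; ManagerThreadSketch.this reaches the enclosing
        // instance, as AbstractFrontier.this does in the original.
        managerThread = new Thread(this + ".managerThread") {
            @Override
            public void run() {
                ManagerThreadSketch.this.managementTasks();
            }
        };
        // One step above normal priority, as in the original.
        managerThread.setPriority(Thread.NORM_PRIORITY + 1);
        managerThread.start();
    }

    // Starts the thread, waits for it, and returns its priority (-1 on timeout or interrupt).
    public static int demo() {
        ManagerThreadSketch sketch = new ManagerThreadSketch();
        sketch.startManagerThread();
        try {
            if (!sketch.ran.await(5, TimeUnit.SECONDS)) {
                return -1;
            }
            sketch.managerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return sketch.managerThread.getPriority();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 6
    }
}
```

On Java 8 and later, `new Thread(this::managementTasks, name)` would be a shorter, equivalent way to write the same thing.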

The run() method of managerThread calls void managementTasks():

```java
/**
 * Main loop of frontier's managerThread. Only exits when State.FINISH
 * is requested (perhaps automatically at URI exhaustion) and reached.
 *
 * General strategy is to try to fill outbound queue, then process an
 * item from inbound queue, and repeat. A HOLD (to be implemented) or
 * PAUSE puts frontier into a stable state that won't be changed
 * asynchronously by worker thread activity.
 */
protected void managementTasks() {
    assert Thread.currentThread() == managerThread;
    try {
        loop: while (true) {
            try {
                State reachedState = null;
                switch (targetState) {
                case EMPTY:
                    reachedState = State.EMPTY;
                case RUN:
                    // enable outbound takes if previously locked
                    while (outboundLock.isWriteLockedByCurrentThread()) {
                        outboundLock.writeLock().unlock();
                    }
                    if (reachedState == null) {
                        reachedState = State.RUN;
                    }
                    reachedState(reachedState);

                    Thread.sleep(1000);

                    if (isEmpty() && targetState == State.RUN) {
                        requestState(State.EMPTY);
                    } else if (!isEmpty() && targetState == State.EMPTY) {
                        requestState(State.RUN);
                    }
                    break;
                case HOLD:
                    // TODO; for now treat same as PAUSE
                case PAUSE:
                    // pausing
                    // prevent all outbound takes
                    outboundLock.writeLock().lock();
                    // process all inbound
                    while (targetState == State.PAUSE) {
                        if (getInProcessCount() == 0) {
                            reachedState(State.PAUSE);
                        }
                        Thread.sleep(1000);
                    }
                    break;
                case FINISH:
                    // prevent all outbound takes
                    outboundLock.writeLock().lock();
                    // process all inbound
                    while (getInProcessCount() > 0) {
                        Thread.sleep(1000);
                    }

                    finalTasks(); // TODO: more cleanup?

                    reachedState(State.FINISH);
                    break loop;
                }
            } catch (RuntimeException e) {
                // log, try to pause, continue
                logger.log(Level.SEVERE, "", e);
                if (targetState != State.PAUSE && targetState != State.FINISH) {
                    requestState(State.PAUSE);
                }
            }
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

    // try to leave in safely restartable state:
    targetState = State.PAUSE;
    while (outboundLock.isWriteLockedByCurrentThread()) {
        outboundLock.writeLock().unlock();
    }
    //TODO: ensure all other structures are cleanly reset on restart

    logger.log(Level.FINE, "ending frontier mgr thread");
}
```

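The method above repeatedly adjusts the lock state of the member variable protected ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true) according to the BdbFrontier object's target state.

- In RUN and EMPTY, it releases any write lock held by the manager thread, so outbound takes are enabled again.
- In HOLD, PAUSE and FINISH, it acquires the write lock to "prevent all outbound takes", as the code comments put it.
- In PAUSE, it reports reachedState(State.PAUSE) only once no URIs are in process.
- In FINISH, it waits until the in-process count drops to zero, runs finalTasks(), and leaves the loop.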
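This sketch shows why the RUN branch, and the cleanup after the loop, unlock in a while loop rather than once. The write lock of a ReentrantReadWriteLock is reentrant, and the manager thread can lock it more than once, for example in PAUSE and then again in FINISH.

The example uses a fair lock, as the original does. It assumes that worker threads take the read lock before pulling a URI; the worker-side code is not shown in this article. OutboundGateSketch, closeGate(), openGate() and workerCanTake() are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class OutboundGateSketch {
    // Fair read/write lock, declared the same way as the frontier's outboundLock field.
    private final ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true);

    // Manager thread, HOLD/PAUSE/FINISH branches: block all outbound takes.
    void closeGate() {
        outboundLock.writeLock().lock();
    }

    // Manager thread, RUN/EMPTY branches and post-loop cleanup: release every reentrant hold.
    void openGate() {
        while (outboundLock.isWriteLockedByCurrentThread()) {
            outboundLock.writeLock().unlock();
        }
    }

    // Hypothetical worker-side check: can another thread get the read lock right now?
    boolean workerCanTake() {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            if (outboundLock.readLock().tryLock()) {
                try {
                    acquired.set(true);
                } finally {
                    outboundLock.readLock().unlock();
                }
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired.get();
    }

    public static String demo() {
        OutboundGateSketch gate = new OutboundGateSketch();
        StringBuilder out = new StringBuilder();
        out.append(gate.workerCanTake());                           // running: gate open
        gate.closeGate();                                            // e.g. PAUSE
        gate.closeGate();                                            // e.g. FINISH on the same thread
        out.append(',').append(gate.outboundLock.getWriteHoldCount());
        out.append(',').append(gate.workerCanTake());               // gate closed
        gate.openGate();                                             // one call drops both holds
        out.append(',').append(gate.workerCanTake());               // gate open again
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true,2,false,true
    }
}
```

A single `writeLock().unlock()` after the two lock() calls would leave the hold count at 1, and the workers would stay blocked.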

Back in WorkQueueFrontier.start(), the subsequent call to void initInternalQueues() initializes the frontier's internal queues:

```java
/**
 * Initializes internal queues.  May decide to keep all queues in memory based on
 * {@link QueueAssignmentPolicy#maximumNumberOfKeys}.  Otherwise invokes
 * {@link #initAllQueues()} to actually set up the queues.
 *
 * Subclasses should invoke this method with recycle set to "true" in
 * a private readObject method, to restore queues after a checkpoint.
 *
 * @param recycle
 * @throws IOException
 * @throws DatabaseException
 */
protected void initInternalQueues()
throws IOException, DatabaseException {
    this.initOtherQueues();
    if (workQueueDataOnDisk()
            && preparer.getQueueAssignmentPolicy().maximumNumberOfKeys() >= 0
            && preparer.getQueueAssignmentPolicy().maximumNumberOfKeys() <=
                MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
        this.allQueues =
            new ObjectIdentityMemCache<WorkQueue>(701, .9f, 100);
    } else {
        this.initAllQueues();
    }
}
```
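The branch in initInternalQueues() comes down to a single condition, which the following sketch isolates. The parameters `maxKeys` and `threshold` stand in for maximumNumberOfKeys() and MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY; this article does not show the constant's value. The enum names are made up for illustration.

```java
public class QueueStorageChoiceSketch {

    public enum Storage { IN_MEMORY_CACHE, INIT_ALL_QUEUES }

    // Mirrors the condition in initInternalQueues().
    // maxKeys: stand-in for preparer.getQueueAssignmentPolicy().maximumNumberOfKeys()
    // threshold: stand-in for MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY (value assumed by caller)
    public static Storage choose(boolean workQueueDataOnDisk, int maxKeys, int threshold) {
        if (workQueueDataOnDisk && maxKeys >= 0 && maxKeys <= threshold) {
            // real code: new ObjectIdentityMemCache<WorkQueue>(701, .9f, 100)
            return Storage.IN_MEMORY_CACHE;
        }
        // real code: initAllQueues(), which in BdbFrontier calls bdb.getObjectCache("allqueues", ...)
        return Storage.INIT_ALL_QUEUES;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, 500, 1000));  // IN_MEMORY_CACHE
        System.out.println(choose(true, -1, 1000));   // INIT_ALL_QUEUES (fails the >= 0 check)
        System.out.println(choose(false, 500, 1000)); // INIT_ALL_QUEUES (queue data not on disk)
    }
}
```

The in-memory ObjectIdentityMemCache is used only under two conditions: work-queue data is kept on disk, and the assignment policy reports a non-negative key count no larger than the threshold. Every other case falls through to initAllQueues(), including a negative count.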

initInternalQueues() first calls void initOtherQueues(), which is overridden in the BdbFrontier class:

```java
@Override
protected void initOtherQueues() throws DatabaseException {
    boolean recycle = (recoveryCheckpoint != null);

    // tiny risk of OutOfMemoryError: if giant number of snoozed
    // queues all wake-to-ready at once
    readyClassQueues = new LinkedBlockingQueue<String>();

    inactiveQueuesByPrecedence = new ConcurrentSkipListMap<Integer,Queue<String>>();

    retiredQueues = bdb.getStoredQueue("retiredQueues", String.class, recycle);

    // primary snoozed queues
    snoozedClassQueues = new DelayQueue<DelayedWorkQueue>();
    // just in case: overflow for extreme situations
    snoozedOverflow = bdb.getStoredMap(
            "snoozedOverflow", Long.class, DelayedWorkQueue.class, true, false);

    this.futureUris = bdb.getStoredMap(
            "futureUris", Long.class, CrawlURI.class, true, recoveryCheckpoint!=null);

    // initialize master map in which other queues live
    this.pendingUris = createMultipleWorkQueues();
}
```

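The method above initializes a series of queues; what each one is for will be analyzed in later articles. The code already shows two kinds:

- readyClassQueues, inactiveQueuesByPrecedence and snoozedClassQueues are plain in-memory java.util.concurrent structures.
- retiredQueues, snoozedOverflow and futureUris are stored collections obtained from bdb.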
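This sketch roughly illustrates how a DelayQueue of snoozed queues can feed the unbounded readyClassQueues. It also shows why the code comment warns about an OutOfMemoryError if many snoozed queues wake at once: every woken queue lands in a queue with no capacity limit.

SnoozedQueue, snooze() and wakeQueues() are hypothetical. Heritrix's own DelayedWorkQueue and its wake-up logic, covered later in the series, may well differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SnoozeSketch {

    // Hypothetical stand-in for DelayedWorkQueue: a queue name plus its wake-up time.
    static final class SnoozedQueue implements Delayed {
        final String classKey;
        final long wakeAtNanos;

        SnoozedQueue(String classKey, long snoozeMillis) {
            this.classKey = classKey;
            this.wakeAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(snoozeMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(wakeAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    final DelayQueue<SnoozedQueue> snoozedClassQueues = new DelayQueue<>();
    // Unbounded, like the original: a mass wake-up can make it grow without limit.
    final LinkedBlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<>();

    void snooze(String classKey, long millis) {
        snoozedClassQueues.add(new SnoozedQueue(classKey, millis));
    }

    // Move every queue whose snooze time has expired to the ready queue.
    int wakeQueues() {
        List<SnoozedQueue> woken = new ArrayList<>();
        snoozedClassQueues.drainTo(woken); // DelayQueue.drainTo removes only expired elements
        for (SnoozedQueue q : woken) {
            readyClassQueues.add(q.classKey);
        }
        return woken.size();
    }

    public static String demo() {
        SnoozeSketch frontier = new SnoozeSketch();
        frontier.snooze("queue-a", 0);      // already due
        frontier.snooze("queue-b", 60_000); // due in one minute
        int woken = frontier.wakeQueues();
        return woken + " " + frontier.readyClassQueues + " " + frontier.snoozedClassQueues.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1 [queue-a] 1
    }
}
```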

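void initAllQueues() is also defined in BdbFrontier. It is called only from the else branch of initInternalQueues(), and it initializes the member variable ObjectIdentityCache<WorkQueue> allQueues = null: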

```java
@Override
protected void initAllQueues() throws DatabaseException {
    boolean isRecovery = (recoveryCheckpoint != null);
    this.allQueues = bdb.getObjectCache("allqueues", isRecovery, WorkQueue.class, BdbWorkQueue.class);
    if(isRecovery) {
        // restore simple instance fields
        JSONObject json = recoveryCheckpoint.loadJson(beanName);
        try {
            nextOrdinal.set(json.getLong("nextOrdinal"));
            queuedUriCount.set(json.getLong("queuedUriCount"));
            futureUriCount.set(json.getLong("futureUriCount"));
            succeededFetchCount.set(json.getLong("succeededFetchCount"));
            failedFetchCount.set(json.getLong("failedFetchCount"));
            disregardedUriCount.set(json.getLong("disregardedUriCount"));
            totalProcessedBytes.set(json.getLong("totalProcessedBytes"));
            JSONArray inactivePrecedences = json.getJSONArray("inactivePrecedences");
            // restore all intended inactiveQueues
            for(int i = 0; i < inactivePrecedences.length(); i++) {
                int precedence = inactivePrecedences.getInt(i);
                inactiveQueuesByPrecedence.put(precedence,createInactiveQueueForPrecedence(precedence,true));
            }
        } catch (JSONException e) {
            throw new RuntimeException(e);
        }

        // retired queues already restored with prior data in initOtherQueues

        // restore ready queues (those not already on inactive, retired)
        BufferedReader activeQueuesReader = null;
        try {
            activeQueuesReader = recoveryCheckpoint.loadReader(beanName,"active");
            String line;
            while((line = activeQueuesReader.readLine())!=null) {
                readyClassQueues.add(line);
            }
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        } finally {
            IOUtils.closeQuietly(activeQueuesReader);
        }

        // TODO: restore largestQueues topNset?
    }
}
```
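The recovery branch restores counters from a checkpoint, then reloads the names of ready queues, one per line. The sketch below mimics that flow using JDK types only:

- A `Map<String, Long>` stands in for the JSONObject.
- A BufferedReader over a string stands in for `recoveryCheckpoint.loadReader(beanName, "active")`.
- The class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class RecoverySketch {
    final AtomicLong queuedUriCount = new AtomicLong();
    final AtomicLong succeededFetchCount = new AtomicLong();
    final LinkedBlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<>();

    // savedFields: hypothetical stand-in for the JSONObject from recoveryCheckpoint.loadJson(beanName).
    void restoreCounters(Map<String, Long> savedFields) {
        queuedUriCount.set(savedFields.get("queuedUriCount"));
        succeededFetchCount.set(savedFields.get("succeededFetchCount"));
    }

    // activeQueues: hypothetical stand-in for recoveryCheckpoint.loadReader(beanName, "active"),
    // one ready queue name per line. try-with-resources takes the place of IOUtils.closeQuietly.
    void restoreReadyQueues(BufferedReader activeQueues) {
        try (BufferedReader reader = activeQueues) {
            String line;
            while ((line = reader.readLine()) != null) {
                readyClassQueues.add(line);
            }
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static String demo() {
        RecoverySketch frontier = new RecoverySketch();
        Map<String, Long> saved = new HashMap<>();
        saved.put("queuedUriCount", 42L);
        saved.put("succeededFetchCount", 7L);
        frontier.restoreCounters(saved);
        frontier.restoreReadyQueues(new BufferedReader(new StringReader("queue-a\nqueue-b\n")));
        return frontier.queuedUriCount.get() + " " + frontier.succeededFetchCount.get()
                + " " + frontier.readyClassQueues;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 42 7 [queue-a, queue-b]
    }
}
```

One behavioral difference from IOUtils.closeQuietly: with try-with-resources, a failure in close() is reported (here wrapped in UncheckedIOException) instead of being swallowed.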

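The ObjectIdentityCache<WorkQueue> allQueues member manages the cache of BdbWorkQueue work queues. In initAllQueues() it is obtained from bdb.getObjectCache("allqueues", ...) with BdbWorkQueue as the concrete class. In the in-memory branch of initInternalQueues() it is an ObjectIdentityMemCache instead.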
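The name ObjectIdentityCache suggests a simple contract: one shared instance per key, so every thread sees the same WorkQueue object for a given queue. The in-memory sketch below illustrates that contract with ConcurrentHashMap.computeIfAbsent. Several points are assumptions:

- that ObjectIdentityMemCache works this way;
- that its constructor arguments (701, .9f, 100) are ConcurrentHashMap's initialCapacity, loadFactor and concurrencyLevel;
- the names IdentityCacheSketch, getOrCreate() and FakeWorkQueue, which are hypothetical.

The cache returned by bdb.getObjectCache(...) additionally persists entries through bdb, which this sketch does not attempt.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class IdentityCacheSketch<V> {
    // Assumption: (701, .9f, 100) read as initialCapacity, loadFactor, concurrencyLevel.
    private final ConcurrentHashMap<String, V> map = new ConcurrentHashMap<>(701, .9f, 100);

    // Return the single instance for this key, creating it atomically on first use.
    public V getOrCreate(String key, Function<String, V> factory) {
        return map.computeIfAbsent(key, factory);
    }

    public int size() {
        return map.size();
    }

    // Hypothetical stand-in for a WorkQueue.
    static final class FakeWorkQueue {
        final String classKey;

        FakeWorkQueue(String classKey) {
            this.classKey = classKey;
        }
    }

    public static boolean demo() {
        IdentityCacheSketch<FakeWorkQueue> allQueues = new IdentityCacheSketch<>();
        FakeWorkQueue a1 = allQueues.getOrCreate("queue-a", FakeWorkQueue::new);
        FakeWorkQueue a2 = allQueues.getOrCreate("queue-a", FakeWorkQueue::new);
        FakeWorkQueue b = allQueues.getOrCreate("queue-b", FakeWorkQueue::new);
        // Same key gives the same object; different keys give different objects.
        return a1 == a2 && a1 != b && allQueues.size() == 2;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```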

---------------------------------------------------------------------------

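This Heritrix 3.1.0 source-code analysis series is my original work; this article is part 6.

When reposting, please credit the source: cnblogs (博客园), 刺猬的温驯

Article link: http://www.cnblogs.com/chenying99/archive/2013/04/18/3027677.html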
