本文接着分析存储CrawlURI curi的队列容器,最重要的是BdbWorkQueue类及BdbMultipleWorkQueues类

BdbWorkQueue类继承自抽象类WorkQueue,抽象类WorkQueue最重要的方法是

long enqueue(final WorkQueueFrontier frontier,CrawlURI curi)

CrawlURI peek(final WorkQueueFrontier frontier)

void dequeue(final WorkQueueFrontier frontier, CrawlURI expected)

分别为添加CrawlURI curi,获取CrawlURI curi以及完成CrawlURI curi的方法,具体逻辑在其子类实现(这里是BdbWorkQueue类)

BdbWorkQueue类完成抽象父类的逻辑,实现上述方法的具体方法分别为

void insertItem(final WorkQueueFrontier frontier,final CrawlURI curi, boolean overwriteIfPresent)

CrawlURI peekItem(final WorkQueueFrontier frontier)

void deleteItem(final WorkQueueFrontier frontier,final CrawlURI peekItem)

在分析BdbWorkQueue类的相关方法之前,先了解一下BdbWorkQueue类对象的系统环境中的状态

我们先来查看一下BdbWorkQueue对象在系统环境中实例化方法,在BdbFrontier对象的WorkQueue getQueueFor(final String classKey)方法里面

/*** Return the work queue for the given classKey, or null* if no such queue exists.* * @param classKey key to look for* @return the found WorkQueue*/protected WorkQueue getQueueFor(final String classKey) {      WorkQueue wq = allQueues.getOrUse(classKey,new Supplier<WorkQueue>() {public BdbWorkQueue get() {String qKey = new String(classKey); // ensure private minimal keyBdbWorkQueue q = new BdbWorkQueue(qKey, BdbFrontier.this);q.setTotalBudget(getQueueTotalBudget()); //-1
                        System.out.println(getQueuePrecedencePolicy().getClass().getName());getQueuePrecedencePolicy().queueCreated(q);return q;}});return wq;}

在BdbWorkQueue对象实例化时,传入ClassKey和BdbFrontier对象本身,然后是设置BdbWorkQueue对象的属性值(这些属性值用于工作队列的调度,后面文章再具体分析)

我们再看BdbWorkQueue类的构造方法:

/*** Create a virtual queue inside the given BdbMultipleWorkQueues * * @param classKey*/public BdbWorkQueue(String classKey, BdbFrontier frontier) {super(classKey);this.origin = BdbMultipleWorkQueues.calculateOriginKey(classKey);if (LOGGER.isLoggable(Level.FINE)) {LOGGER.fine(getPrefixClassKey(this.origin) + " " + classKey);}// add the queue-front 'cap' entry; see...// http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
        frontier.getWorkQueues().addCap(origin);}

在BdbWorkQueue类的构造方法里面初始化成员变量String classKey和byte[] origin(根据classkey生成的),并将它作为键添加到BdbMultipleWorkQueues封装的Database pendingUrisDB数据库(BdbMultipleWorkQueues对象为BdbFrontier对象成员BdbMultipleWorkQueues pendingUris实际数据库名称为pending)

下面我们来看BdbWorkQueue类的上面提到过的具体实现方法

void insertItem(final WorkQueueFrontier frontier,final CrawlURI curi, boolean overwriteIfPresent)

protected void insertItem(final WorkQueueFrontier frontier,final CrawlURI curi, boolean overwriteIfPresent) throws IOException {try {final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier).getWorkQueues();queues.put(curi, overwriteIfPresent);if (LOGGER.isLoggable(Level.FINE)) {LOGGER.fine("Inserted into " + getPrefixClassKey(this.origin) +" (count " + Long.toString(getCount())+ "): " +curi.toString());}} catch (DatabaseException e) {throw new IOException(e);}}

CrawlURI peekItem(final WorkQueueFrontier frontier)

protected CrawlURI peekItem(final WorkQueueFrontier frontier)throws IOException {final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier).getWorkQueues();DatabaseEntry key = new DatabaseEntry(origin);CrawlURI curi = null;int tries = 1;while(true) {try {curi = queues.get(key);} catch (DatabaseException e) {LOGGER.log(Level.SEVERE,"peekItem failure; retrying",e);}// ensure CrawlURI, if any,  came from acceptable range: if(!ArchiveUtils.startsWith(key.getData(),origin)) {LOGGER.severe("inconsistency: "+classKey+"("+getPrefixClassKey(origin)+") with " + getCount() + " items gave "+ curi +"("+getPrefixClassKey(key.getData()));// clear curi to allow retrycuri = null; // reset key to original origin for retry
                key.setData(origin);}if (curi!=null) {// successbreak;}if (tries>3) {LOGGER.severe("no item where expected in queue "+classKey);break;}tries++;LOGGER.severe("Trying get #" + Integer.toString(tries)+ " in queue " + classKey + " with " + getCount()+ " items using key "+ getPrefixClassKey(key.getData()));}return curi;}

void deleteItem(final WorkQueueFrontier frontier,final CrawlURI peekItem)

protected void deleteItem(final WorkQueueFrontier frontier,final CrawlURI peekItem) throws IOException {try {final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier).getWorkQueues();queues.delete(peekItem);} catch (DatabaseException e) {throw new IOException(e);}}

接下里分析BdbMultipleWorkQueues类的相关方法,照例在分析它的方法之前了解一下该对象的系统环境中的状态

BdbMultipleWorkQueues对象在BdbFrontier类源码中的实例化方法如下:

/*** Create the single object (within which is one BDB database)* inside which all the other queues live. * * @return the created BdbMultipleWorkQueues* @throws DatabaseException*/protected BdbMultipleWorkQueues createMultipleWorkQueues()throws DatabaseException {Database db;boolean recycle = (recoveryCheckpoint != null);BdbModule.BdbConfig dbConfig = new BdbModule.BdbConfig();dbConfig.setAllowCreate(!recycle);// Make database deferred write: URLs that are added then removed // before a page-out is required need never cause disk IO.db = bdb.openDatabase("pending", dbConfig, recycle);return new BdbMultipleWorkQueues(db, bdb.getClassCatalog());}

传入数据库Database db和StoredClassCatalog类型对象 (我后来注意到,这个StoredClassCatalog对象在BdbMultipleWorkQueues构造方法里面并没有用到;这里的BDB数据库类型转换的EntryBinding<CrawlURI>对象没有采用这个StoredClassCatalog对象,而是后面的EntryBinding<CrawlURI> crawlUriBinding对象)

我们再看它的构造函数

/*** Create the multi queue in the given environment. * * @param env bdb environment to use* @param classCatalog Class catalog to use.* @param recycle True if we are to reuse db content if any.* @throws DatabaseException*/public BdbMultipleWorkQueues(Database db,StoredClassCatalog classCatalog)throws DatabaseException {this.pendingUrisDB = db;crawlUriBinding =new KryoBinding<CrawlURI>(CrawlURI.class);
//            new RecyclingSerialBinding<CrawlURI>(classCatalog, CrawlURI.class);
//            new BenchmarkingBinding<CrawlURI>(new EntryBinding[] {
//                new KryoBinding<CrawlURI>(CrawlURI.class,true),
//                new KryoBinding<CrawlURI>(CrawlURI.class,false),
//                new RecyclingSerialBinding<CrawlURI>(classCatalog, CrawlURI.class),
//            });
            }

初始化成员变量Database pendingUrisDB和EntryBinding<CrawlURI> crawlUriBinding

我们再看它的相关方法

void put(CrawlURI curi, boolean overwriteIfPresent)

/*** Put the given CrawlURI in at the appropriate place. * * @param curi* @throws DatabaseException*/public void put(CrawlURI curi, boolean overwriteIfPresent) throws DatabaseException {DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey();if (insertKey == null) {insertKey = calculateInsertKey(curi);curi.setHolderKey(insertKey);}DatabaseEntry value = new DatabaseEntry();crawlUriBinding.objectToEntry(curi, value);// Output tally on avg. size if level is FINE or greater.if (LOGGER.isLoggable(Level.FINE)) {tallyAverageEntrySize(curi, value);}OperationStatus status;if(overwriteIfPresent) {status = pendingUrisDB.put(null, insertKey, value);} else {status = pendingUrisDB.putNoOverwrite(null, insertKey, value);}if (status!=OperationStatus.SUCCESS) {LOGGER.log(Level.SEVERE,"URI enqueueing failed; "+status+ " "+curi, new RuntimeException());}}

上面关键的是我们要了解DatabaseEntry insertKey是怎么算出来的

/*** Calculate the insertKey that places a CrawlURI in the* desired spot. First bytes are always classKey (usu. host)* based -- ensuring grouping by host -- terminated by a zero* byte. Then 8 bytes of data ensuring desired ordering * within that 'queue' are used. The first byte of these 8 is* priority -- allowing 'immediate' and 'soon' items to * sort above regular. Next 1 byte is 'precedence'. Last 6 bytes * are ordinal serial number, ensuring earlier-discovered * URIs sort before later. * * NOTE: Dangers here are:* (1) priorities or precedences over 2^7 (signed byte comparison)* (2) ordinals over 2^48* * Package access & static for testing purposes. * * @param curi* @return a DatabaseEntry key for the CrawlURI*/static DatabaseEntry calculateInsertKey(CrawlURI curi) {byte[] classKeyBytes = null;int len = 0;classKeyBytes = curi.getClassKey().getBytes(Charsets.UTF_8);len = classKeyBytes.length;byte[] keyData = new byte[len+9];System.arraycopy(classKeyBytes,0,keyData,0,len);keyData[len]=0;long ordinalPlus = curi.getOrdinal() & 0x0000FFFFFFFFFFFFL;ordinalPlus = ((long)curi.getSchedulingDirective() << 56) | ordinalPlus;long precedence = Math.min(curi.getPrecedence(), 127);ordinalPlus = (((precedence) & 0xFFL) << 48) | ordinalPlus;ArchiveUtils.longIntoByteArray(ordinalPlus, keyData, len+1);return new DatabaseEntry(keyData);}

CrawlURI get(DatabaseEntry headKey),这个DatabaseEntry headKey是有BdbWorkQueue对象传过来的,具体怎么得到的,待后文分析

/*** Get the next nearest item after the given key. Relies on * external discipline -- we'll look at the queues count of how many* items it has -- to avoid asking for something from a* range where there are no associated items --* otherwise could get first item of next 'queue' by mistake. * * <p>TODO: hold within a queue's range* * @param headKey Key prefix that demarks the beginning of the range* in <code>pendingUrisDB</code> we're interested in.* @return CrawlURI.* @throws DatabaseException*/public CrawlURI get(DatabaseEntry headKey)throws DatabaseException {DatabaseEntry result = new DatabaseEntry();// From Linda Lee of sleepycat:// "You want to check the status returned from Cursor.getSearchKeyRange// to make sure that you have OperationStatus.SUCCESS. In that case,// you have found a valid data record, and result.getData()// (called by internally by the binding code, in this case) will be// non-null. The other possible status return is// OperationStatus.NOTFOUND, in which case no data record matched// the criteria. "OperationStatus status = getNextNearestItem(headKey, result);CrawlURI retVal = null;if (status != OperationStatus.SUCCESS) {LOGGER.severe("See '1219854 NPE je-2.0 "+ "entryToObject...'. OperationStatus "+ " was not SUCCESS: "+ status+ ", headKey "+ BdbWorkQueue.getPrefixClassKey(headKey.getData()));return null;}try {retVal = (CrawlURI)crawlUriBinding.entryToObject(result);} catch (ClassCastException cce) {Object obj = crawlUriBinding.entryToObject(result);LOGGER.log(Level.SEVERE,"see [#HER-1283]: deserialized " + obj.getClass() + " has ClassLoader " + obj.getClass().getClassLoader().getClass(),cce);return null; } catch (RuntimeExceptionWrapper rw) {LOGGER.log(Level.SEVERE,"expected object missing in queue " +BdbWorkQueue.getPrefixClassKey(headKey.getData()),rw);return null; }retVal.setHolderKey(headKey);return retVal;}

进一步调用OperationStatus getNextNearestItem(DatabaseEntry headKey,DatabaseEntry result)方法

protected OperationStatus getNextNearestItem(DatabaseEntry headKey,DatabaseEntry result) throws DatabaseException {Cursor cursor = null;OperationStatus status;try {cursor = this.pendingUrisDB.openCursor(null, null);// get cap; headKey at this point should always point to // a queue-beginning cap entry (zero-length value)status = cursor.getSearchKey(headKey, result, null);if (status != OperationStatus.SUCCESS) {LOGGER.severe("bdb queue cap missing: " + status.toString() + " "  + new String(headKey.getData()));return status;}if (result.getData().length > 0) {LOGGER.severe("bdb queue has nonzero size: " + result.getData().length);return OperationStatus.KEYEXIST;}// get next item (real first item of queue)status = cursor.getNext(headKey,result,null);} finally { if(cursor!=null) {cursor.close();}}return status;}

void delete(CrawlURI item)方法(根据holderKey键删除)

/*** Delete the given CrawlURI from persistent store. Requires* the key under which it was stored be available. * * @param item* @throws DatabaseException*/public void delete(CrawlURI item) throws DatabaseException {OperationStatus status;DatabaseEntry de = (DatabaseEntry)item.getHolderKey();status = pendingUrisDB.delete(null, de);if (status != OperationStatus.SUCCESS) {LOGGER.severe("expected item not present: "+ item+ "("+ (new BigInteger(((DatabaseEntry) item.getHolderKey()).getData())).toString(16) + ")");}}

void forAllPendingDo(Closure c) 方法用于遍历记录并且回调Closure c的方法(闭包)

/*** 遍历记录* Utility method to perform action for all pending CrawlURI instances.* @param c Closure action to perform* @throws DatabaseException*/protected void forAllPendingDo(Closure c) throws DatabaseException {DatabaseEntry key = new DatabaseEntry();DatabaseEntry value = new DatabaseEntry();Cursor cursor = pendingUrisDB.openCursor(null, null);while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) {if (value.getData().length == 0) {continue;}CrawlURI item = (CrawlURI) crawlUriBinding.entryToObject(value);c.execute(item);}cursor.close(); }

CompositeData getFrom(String m, int maxMatches, Pattern pattern, boolean verbose)方法是JMX管理 需要用到的

/*** @param m marker or null to start with first entry* @param maxMatches* @return list of matches starting from marker position* @throws DatabaseException*/public CompositeData getFrom(String m, int maxMatches, Pattern pattern, boolean verbose) throws DatabaseException {int matches = 0;int tries = 0;ArrayList<String> results = new ArrayList<String>(maxMatches);DatabaseEntry key;if (m == null) {key = getFirstKey();} else {byte[] marker = m.getBytes(); // = FrontierJMXTypes.fromString(m);key = new DatabaseEntry(marker);}DatabaseEntry value = new DatabaseEntry();Cursor cursor = null;OperationStatus result = null;try {cursor = pendingUrisDB.openCursor(null,null);result = cursor.getSearchKey(key, value, null);while(matches < maxMatches && result == OperationStatus.SUCCESS) {if(value.getData().length>0) {CrawlURI curi = (CrawlURI) crawlUriBinding.entryToObject(value);if(pattern.matcher(curi.toString()).matches()) {if (verbose) {results.add("[" + curi.getClassKey() + "] " + curi.shortReportLine());} else {results.add(curi.toString());}matches++;}tries++;}result = cursor.getNext(key,value,null);}} finally {if (cursor !=null) {cursor.close();}}if(result != OperationStatus.SUCCESS) {// end of scanm = null;} else {m = new String(key.getData()); // = FrontierJMXTypes.toString(key.getData());
        }String[] arr = results.toArray(new String[results.size()]);CompositeData cd;try {cd = new CompositeDataSupport(/*FrontierJMXTypes.URI_LIST_DATA*/ null,new String[] { "list", "marker" },new Object[] { arr, m });} catch (OpenDataException e) {throw new IllegalStateException(e);}return cd;}

最后我们有必要了解一下EntryBinding<CrawlURI> crawlUriBinding对象,该类为一个泛型类,实现了je的EntryBinding<K>接口,用于将系统里面的自定义类型(在heritrix系统中也就是CrawlURI类)转换为BDB数据库的类型,其源码如下:

/*** Binding for use with BerkeleyDB-JE that uses Kryo serialization rather* than BDB's (custom version of) Java serialization.* * @contributor gojomo*/
public class KryoBinding<K> implements EntryBinding<K> {protected Class<K> baseClass;protected AutoKryo kryo = new AutoKryo(); protected ThreadLocal<WeakReference<ObjectBuffer>> threadBuffer = new ThreadLocal<WeakReference<ObjectBuffer>>() {@Overrideprotected WeakReference<ObjectBuffer> initialValue() {return new WeakReference<ObjectBuffer>(new ObjectBuffer(kryo,16*1024,Integer.MAX_VALUE));}};/*** Constructor. Save parameters locally, as superclass * fields are private. * * @param classCatalog is the catalog to hold shared class information** @param baseClass is the base class for serialized objects stored using* this binding*/@SuppressWarnings("unchecked")public KryoBinding(Class baseClass) {this.baseClass = baseClass;kryo.autoregister(baseClass);// TODO: reevaluate if explicit registration should be requiredkryo.setRegistrationOptional(true);}public Kryo getKryo() {return kryo;}private ObjectBuffer getBuffer() {WeakReference<ObjectBuffer> ref = threadBuffer.get();ObjectBuffer ob = ref.get();if (ob == null) {ob = new ObjectBuffer(kryo,16*1024,Integer.MAX_VALUE);threadBuffer.set(new WeakReference<ObjectBuffer>(ob));}return ob;        }/*** Copies superclass simply to allow different source for FastOoutputStream.* * @see com.sleepycat.bind.serial.SerialBinding#entryToObject*/public void objectToEntry(K object, DatabaseEntry entry) {entry.setData(getBuffer().writeObjectData(object));}@Overridepublic K entryToObject(DatabaseEntry entry) {return getBuffer().readObjectData(entry.getData(), baseClass);}
}

---------------------------------------------------------------------------

本系列Heritrix 3.1.0 源码解析系本人原创

转载请注明出处 博客园 刺猬的温驯

本文链接 http://www.cnblogs.com/chenying99/archive/2013/04/17/3025420.html

转载于:https://www.cnblogs.com/chenying99/archive/2013/04/19/3025420.html

Heritrix 3.1.0 源码解析(八)相关推荐

  1. Heritrix 3.1.0 源码解析(六)

    本文分析BdbFrontier对象的相关状态和方法 BdbFrontier类继承自WorkQueueFrontier类   WorkQueueFrontier类继承自AbstractFrontier类 ...

  2. Heritrix 3.1.0 源码解析(十一)

    上文分析了Heritrix3.1.0系统是怎么添加CrawlURI curi对象的,那么在系统初始化的时候,是怎么载入CrawlURI curi种子的呢? 我们回顾前面的文章,在我们执行采集任务的la ...

  3. Heritrix 3.1.0 源码解析(三十四)

    本文主要分析FetchFTP处理器,该处理器用于ftp文件的下载,该处理器的实现是通过封装commons-net-2.0.jar组件来实现ftp文件下载 在FetchFTP处理器里面定义了内部类Soc ...

  4. Heritrix 3.1.0 源码解析(十四)

    我在分析BdbFrontier对象的void schedule(CrawlURI caURI).CrawlURI next() .void finished(CrawlURI cURI)方法是,其实还 ...

  5. Android Glide 3.7.0 源码解析(八) , RecyclableBufferedInputStream 的 mark/reset 实现

    个人博客传送门 一.mark / reset 的作用 Android Glide 3.7.0 源码解析(七) , 细说图形变换和解码有提到过RecyclableBufferedInputStream ...

  6. Alian解读SpringBoot 2.6.0 源码(八):启动流程分析之刷新应用上下文(下)

    目录 一.背景 1.1.刷新的整体调用流程 1.2.本文解读范围 二.初始化特定上下文子类中的其他特殊bean 2.1.初始化主体资源 2.2.创建web服务 三.检查监听器bean并注册它们 四.实 ...

  7. solrlucene3.6.0源码解析(三)

    solr索引操作(包括新增 更新 删除 提交 合并等)相关UML图如下 从上面的类图我们可以发现,其中体现了工厂方法模式及责任链模式的运用 UpdateRequestProcessor相当于责任链模式 ...

  8. 锚框、交并比和非极大值抑制(tf2.0源码解析)

    锚框.交并比和非极大值抑制(tf2.0源码解析) 文章目录 锚框.交并比和非极大值抑制(tf2.0源码解析) 一.锚框生成 1.锚框的宽高 2.锚框的个数 3.注意点(★★★) 4.tf2.0代码 二 ...

  9. 基于8.0源码解析:startService 启动过程

    基于8.0源码解析:startService 启动过程 首先看一张startService的图,心里有个大概的预估,跟Activity启动流程比,Service的启动稍微简单点,并且我把Service ...

最新文章

  1. c语言输入姓名比较是否同性,C语言基础--选择题
  2. kali linux 安装java_kali linux安装java
  3. Vue+axios 实现http拦截及vue-router拦截
  4. python类和对象介绍_python中的类,对象,方法,属性等介绍
  5. 无盘服务器为什么重启还原,无盘站反复重启怎么办
  6. Java基于socket服务实现UDP协议的方法
  7. 2 如何设置窗口title_如何设置华为4G路由2的WiFi黑白名单【设置方法】
  8. 计算机学院运动会通讯稿,2021大学运动会通讯稿篇
  9. hash算法_hash一致性算法
  10. [560]python简单验证文本的Zipf分布
  11. pvr格式的用什么打开_cocos2d 查看pvr图片的详细格式
  12. 会员权益营销中,设置会员权益的三个标准
  13. 解决挂过代理之后ip不变
  14. 自动化测试面试题及答案大全(5)
  15. Go 1.19.3 error原理简析
  16. 利用组合模式来实现组件处理器的工程实践
  17. IOC/DI与AOP概念的理解
  18. 【BZOJ5314】【JSOI2018】—潜入行动(树形dp)
  19. 千寻位置 开发demo_专攻高精定位解决方案,「千寻位置」要解决自动驾驶车辆“我在哪儿”的问题...
  20. 【基础强训】day3

热门文章

  1. 【2012.4.22】北京植物园卧佛寺
  2. SQLServer数据库的备份/恢复的3中策略实例
  3. CloudStack设计思想
  4. Docker容器制作
  5. linux给普通用户分配root权限
  6. php2018面试题20块,php最新面试题2018届毕业生专享
  7. Netflix是这样炼成的:谁构建,谁运维
  8. vim配置@year12
  9. 华为面试题——一道关于指针方面的编程题(C/C++)
  10. Hadoop运维记录系列(三)