The ChangeQueue class implements the ChangeSource interface, which declares a method for pulling the next Change object:

    /**
     * A source of {@link Change} objects.
     *
     * @since 2.8
     */
    public interface ChangeSource {
      /**
       * @return the next change, or {@code null} if there is no change available
       */
      public Change getNextChange();
    }

Each ChangeQueue instance initializes a blocking queue, private final BlockingQueue<Change> pendingChanges, which serves as the container that holds Change objects:

    /**
     * Initializes the blocking queue pendingChanges.
     *
     * @param size
     * @param sleepInterval
     * @param introduceDelayAfterEachScan
     * @param activityLogger
     */
    private ChangeQueue(int size, long sleepInterval,
        boolean introduceDelayAfterEachScan, CrawlActivityLogger activityLogger) {
      pendingChanges = new ArrayBlockingQueue<Change>(size);
      this.sleepInterval = sleepInterval;
      this.activityLogger = activityLogger;
      this.introduceDelayAfterEveryScan = introduceDelayAfterEachScan;
    }

The introduceDelayAfterEveryScan field controls whether a delay is introduced after each complete pass over the data.

As mentioned earlier in this series, the inner class CallBack adds the submitted data to the blocking queue BlockingQueue<Change> pendingChanges.

The method that ChangeQueue implements from the ChangeSource interface, in turn, takes Change objects out of the blocking queue:

    /**
     * Takes an element from the blocking queue pendingChanges.
     * Gets the next available change from the ChangeQueue.  Will wait up to
     * 1/4 second for a change to appear if none is immediately available.
     *
     * @return the next available change, or {@code null} if no changes are
     *         available
     */
    public Change getNextChange() {
      try {
        return pendingChanges.poll(250L, TimeUnit.MILLISECONDS);
      } catch (InterruptedException ie) {
        return null;
      }
    }
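To make the timed poll concrete, here is a minimal sketch of the same bounded-buffer idea. TimedPollSketch is a hypothetical stand-in for ChangeQueue, a plain String plays the role of Change, and the producer uses offer (how the real callback enqueues is not shown in this post, so that part is an assumption).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for ChangeQueue; String plays the role of Change. */
final class TimedPollSketch {
  private final BlockingQueue<String> pending = new ArrayBlockingQueue<String>(10);

  /** Producer side (assumption): returns false instead of blocking when the queue is full. */
  boolean add(String change) {
    return pending.offer(change);
  }

  /** Consumer side: waits up to 250 ms, then gives up and returns null. */
  String getNextChange() {
    try {
      return pending.poll(250L, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
      // Restoring the interrupt flag is an addition of this sketch;
      // the method shown above simply returns null.
      Thread.currentThread().interrupt();
      return null;
    }
  }
}
```

A null return therefore means "nothing arrived within a quarter of a second", not "the source is finished", so the caller is expected to come back and ask again later.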

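The ChangeQueue object thus acts as a buffer for Change objects. As analyzed earlier in this series, Change objects are added to it by the thread method of the monitor object DocumentSnapshotRepositoryMonitor once that monitor has been started.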

So which object calls getNextChange() on the ChangeQueue to take the Change objects out?

Tracing the call shows that the loadUpFromChangeSource method of the CheckpointAndChangeQueue class calls getNextChange(). That method wraps each Change object it obtains in a CheckpointAndChange object and adds it to the member List<CheckpointAndChange> checkpointAndChangeList.

First, a look at the relevant member fields and the constructor:

    private final AtomicInteger maximumQueueSize =
        new AtomicInteger(DEFAULT_MAXIMUM_QUEUE_SIZE);
    private final List<CheckpointAndChange> checkpointAndChangeList;
    private final ChangeSource changeSource;
    private final DocumentHandleFactory internalDocumentHandleFactory;
    private final DocumentHandleFactory clientDocumentHandleFactory;
    private volatile DiffingConnectorCheckpoint lastCheckpoint;
    private final File persistDir;  // place to persist enqueued values
    private MonitorRestartState monitorPoints = new MonitorRestartState();

    public CheckpointAndChangeQueue(ChangeSource changeSource, File persistDir,
        DocumentHandleFactory internalDocumentHandleFactory,
        DocumentHandleFactory clientDocumentHandleFactory) {
      this.changeSource = changeSource;
      this.checkpointAndChangeList = Collections.synchronizedList(
          new ArrayList<CheckpointAndChange>(maximumQueueSize.get()));
      this.persistDir = persistDir;
      this.internalDocumentHandleFactory = internalDocumentHandleFactory;
      this.clientDocumentHandleFactory = clientDocumentHandleFactory;
      ensurePersistDirExists();
    }

The constructor initializes the ChangeSource object changeSource (that is, the ChangeQueue object) and the list container List<CheckpointAndChange> checkpointAndChangeList.

Now back to the loadUpFromChangeSource method:

    /**
     * Pulls Change objects from the ChangeSource and adds them to
     * checkpointAndChangeList.
     */
    private void loadUpFromChangeSource() {
      int max = maximumQueueSize.get();
      if (checkpointAndChangeList.size() < max) {
        lastCheckpoint = lastCheckpoint.nextMajor();
      }
      while (checkpointAndChangeList.size() < max) {
        Change newChange = changeSource.getNextChange();
        if (newChange == null) {
          break;
        }
        lastCheckpoint = lastCheckpoint.next();
        checkpointAndChangeList.add(new CheckpointAndChange(lastCheckpoint, newChange));
      }
    }

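The method's main job is to take Change objects from changeSource, wrap each one in a CheckpointAndChange object, and add it to the List<CheckpointAndChange> checkpointAndChangeList container. It stops as soon as the list reaches its maximum size or the source returns null.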
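The fill loop can be sketched on its own, without the checkpoint bookkeeping. In this sketch LoadUpSketch, Source and loadUp are hypothetical names, and String again stands in for Change; it only illustrates the "pull until full or until the source returns null" pattern.

```java
import java.util.List;

final class LoadUpSketch {
  /** Minimal stand-in for ChangeSource: returns null when nothing is available. */
  interface Source {
    String next();
  }

  /** Pulls from the source until the list holds max items or the source runs dry. */
  static int loadUp(List<String> target, Source source, int max) {
    int added = 0;
    while (target.size() < max) {
      String change = source.next();
      if (change == null) {
        break;  // nothing available right now; a later call can try again
      }
      target.add(change);
      added++;
    }
    return added;
  }
}
```

Because the loop is bounded by the list size rather than by the source, a slow consumer never accumulates more than max entries in memory; whatever is left simply stays in the upstream buffer.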

loadUpFromChangeSource is called from the resume method (resume itself is called in the constructor of the DiffingConnectorDocumentList class):

    /**
     * Gets the List<CheckpointAndChange> queue.
     * Returns an {@link Iterator} for currently available
     * {@link CheckpointAndChange} objects that occur after the passed in
     * checkpoint. The {@link String} form of a {@link DiffingConnectorCheckpoint}
     * passed in is produced by calling
     * {@link DiffingConnectorCheckpoint#toString()}. As a side effect, Objects
     * up to and including the object with the passed in checkpoint are removed
     * from this queue.
     *
     * @param checkpointString null means return all {@link CheckpointAndChange}
     *        objects and a non null value means to return
     *        {@link CheckpointAndChange} objects with checkpoints after the
     *        passed in value.
     * @throws IOException if error occurs while manipulating recovery state
     */
    synchronized List<CheckpointAndChange> resume(String checkpointString)
        throws IOException {
      // Remove the changes that have already been completed.
      removeCompletedChanges(checkpointString);
      // Pull Change objects from the ChangeSource into checkpointAndChangeList.
      loadUpFromChangeSource();
      // Update monitorPoints.
      monitorPoints.updateOnGuaranteed(checkpointAndChangeList);
      try {
        // Persist checkpointAndChangeList to a queue file;
        // each call to resume produces one file.
        writeRecoveryState();
      } finally {
        // TODO: Enhance with mechanism that remembers
        // information about recovery files to avoid re-reading.
        // Remove redundant queue files (those already consumed).
        removeExcessRecoveryState();
      }
      return getList();
    }
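The body of removeCompletedChanges is not shown in this post. Going only by the Javadoc ("objects up to and including the object with the passed in checkpoint are removed"), the idea might look roughly like the sketch below. RemoveCompletedSketch is a hypothetical name, checkpoints are modelled as plain strings, and the handling of an unknown checkpoint is an assumption of the sketch, not something the source confirms.

```java
import java.util.List;

final class RemoveCompletedSketch {
  /**
   * Removes entries up to and including the one equal to checkpoint.
   * Returns the number of entries removed.
   */
  static int removeCompleted(List<String> checkpoints, String checkpoint) {
    if (checkpoint == null) {
      return 0;  // null means "return everything", so nothing is removed
    }
    int index = checkpoints.indexOf(checkpoint);
    if (index < 0) {
      return 0;  // assumption of this sketch: an unknown checkpoint removes nothing
    }
    checkpoints.subList(0, index + 1).clear();
    return index + 1;
  }
}
```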

After the List<CheckpointAndChange> checkpointAndChangeList container has been filled, its contents are persisted to a queue file in JSON format:

    /**
     * Persists the queue as JSON.
     *
     * @throws IOException
     */
    private void writeRecoveryState() throws IOException {
      // TODO(pjo): Move this method into RecoveryFile.
      File recoveryFile = new RecoveryFile(persistDir);
      FileOutputStream outStream = new FileOutputStream(recoveryFile);
      Writer writer = new OutputStreamWriter(outStream, Charsets.UTF_8);
      try {
        try {
          writeJson(writer);
        } catch (JSONException e) {
          throw IOExceptionHelper.newIOException("Failed writing recovery file.", e);
        }
        writer.flush();
        outStream.getFD().sync();
      } finally {
        writer.close();
      }
    }
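The flush-then-sync sequence is the part of this method worth isolating: flush() only moves characters out of the Writer's buffer, while getFD().sync() asks the operating system to push the file's contents to the storage device. A minimal sketch using only the JDK follows; DurableWriteSketch, writeDurably and roundTrip are hypothetical names, and StandardCharsets replaces the Guava Charsets class used in the original.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

final class DurableWriteSketch {
  /** Writes content as UTF-8 and forces it to disk before closing. */
  static void writeDurably(File file, String content) throws IOException {
    FileOutputStream out = new FileOutputStream(file);
    Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
    try {
      writer.write(content);
      writer.flush();      // push buffered characters into the stream
      out.getFD().sync();  // ask the OS to flush its buffers to the device
    } finally {
      writer.close();      // also closes the underlying stream
    }
  }

  /** Demo helper: writes to a temp file, reads it back, then deletes it. */
  static String roundTrip(String content) {
    try {
      File file = File.createTempFile("recovery.", ".tmp");
      try {
        writeDurably(file, content);
        return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
      } finally {
        file.delete();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Syncing before close matters for a recovery file in particular: a file that is only sitting in the OS cache when the process or machine dies is exactly the file that recovery would later need.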

The queue file name includes the current system time, which is used to determine which of two files was created earlier:

    /**
     * A queue file whose creation time can be compared.
     * A File that has some of the recovery logic.
     * Original recovery files' names contained a single nanosecond timestamp,
     * eg.  recovery.10220010065599398 .  These turned out to be flawed
     * because nanosecond times can go "back in time" between JVM restarts.
     * Updated recovery files' names contain a wall clock millis timestamp
     * followed by an underscore followed by a nanotimestamp, eg.
     * recovery.702522216012_10220010065599398 .
     */
    static class RecoveryFile extends File {
      final static long NO_TIME_AVAIL = -1;
      long milliTimestamp = NO_TIME_AVAIL;
      long nanoTimestamp;

      long parseTime(String s) throws IOException {
        try {
          return Long.parseLong(s);
        } catch (NumberFormatException e) {
          throw new LoggingIoException("Invalid recovery filename: "
              + getAbsolutePath());
        }
      }

      /**
       * Parses the timestamps contained in the file name.
       *
       * @throws IOException
       */
      void parseOutTimes() throws IOException {
        try {
          String basename = getName();
          if (!basename.startsWith(RECOVERY_FILE_PREFIX)) {
            throw new LoggingIoException("Invalid recovery filename: "
                + getAbsolutePath());
          } else {
            String extension = basename.substring(RECOVERY_FILE_PREFIX.length());
            if (!extension.contains("_")) {  // Original name format.
              nanoTimestamp = parseTime(extension);
            } else {  // Updated name format.
              String timeParts[] = extension.split("_");
              if (2 != timeParts.length) {
                throw new LoggingIoException("Invalid recovery filename: "
                    + getAbsolutePath());
              }
              milliTimestamp = parseTime(timeParts[0]);
              nanoTimestamp = parseTime(timeParts[1]);
            }
          }
        } catch (IndexOutOfBoundsException e) {
          throw new LoggingIoException("Invalid recovery filename: "
              + getAbsolutePath());
        }
      }

      RecoveryFile(File persistanceDir) throws IOException {
        super(persistanceDir, RECOVERY_FILE_PREFIX + System.currentTimeMillis()
            + "_" + System.nanoTime());
        parseOutTimes();
      }

      /**
       * This constructor is used when the file's absolute path is already known.
       *
       * @param absolutePath
       * @throws IOException
       */
      RecoveryFile(String absolutePath) throws IOException {
        super(absolutePath);
        parseOutTimes();
      }

      boolean isOlder(RecoveryFile other) {
        boolean weHaveMillis = milliTimestamp != NO_TIME_AVAIL;
        boolean otherHasMillis = other.milliTimestamp != NO_TIME_AVAIL;
        boolean bothHaveMillis = weHaveMillis && otherHasMillis;
        boolean neitherHasMillis = (!weHaveMillis) && (!otherHasMillis);
        if (bothHaveMillis) {
          if (this.milliTimestamp < other.milliTimestamp) {
            return true;
          } else if (this.milliTimestamp > other.milliTimestamp) {
            return false;
          } else {
            return this.nanoTimestamp < other.nanoTimestamp;
          }
        } else if (neitherHasMillis) {
          return this.nanoTimestamp < other.nanoTimestamp;
        } else if (weHaveMillis) {  // and other doesn't; we are newer.
          return false;
        } else {  // other has millis; other is newer.
          return true;
        }
      }

      /** A delete method that logs failures. */
      // Deletes the file.
      public void logOnFailDelete() {
        boolean deleted = super.delete();
        if (!deleted) {
          LOG.severe("Failed to delete: " + getAbsolutePath());
        }
      }
      // TODO(pjo): Move more recovery logic into this class.
    }
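The ordering rule is easier to see when it is reduced to file names alone. The sketch below is a simplified, hypothetical rework (RecoveryNameSketch, parse and isOlder over plain strings are not part of the connector code) that follows the same three rules: a name without a millisecond part counts as older than one with it, otherwise milliseconds decide, and nanoseconds break ties. The "recovery." prefix is taken from the example names in the Javadoc above.

```java
final class RecoveryNameSketch {
  static final String PREFIX = "recovery.";
  static final long NO_MILLIS = -1;

  /** Returns {millis, nanos}; millis is NO_MILLIS for the old single-timestamp format. */
  static long[] parse(String name) {
    if (!name.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Invalid recovery filename: " + name);
    }
    String ext = name.substring(PREFIX.length());
    int sep = ext.indexOf('_');
    if (sep < 0) {  // old format: nanoseconds only
      return new long[] {NO_MILLIS, Long.parseLong(ext)};
    }
    // new format: millis_nanos; a second underscore makes parseLong fail
    return new long[] {
        Long.parseLong(ext.substring(0, sep)),
        Long.parseLong(ext.substring(sep + 1))};
  }

  /** True if the file named a should be treated as older than the file named b. */
  static boolean isOlder(String a, String b) {
    long[] x = parse(a);
    long[] y = parse(b);
    boolean xHasMillis = x[0] != NO_MILLIS;
    boolean yHasMillis = y[0] != NO_MILLIS;
    if (xHasMillis != yHasMillis) {
      return !xHasMillis;  // the old-format name is the older one
    }
    if (xHasMillis && x[0] != y[0]) {
      return x[0] < y[0];  // wall-clock milliseconds decide first
    }
    return x[1] < y[1];    // nanoseconds break ties
  }
}
```

For example, isOlder("recovery.10220010065599398", "recovery.702522216012_10220010065599398") is true even though the nanosecond parts are identical, because only the second name carries a wall-clock timestamp.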

Next, what does its startup method (start) do?

    /**
     * Initialize to start processing from after the passed in checkpoint
     * or from the beginning if the passed in checkpoint is null.  Part of
     * making DocumentSnapshotRepositoryMonitorManager go from "cold" to "warm".
     */
    public synchronized void start(String checkpointString) throws IOException {
      LOG.info("Starting CheckpointAndChangeQueue from " + checkpointString);
      // Create the queue directory.
      ensurePersistDirExists();
      checkpointAndChangeList.clear();
      lastCheckpoint = constructLastCheckpoint(checkpointString);
      if (null == checkpointString) {
        // Delete the queue files.
        removeAllRecoveryState();
      } else {
        RecoveryFile current = removeExcessRecoveryState();
        // Load monitorPoints and the checkpointAndChangeList queue.
        loadUpFromRecoveryState(current);
        //this.monitorPoints.points.entrySet();
      }
    }

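When a checkpoint is passed in, it simply loads the list of CheckpointAndChange objects from the previously saved queue file into the List<CheckpointAndChange> checkpointAndChangeList container (along with the MonitorCheckpoint objects). When the checkpoint is null, it deletes all queue files instead and starts from the beginning.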

    /**
     * Loads the queue.
     *
     * @param file
     * @throws IOException
     */
    private void loadUpFromRecoveryState(RecoveryFile file) throws IOException {
      // TODO(pjo): Move this method into RecoveryFile.
      new LoadingQueueReader().readJson(file);
    }

CheckpointAndChangeQueue defines inner classes for loading the list of CheckpointAndChange objects from the JSON file into the List<CheckpointAndChange> checkpointAndChangeList container.

The abstract queue reader class, AbstractQueueReader:

    /**
     * Abstract class for loading the queue from a JSON file.
     * Reads JSON recovery files. Uses the Template Method pattern to
     * delegate what to do with the parsed objects to subclasses.
     *
     * Note: This class uses gson for streaming support.
     */
    private abstract class AbstractQueueReader {
      public void readJson(File file) throws IOException {
        readJson(new BufferedReader(new InputStreamReader(
            new FileInputStream(file), Charsets.UTF_8)));
      }

      /**
       * Reads and parses the stream, calling the abstract methods to
       * take whatever action is required. The given stream will be
       * closed automatically.
       *
       * @param reader the stream to parse
       */
      @VisibleForTesting
      void readJson(Reader reader) throws IOException {
        JsonReader jsonReader = new JsonReader(reader);
        try {
          readJson(jsonReader);
        } finally {
          jsonReader.close();
        }
      }

      /**
       * Reads and parses the stream, calling the abstract methods to
       * take whatever action is required.
       */
      private void readJson(JsonReader reader) throws IOException {
        JsonParser parser = new JsonParser();
        reader.beginObject();
        while (reader.hasNext()) {
          String name = reader.nextName();
          if (name.equals(MONITOR_STATE_JSON_TAG)) {
            readMonitorPoints(parser.parse(reader));
          } else if (name.equals(QUEUE_JSON_TAG)) {
            reader.beginArray();
            while (reader.hasNext()) {
              readCheckpointAndChange(parser.parse(reader));
            }
            reader.endArray();
          } else {
            throw new IOException("Read invalid recovery file.");
          }
        }
        reader.endObject();
        reader.setLenient(true);
        String name = reader.nextString();
        if (!name.equals(SENTINAL)) {
          throw new IOException("Read invalid recovery file.");
        }
      }

      protected abstract void readMonitorPoints(JsonElement gson)
          throws IOException;

      protected abstract void readCheckpointAndChange(JsonElement gson)
          throws IOException;
    }

The abstract methods are implemented by subclasses:

    /**
     * Checks that a queue file is valid.
     * Verifies that a JSON recovery file is valid JSON with a
     * trailing sentinel.
     */
    private class ValidatingQueueReader extends AbstractQueueReader {
      protected void readMonitorPoints(JsonElement gson) throws IOException {
      }

      protected void readCheckpointAndChange(JsonElement gson)
          throws IOException {
      }
    }

    /** Implementation class that loads the queue from a JSON file. */
    /** Loads the queue from a JSON recovery file. */
    /*
     * TODO(jlacey): Change everything downstream to gson. For now, we
     * reserialize the individual gson objects and deserialize them
     * using org.json.
     */
    @VisibleForTesting
    class LoadingQueueReader extends AbstractQueueReader {
      /**
       * Loads the MonitorRestartState checkpoints
       * (HashMap<String, MonitorCheckpoint> points).
       */
      protected void readMonitorPoints(JsonElement gson) throws IOException {
        try {
          JSONObject json = gsonToJson(gson);
          monitorPoints = new MonitorRestartState(json);
          //monitorPoints.updateOnGuaranteed(checkpointAndChangeList)
        } catch (JSONException e) {
          throw IOExceptionHelper.newIOException(
              "Failed reading persisted JSON queue.", e);
        }
      }

      /**
       * Loads checkpointAndChangeList.
       */
      protected void readCheckpointAndChange(JsonElement gson)
          throws IOException {
        try {
          JSONObject json = gsonToJson(gson);
          checkpointAndChangeList.add(new CheckpointAndChange(json,
              internalDocumentHandleFactory, clientDocumentHandleFactory));
        } catch (JSONException e) {
          throw IOExceptionHelper.newIOException(
              "Failed reading persisted JSON queue.", e);
        }
      }

      // TODO(jlacey): This could be much more efficient, especially
      // with LOBs, if we directly transformed the objects with a little
      // recursive parser. This code is only used when recovering failed
      // batches, so I don't know if that's worth the effort.
      private JSONObject gsonToJson(JsonElement gson) throws JSONException {
        return new JSONObject(gson.toString());
      }
    }
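Stripped of gson, the Template Method arrangement used by these readers can be sketched in a few lines. Everything in this sketch is hypothetical: the records are plain strings in a list, the sentinel value "END" is made up, and the sentinel is checked before the records are processed, whereas the real reader checks it after parsing the JSON object.

```java
import java.util.ArrayList;
import java.util.List;

final class TemplateReaderSketch {
  static final String SENTINEL = "END";  // hypothetical trailing marker

  abstract static class AbstractReader {
    /** Template method: fixed parsing skeleton, per-record action left to subclasses. */
    final void read(List<String> lines) {
      if (lines.isEmpty() || !SENTINEL.equals(lines.get(lines.size() - 1))) {
        throw new IllegalStateException("Read invalid recovery data.");
      }
      for (String record : lines.subList(0, lines.size() - 1)) {
        onRecord(record);
      }
    }

    protected abstract void onRecord(String record);
  }

  /** Only checks structure; ignores every record, like ValidatingQueueReader. */
  static final class ValidatingReader extends AbstractReader {
    @Override
    protected void onRecord(String record) {
    }
  }

  /** Collects every record, like LoadingQueueReader filling its list. */
  static final class LoadingReader extends AbstractReader {
    final List<String> loaded = new ArrayList<String>();

    @Override
    protected void onRecord(String record) {
      loaded.add(record);
    }
  }
}
```

The point of the split is that validation and loading share one parser: a file can be checked with the no-op subclass first, and only a file that passes needs to be read again with the subclass that actually mutates state.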

---------------------------------------------------------------------------

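This series, Enterprise Search Engine Development: the Connector, is my own original work.

If you repost it, please credit the source: 博客园 (cnblogs), 刺猬的温驯

My email: chenying998179@163#com (replace # with .)

Link to this post: http://www.cnblogs.com/chenying99/p/3789560.html

Reposted from: https://www.cnblogs.com/chenying99/p/3789560.html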
