在上一篇中介绍了连接Zookeeper集群的方法,这一篇将围绕一个有趣的话题---来展开,这就是Replication(索引复制),关于Solr Replication的详细介绍,可以参考http://wiki.apache.org/solr/SolrReplication。

在开始这个话题之前,先从我最近在应用中引入solr的master/slave架构时,遇到的一个让我困扰的实际问题。

应用场景简单描述如下:

1)首先master节点下载索引分片,然后创建配置文件,加入master节点的replication配置片段,再对索引分片进行合并(关于mergeIndex,可以参考http://wiki.apache.org/solr/MergingSolrIndexes),然后利用上述配置文件和索引数据去创建一个solr核。

2)slave节点创建配置文件,加入slave节点的replication配置片段,创建一个空的solr核,等待从master节点进行索引数据同步

出现的问题:slave节点没有从master节点同步到数据。

问题分析:

1)首先检查master节点,获取最新的可复制索引的版本号,

http://master_host:port/solr/replication?command=indexversion

发现返回的索引版本号是0,这说明mater节点根本没有触发replication动作,

2)为了确认上述判断,在slave节点上进一步查看replication的详细信息

http://slave_host:port/solr/replication?command=details

发现确实如此,尽管master节点的索引版本号和slave节点的索引版本号不一致,但索引却没有同步过来,再分别查看master节点和slave节点的日志,发现索引复制动作确实没有开始。

综上所述,确实是master节点没有触发索引复制动作,那究竟是为何呢?先将原因摆出来,后面会通过源码的分析来加以说明。

原因:solr合并索引时,不管你是通过mergeindexes的http命令,还是调用底层lucene的IndexWriter,记得最后一定要提交一个commit,否则,不仅索引不仅不会对查询可见,更是对于master/slave架构的solr集群来说,master节点的replication动作不会触发,因为indexversion没有感知到变化。

好了,下面开始对Solr的Replication的分析。

Solr容器在加载solr核的时候,会对已经注册的各个实现SolrCoreAware接口的Handler进行回调,调用其inform方法。

对于ReplicationHandler来说,就是在这里对自己是属于master节点还是slave节点进行判断,若是slave节点,则创建一个SnapPuller对象,定时负责从master节点主动拉索引数据下来;若是master节点,则只设置相应的参数。

public void inform(SolrCore core) {
    this.core = core;
    registerFileStreamResponseWriter();
    registerCloseHook();
    NamedList slave = (NamedList) initArgs.get("slave");
    boolean enableSlave = isEnabled( slave );
    if (enableSlave) {
      tempSnapPuller = snapPuller = new SnapPuller(slave, this, core);
      isSlave = true;
    }
    NamedList master = (NamedList) initArgs.get("master");
    boolean enableMaster = isEnabled( master );
    
    if (!enableSlave && !enableMaster) {
      enableMaster = true;
      master = new NamedList<Object>();
    }
    
    if (enableMaster) {
      includeConfFiles = (String) master.get(CONF_FILES);
      if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {
        List<String> files = Arrays.asList(includeConfFiles.split(","));
        for (String file : files) {
          if (file.trim().length() == 0) continue;
          String[] strs = file.split(":");
          // if there is an alias add it or it is null
          confFileNameAlias.add(strs[0], strs.length > 1 ? strs[1] : null);
        }
        LOG.info("Replication enabled for following config files: " + includeConfFiles);
      }
      List backup = master.getAll("backupAfter");
      boolean backupOnCommit = backup.contains("commit");
      boolean backupOnOptimize = !backupOnCommit && backup.contains("optimize");
      List replicateAfter = master.getAll(REPLICATE_AFTER);
      replicateOnCommit = replicateAfter.contains("commit");
      replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");

if (!replicateOnCommit && ! replicateOnOptimize) {
        replicateOnCommit = true;
      }
      
      // if we only want to replicate on optimize, we need the deletion policy to
      // save the last optimized commit point.
      if (replicateOnOptimize) {
        IndexDeletionPolicyWrapper wrapper = core.getDeletionPolicy();
        IndexDeletionPolicy policy = wrapper == null ? null : wrapper.getWrappedDeletionPolicy();
        if (policy instanceof SolrDeletionPolicy) {
          SolrDeletionPolicy solrPolicy = (SolrDeletionPolicy)policy;
          if (solrPolicy.getMaxOptimizedCommitsToKeep() < 1) {
            solrPolicy.setMaxOptimizedCommitsToKeep(1);
          }
        } else {
          LOG.warn("Replication can't call setMaxOptimizedCommitsToKeep on " + policy);
        }
      }

if (replicateOnOptimize || backupOnOptimize) {
        core.getUpdateHandler().registerOptimizeCallback(getEventListener(backupOnOptimize, replicateOnOptimize));
      }
      if (replicateOnCommit || backupOnCommit) {
        replicateOnCommit = true;
        core.getUpdateHandler().registerCommitCallback(getEventListener(backupOnCommit, replicateOnCommit));
      }
      if (replicateAfter.contains("startup")) {
        replicateOnStart = true;
        RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);
        try {
          DirectoryReader reader = s==null ? null : s.get().getIndexReader();
          if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) {
            try {
              if(replicateOnOptimize){
                Collection<IndexCommit> commits = DirectoryReader.listCommits(reader.directory());
                for (IndexCommit ic : commits) {
                  if(ic.getSegmentCount() == 1){
                    if(indexCommitPoint == null || indexCommitPoint.getGeneration() < ic.getGeneration()) indexCommitPoint = ic;
                  }
                }
              } else{
                indexCommitPoint = reader.getIndexCommit();
              }
            } finally {
              // We don't need to save commit points for replication, the SolrDeletionPolicy
              // always saves the last commit point (and the last optimized commit point, if needed)
              /***
              if(indexCommitPoint != null){
                core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());
              }
              ***/
            }
          }

// reboot the writer on the new index
          core.getUpdateHandler().newIndexWriter();

} catch (IOException e) {
          LOG.warn("Unable to get IndexCommit on startup", e);
        } finally {
          if (s!=null) s.decref();
        }
      }
      String reserve = (String) master.get(RESERVE);
      if (reserve != null && !reserve.trim().equals("")) {
        reserveCommitDuration = SnapPuller.readInterval(reserve);
      }
      LOG.info("Commits will be reserved for  " + reserveCommitDuration);
      isMaster = true;
    }

}

ReplicationHandler可以响应多种命令:

1)       indexversion。

这里需要了解的第一个概念是索引提交点(IndexCommit),这是底层lucene的东西,可以自行查阅资料。首先获取最新的索引提交点,然后从其中获取索引版本号和索引所属代。

IndexCommit commitPoint = indexCommitPoint;  // make a copy so it won't change
      if (commitPoint != null && replicationEnabled.get()) {
        core.getDeletionPolicy().setReserveDuration(commitPoint.getVersion(), reserveCommitDuration);
        rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());

rsp.add(GENERATION, commitPoint.getGeneration());

2)backup。这个命令用来对索引做快照。首先获取最新的索引提交点,然后创建做一个SnapShooter,具体的快照动作由这个对象完成,

   private void doSnapShoot(SolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) { 

try {
      int numberToKeep = params.getInt(NUMBER_BACKUPS_TO_KEEP, Integer.MAX_VALUE);
      IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
      IndexCommit indexCommit = delPolicy.getLatestCommit();
      
      if(indexCommit == null) {
        indexCommit = req.getSearcher().getReader().getIndexCommit();
      }
      
      // small race here before the commit point is saved
      new SnapShooter(core, params.get("location")).createSnapAsync(indexCommit, numberToKeep, this);
      
    } catch (Exception e) {
      LOG.warn("Exception during creating a snapshot", e);
      rsp.add("exception", e);
    }
  }

快照对象会启动一个线程去异步地做一个索引备份。

void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, final ReplicationHandler replicationHandler) {

replicationHandler.core.getDeletionPolicy().saveCommitPoint(indexCommit.getVersion());

new Thread() {

@Override

public void run() {

createSnapshot(indexCommit, numberToKeep, replicationHandler);

}

}.start();

}

void createSnapshot(final IndexCommit indexCommit, int numberToKeep, ReplicationHandler replicationHandler) {

NamedList details = new NamedList();

details.add("startTime", new Date().toString());

File snapShotDir = null;

String directoryName = null;

Lock lock = null;

try {

if(numberToKeep<Integer.MAX_VALUE) {

deleteOldBackups(numberToKeep);

}

SimpleDateFormat fmt = new SimpleDateFormat(DATE_FMT, Locale.US);

directoryName = "snapshot." + fmt.format(new Date());

lock = lockFactory.makeLock(directoryName + ".lock");

if (lock.isLocked()) return;

snapShotDir = new File(snapDir, directoryName);

if (!snapShotDir.mkdir()) {

LOG.warn("Unable to create snapshot directory: " + snapShotDir.getAbsolutePath());

return;

}

Collection<String> files = indexCommit.getFileNames();

FileCopier fileCopier = new FileCopier(solrCore.getDeletionPolicy(), indexCommit);

fileCopier.copyFiles(files, snapShotDir);

details.add("fileCount", files.size());

details.add("status", "success");

details.add("snapshotCompletedAt", new Date().toString());

} catch (Exception e) {

SnapPuller.delTree(snapShotDir);

LOG.error("Exception while creating snapshot", e);

details.add("snapShootException", e.getMessage());

} finally {

replicationHandler.core.getDeletionPolicy().releaseCommitPoint(indexCommit.getVersion());

replicationHandler.snapShootDetails = details;

if (lock != null) {

try {

lock.release();

} catch (IOException e) {

LOG.error("Unable to release snapshoot lock: " + directoryName + ".lock");

}

}

}

}

3)fetchindex。响应来自slave节点的取索引文件的请求,会启动一个线程来实现索引文件的获取。

String masterUrl = solrParams.get(MASTER_URL);

if (!isSlave && masterUrl == null) {

rsp.add(STATUS,ERR_STATUS);

rsp.add("message","No slave configured or no 'masterUrl' Specified");

return;

}

final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);

new Thread() {

@Override

public void run() {

doFetch(paramsCopy);

}

}.start();

rsp.add(STATUS, OK_STATUS);

具体的获取动作是通过SnapPuller对象来实现的,首先尝试获取pull对象锁,如果请求锁失败,则说明还有取索引数据动作未结束,如果请求锁成功,就调用SnapPuller对象的fetchLatestIndex方法来取最新的索引数据。

void doFetch(SolrParams solrParams) {

String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);

if (!snapPullLock.tryLock())

return;

try {

tempSnapPuller = snapPuller;

if (masterUrl != null) {

NamedList<Object> nl = solrParams.toNamedList();

nl.remove(SnapPuller.POLL_INTERVAL);

tempSnapPuller = new SnapPuller(nl, this, core);

}

tempSnapPuller.fetchLatestIndex(core);

} catch (Exception e) {

LOG.error("SnapPull failed ", e);

} finally {

tempSnapPuller = snapPuller;

snapPullLock.unlock();

}

}

最后真正的取索引数据过程,首先,若mastet节点的indexversion为0,则说明master节点根本没有提供可供复制的索引数据,若master节点和slave节点的indexversion相同,则说明slave节点目前与master节点索引数据状态保持一致,无需同步。若两者的indexversion不同,则开始索引复制过程,首先从master节点上下载指定索引版本号的索引文件列表,然后创建一个索引文件同步服务线程来完成同并工作。

这里需要区分的是,如果master节点的年代比slave节点要老,那就说明两者已经不相容,此时slave节点需要新建一个索引目录,再从master节点做一次全量索引复制。还需要注意的一点是,索引同步也是可以同步配置文件的,若配置文件发生变化,则需要对solr核进行一次reload操作。最对了,还有,和文章开头一样, slave节点同步完数据后,别忘了做一次commit操作,以便刷新自己的索引提交点到最新的状态。最后,关闭并等待同步服务线程结束。此外,具体的取索引文件是通过FileFetcher对象来完成。

boolean fetchLatestIndex(SolrCore core) throws IOException {

replicationStartTime = System.currentTimeMillis();

try {

//get the current 'replicateable' index version in the master

NamedList response = null;

try {

response = getLatestVersion();

} catch (Exception e) {

LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());

return false;

}

long latestVersion = (Long) response.get(CMD_INDEX_VERSION);

long latestGeneration = (Long) response.get(GENERATION);

if (latestVersion == 0L) {

//there is nothing to be replicated

return false;

}

IndexCommit commit;

RefCounted<SolrIndexSearcher> searcherRefCounted = null;

try {

searcherRefCounted = core.getNewestSearcher(false);

commit = searcherRefCounted.get().getReader().getIndexCommit();

} finally {

if (searcherRefCounted != null)

searcherRefCounted.decref();

}

if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {

//master and slave are alsready in sync just return

LOG.info("Slave in sync with master.");

return false;

}

LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);

LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());

LOG.info("Starting replication process");

// get the list of files first

fetchFileList(latestVersion);

// this can happen if the commit point is deleted before we fetch the file list.

if(filesToDownload.isEmpty()) return false;

LOG.info("Number of files in latest index in master: " + filesToDownload.size());

// Create the sync service

fsyncService = Executors.newSingleThreadExecutor();

// use a synchronized list because the list is read by other threads (to show details)

filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());

// if the generateion of master is older than that of the slave , it means they are not compatible to be copied

// then a new index direcory to be created and all the files need to be copied

boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;

File tmpIndexDir = createTempindexDir(core);

if (isIndexStale())

isFullCopyNeeded = true;

successfulInstall = false;

boolean deleteTmpIdxDir = true;

File indexDir = null ;

try {

indexDir = new File(core.getIndexDir());

downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestVersion);

LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");

Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);

if (!modifiedConfFiles.isEmpty()) {

downloadConfFiles(confFilesToDownload, latestVersion);

if (isFullCopyNeeded) {

successfulInstall = modifyIndexProps(tmpIndexDir.getName());

deleteTmpIdxDir = false;

} else {

successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);

}

if (successfulInstall) {

LOG.info("Configuration files are modified, core will be reloaded");

logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);//write to a file time of replication and conf files.

reloadCore();

}

} else {

terminateAndWaitFsyncService();

if (isFullCopyNeeded) {

successfulInstall = modifyIndexProps(tmpIndexDir.getName());

deleteTmpIdxDir = false;

} else {

successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);

}

if (successfulInstall) {

logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);

doCommit();

}

}

replicationStartTime = 0;

return successfulInstall;

} catch (ReplicationHandlerException e) {

LOG.error("User aborted Replication");

} catch (SolrException e) {

throw e;

} catch (Exception e) {

throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);

} finally {

if (deleteTmpIdxDir) delTree(tmpIndexDir);

else delTree(indexDir);

}

return successfulInstall;

} finally {

if (!successfulInstall) {

logReplicationTimeAndConfFiles(null, successfulInstall);

}

filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;

replicationStartTime = 0;

fileFetcher = null;

if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdownNow();

fsyncService = null;

stop = false;

fsyncException = null;

}

}

深入剖析SolrCloud(四)相关推荐

  1. 深入剖析SolrCloud(一)

    深入剖析SolrCloud(一) SolrCloud是基于Solr和Zookeeper的分布式搜索方案,是正在开发中的Solr4.0的核心组件之一,它的主要思想是使用Zookeeper作为集群的配置信 ...

  2. 查询优化器内核剖析第四篇:从一个实例看执行计划

    查询优化器内核剖析第四篇:从一个实例看执行计划 系列文章索引: 查询优化器内核剖析第一篇 查询优化器内核剖析第二篇:产生候选执行计划&执行计划成本估算 查询优化器内核剖析第三篇:查询的执行与计 ...

  3. GDAL源码剖析(四)之命令行程序说明二

    接博客GDAL源码剖析(四)之命令行程序说明一http://blog.csdn.net/liminlu0314/article/details/6978589 其中有个nearblack,gdalbu ...

  4. x264代码剖析(四):vs2010编译x264错误集锦

    x264代码剖析(四):vs2010编译x264错误集锦 支持VC++平台的x264的最新版本是x264-20091006,接下来就以该版本为例分析编译运行x264过程中遇到的问题以及解决办法. 1. ...

  5. 深入剖析SolrCloud(二)

    上一篇介绍了SolrCloud的基本概念,从这一篇开始我将深入到其实现代码中进行剖析. SolrCloud最重要的一点就是引入了ZooKeeper来统一管理各种配置和状态信息.zookeeper是一个 ...

  6. ijkplayer播放器剖析(四)音频解码与音频输出机制分析

    ijkplayer播放器剖析系列文章: ijkplayer播放器剖析(一)从应用层分析至Jni层的流程分析 ijkplayer播放器剖析(二)消息机制分析 ijkplayer播放器剖析(三)音频解码与 ...

  7. GDAL源码剖析(四)之命令行程序说明一

    一.GDAL工具通用命令 下面的工具主要参考的GDAL官方网站中提供的帮助文档说明,此外还有我的一些经验,GDAL官方具体地址为:http://gdal.org/gdal_utilities.html ...

  8. GTest源码剖析(四)——TEST_P宏

    GTest源码剖析--TEST_P宏 GTest源码剖析TEST_P宏 TEST_P宏用法 TestWithParam 类 1 TestWithParam 类定义 2 WithParamInterfa ...

  9. 一起分析Linux系统设计思想——05字符设备驱动框架剖析(四)

    在学习资料满天飞的大环境下,知识变得非常零散,体系化的知识并不多,这就导致很多人每天都努力学习到感动自己,最终却收效甚微,甚至放弃学习.我的使命就是过滤掉大量的垃圾信息,将知识体系化,以短平快的方式直 ...

最新文章

  1. 【SRIO】3、RapidIO串行物理层的包传输过程
  2. QT的QLabel类的使用
  3. java mac sh_如何创建AppleScript或Command文件以在Mac OS上启动Java应用程序?
  4. 【若依(ruoyi)】表格图片预览功能图片超宽、超高问题
  5. 儿童节礼包!10 后都开始学编程了,你需要这个阿里技术电子书大全
  6. SQL Server创建索引(转)
  7. 决策树 算法原理及代码
  8. 一个智能运维算法测试方法
  9. c语言正确理解以下名词及其含义,C程序作业答案.doc
  10. ubuntu安装c/c++编译环境
  11. nginx配置二级目录,反向代理不同ip+端口
  12. python-字典方法(dict)知识整理
  13. 关键路径转化率分析——漏斗模型
  14. MobaXterm 中文乱码问题
  15. python爬取音乐源码_Python爬虫教程,爬取网易云的音乐
  16. 华为Java社招面试经历详解【已拿到offer】
  17. c语言字符动画源码下载,C语言程序实例大全(220个精彩源码下载)
  18. 为什么打工人996会猝死,而老板007不会?
  19. L13过拟合欠拟合及其解决方案
  20. 【洛谷 P3191】 [HNOI2007]紧急疏散EVACUATE(二分答案,最大流)

热门文章

  1. linux下 添加一个新账户tom,linux 账户管理命令 useradd、groupadd使用方法
  2. linux rec命令_文件过多时ls命令为什么会卡住?
  3. matlab虚拟现实之使用V-Realm Builder2建模
  4. 计算机专业的英语文献,计算机专业英语论文参考文献集 计算机专业英语英语参考文献哪里找...
  5. linux svn pacman,【图片】manjaro Linux 中的etc pacman.conf设置错误了,谁发个默认的我看看,谢谢大佬们了_manjaro吧_百度贴吧...
  6. 怎样对php使用systemctl启动,Centos7 配置php-fpm服务到systemctl
  7. sap产品图谱 - road to sap.pdf_蛇胆陈皮胶囊化学成分及指纹图谱研究
  8. 计算机视觉(二)——深度学习进阶
  9. 研0必读!李航《统计学习方法》啃书指南
  10. 货物贸易外汇监测系统 企业版_企业能耗在线监测系统介绍