cassandra
两种方式:

Cassandra-ArchitectureCommitLog

Cassandra持久化-Durability

一种是配置commitlog_sync为periodic,定期模式;另外一种是batch,

默认(Cassandra1.2.19/3.0.0)为periodic,定期10000ms

#commitlog_sync: batch
#commitlog_sync_batch_window_in_ms: 50
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000

这里如果是periodic模式潜在丢数据的风险,来看看两种实现方式,大致调用顺序

StorageProxy. ->WritePerformer.apply()->counterWriteTask()/sendToHintedEndpoints()->((CounterMutation/mutation).apply()->Mutation.apply()->Keyspace.apply()->CommitLog.instance.add(mutation),主要看CommitLog.instance.add(mutation)

CommitLog.instance.add(mutation)

public ReplayPosition add(Mutation mutation)
{assert mutation != null;long size = Mutation.serializer.serializedSize(mutation, MessagingService.current_version);long totalSize = size + ENTRY_OVERHEAD_SIZE;if (totalSize > MAX_MUTATION_SIZE){throw new IllegalArgumentException(String.format("Mutation of %s bytes is too large for the maxiumum size of %s",totalSize, MAX_MUTATION_SIZE));}Allocation alloc = allocator.allocate(mutation, (int) totalSize);try{ICRC32 checksum = CRC32Factory.instance.create();final ByteBuffer buffer = alloc.getBuffer();BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer);// checksummed lengthdos.writeInt((int) size);checksum.update(buffer, buffer.position() - 4, 4);buffer.putInt(checksum.getCrc());int start = buffer.position();// checksummed mutationMutation.serializer.serialize(mutation, dos, MessagingService.current_version);checksum.update(buffer, start, (int) size);buffer.putInt(checksum.getCrc());}catch (IOException e){throw new FSWriteError(e, alloc.getSegment().getPath());}finally{alloc.markWritten();}executor.finishWriteFor(alloc);return alloc.getReplayPosition();}

这里主要写buffer,没有刷盘,这时会有两种方式,就是之前说的periodic与batch,主要看 executor.finishWriteFor(alloc),起里边调用了maybeWaitForSync(),是一个抽像的,在BatchCommitLogService与PeriodicCommitLogService中实现

public void finishWriteFor(Allocation alloc)
{maybeWaitForSync(alloc);written.incrementAndGet();
}
protected abstract void maybeWaitForSync(Allocation alloc);

BatchCommitLogService中实现

protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
{// wait until record has been safely persisted to diskpending.incrementAndGet();alloc.awaitDiskSync(commitLog.metrics.waitingOnCommit);pending.decrementAndGet();
}
void waitForSync(int position, Timer waitingOnCommit)
{while (lastSyncedOffset < position){WaitQueue.Signal signal = waitingOnCommit != null ?syncComplete.register(waitingOnCommit.time()) :syncComplete.register();if (lastSyncedOffset < position)signal.awaitUninterruptibly();elsesignal.cancel();}
}

这里面如果lastSyncedOffset < position是会一直等待的,知道lastSyncedOffset>=position,即当前alloc对应的buffer已被flush

PeriodicCommitLogService中实现,这里的关键是waitForSyncToCatchUp()

protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
{if (waitForSyncToCatchUp(Long.MAX_VALUE)){// wait until periodic sync() catches up with its schedulelong started = System.currentTimeMillis();pending.incrementAndGet();while (waitForSyncToCatchUp(started)){WaitQueue.Signal signal = syncComplete.register(commitLog.metrics.waitingOnCommit.time());if (waitForSyncToCatchUp(started))signal.awaitUninterruptibly();elsesignal.cancel();}pending.decrementAndGet();}
}

waitForSyncToCatchUp()

private boolean waitForSyncToCatchUp(long started)
{return started > lastSyncedAt + blockWhenSyncLagsMillis;
}

这里的blockWhenSyncLagsMillis是1.5倍的commitlog_sync_period_in_ms

blockWhenSyncLagsMillis = (int) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5);

为什么是1.5倍呢,我的理解是假设flush刷盘的时间是0.5个commitlog_sync_period,但是这个其实是不一定的,可能大于0.5,可能小于0.5,这里就潜在数据丢失了,假设这个确实flush一次不止0.5个commitlog_sync_period,那写完的数据其实是不确定一定刷盘了的。
具体的flush代码,位于AbstractCommitLogService中的start()方法中

long syncStarted = System.currentTimeMillis();
commitLog.sync(shutdown);
lastSyncedAt = syncStarted;
syncComplete.signalAll();

commitLog.sync()->segment.sync()->write(startMarker, sectionEnd),write在CompressedSegment与MemoryMappedSegment实现,最终都是调用的channel.force()

转载于:https://www.cnblogs.com/donganwangshi/p/4530841.html

cassandra写数据CommitLog相关推荐

  1. Cassandra 的数据存储结构——本质是SortedMapRowKey, SortedMapColumnKey, ColumnValue

    Cassandra 的数据存储结构 Cassandra 的数据模型是基于列族(Column Family)的四维或五维模型.它借鉴了 Amazon 的 Dynamo 和 Google's BigTab ...

  2. cassandra读写数据

    cassandra读写数据 写 写一致性级别 cassandra的可调一致性级别意味着你可以在查询中指定所需的写操作一致性,一致性级别越高,说明需要更多的副本节点相应才能认为写操作完成,更高的一致性级 ...

  3. python 往excel 里面写数据

    使用的python 的版本为3x 往excel 表格里面写数据使用的是xlwt, 如果电脑上没有 可以使用pip install xlwt 下载一个 步骤如下 1 创建工作薄  xls = xlwt. ...

  4. 写数据到文件注意事项write方法

    [问题1] 使用FileOutputStream类写数据到文件中,本来是很简单的实现,但就是生成的文件与原文件大小不一样,排查了一整天,才找到问题原因所在, writer = new FileOutp ...

  5. 关于MATLAB中xlswrite函数写数据出现服务器异常情况的解决办法

    关于MATLAB中xlswrite函数写数据出现服务器异常情况的解决办法 参考文章: (1)关于MATLAB中xlswrite函数写数据出现服务器异常情况的解决办法 (2)https://www.cn ...

  6. 配置 influxDB 鉴权及 HTTP API 写数据的方法

    本文简要描述如何为 InfluxDB 开启鉴权和配置用户管理权限(安装后默认不需要登录),以及开启鉴权后如何使用 HTTP API 写数据. 创建 InfluxDB 管理员账号 创建 admin 帐号 ...

  7. 往hdfs写数据无权限

    调用jar包把风电数据往hdfs写数据无权限问题 错误信息:hadoop.security.AccessControlException:Permission denied:user=gcl,acce ...

  8. Netty源码分析第7章(编码器和写数据)----第2节: MessageToByteEncoder

    Netty源码分析第7章(编码器和写数据)---->第2节: MessageToByteEncoder Netty源码分析第七章: Netty源码分析 第二节: MessageToByteEnc ...

  9. Java17-day08【File(创建和删除文件、判断和获取功能、遍历目录)、IO流(字节流写数据、异常处理、字节流读数据、复制文本文件、复制图片)】

    视频+资料(工程源码.笔记)[链接:https://pan.baidu.com/s/1MdFNUADVSFf-lVw3SJRvtg   提取码:zjxs] Java基础--学习笔记(零起点打开java ...

最新文章

  1. 只在必要时保存服务器控件视图状态
  2. BIND日志相关(二)
  3. 近亿台物联网设备或遭劫持,这家IoT云平台遭遇“灾难性”入侵事件
  4. Openstack在controller节点 nova image-list HTTP500
  5. 转 MySQL问题排查工具介绍
  6. 云上建站快速入门:博客、论坛、CMS、电子商务网站统统搞定
  7. Keras-训练可视化
  8. linux查看帮助信息,命令帮助信息的获取
  9. IBatis.net动态SQL语句(六)
  10. jsf入门实例_JSF错误消息示例教程
  11. Java面试题:热情盛夏,分享Java大厂面试百题
  12. SQL递归查询知多少
  13. windows完全卸载MySql数据库
  14. 数据库系统及应用——班级管理系统
  15. deepin V20.2版本安装MySQL
  16. mac sublime中文乱码问题解决
  17. 华为核心交换机绑定IP+MAC+端口案例
  18. deepin firewall
  19. LeetCode之Shortest Unsorted Continous Subarray
  20. 【对象存储】关于阿里云OSS踩坑记录

热门文章

  1. IOS开发 ios7适配
  2. 在C# 中 如何限制在文本框(textBox)中输入的类型为正整数
  3. Linux---管道通信的使用
  4. TypeError:Joi.validate is not a function 解决办法
  5. Curie Module是什么
  6. 为了有一些储蓄,把自己压榨到最狠的时候是怎样的?
  7. 2020年电商上市公司市值梯队
  8. 创业者如何利用数字经济实现企业升级?
  9. 今天看到一个热搜,说一个美团会员配送费6元,普通用户2元,导致轩然大波
  10. 今天分享一个做自媒体的方法论