cassandra写数据CommitLog
cassandra
两种方式:
Cassandra-ArchitectureCommitLog
Cassandra持久化-Durability
一种是配置commitlog_sync为periodic,定期模式;另外一种是batch,
默认(Cassandra1.2.19/3.0.0)为periodic,定期10000ms
#commitlog_sync: batch
#commitlog_sync_batch_window_in_ms: 50
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
这里如果是periodic模式潜在丢数据的风险,来看看两种实现方式,大致调用顺序
StorageProxy. ->WritePerformer.apply()->counterWriteTask()/sendToHintedEndpoints()->((CounterMutation/mutation).apply()->Mutation.apply()->Keyspace.apply()->CommitLog.instance.add(mutation),主要看CommitLog.instance.add(mutation)
CommitLog.instance.add(mutation)
public ReplayPosition add(Mutation mutation)
{assert mutation != null;long size = Mutation.serializer.serializedSize(mutation, MessagingService.current_version);long totalSize = size + ENTRY_OVERHEAD_SIZE;if (totalSize > MAX_MUTATION_SIZE){throw new IllegalArgumentException(String.format("Mutation of %s bytes is too large for the maxiumum size of %s",totalSize, MAX_MUTATION_SIZE));}Allocation alloc = allocator.allocate(mutation, (int) totalSize);try{ICRC32 checksum = CRC32Factory.instance.create();final ByteBuffer buffer = alloc.getBuffer();BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer);// checksummed lengthdos.writeInt((int) size);checksum.update(buffer, buffer.position() - 4, 4);buffer.putInt(checksum.getCrc());int start = buffer.position();// checksummed mutationMutation.serializer.serialize(mutation, dos, MessagingService.current_version);checksum.update(buffer, start, (int) size);buffer.putInt(checksum.getCrc());}catch (IOException e){throw new FSWriteError(e, alloc.getSegment().getPath());}finally{alloc.markWritten();}executor.finishWriteFor(alloc);return alloc.getReplayPosition();}
这里主要写buffer,没有刷盘,这时会有两种方式,就是之前说的periodic与batch,主要看 executor.finishWriteFor(alloc),起里边调用了maybeWaitForSync(),是一个抽像的,在BatchCommitLogService与PeriodicCommitLogService中实现
public void finishWriteFor(Allocation alloc)
{maybeWaitForSync(alloc);written.incrementAndGet();
}
protected abstract void maybeWaitForSync(Allocation alloc);
BatchCommitLogService中实现
protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
{// wait until record has been safely persisted to diskpending.incrementAndGet();alloc.awaitDiskSync(commitLog.metrics.waitingOnCommit);pending.decrementAndGet();
}
void waitForSync(int position, Timer waitingOnCommit)
{while (lastSyncedOffset < position){WaitQueue.Signal signal = waitingOnCommit != null ?syncComplete.register(waitingOnCommit.time()) :syncComplete.register();if (lastSyncedOffset < position)signal.awaitUninterruptibly();elsesignal.cancel();}
}
这里面如果lastSyncedOffset < position是会一直等待的,知道lastSyncedOffset>=position,即当前alloc对应的buffer已被flush
PeriodicCommitLogService中实现,这里的关键是waitForSyncToCatchUp()
protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
{if (waitForSyncToCatchUp(Long.MAX_VALUE)){// wait until periodic sync() catches up with its schedulelong started = System.currentTimeMillis();pending.incrementAndGet();while (waitForSyncToCatchUp(started)){WaitQueue.Signal signal = syncComplete.register(commitLog.metrics.waitingOnCommit.time());if (waitForSyncToCatchUp(started))signal.awaitUninterruptibly();elsesignal.cancel();}pending.decrementAndGet();}
}
waitForSyncToCatchUp()
private boolean waitForSyncToCatchUp(long started)
{return started > lastSyncedAt + blockWhenSyncLagsMillis;
}
这里的blockWhenSyncLagsMillis是1.5倍的commitlog_sync_period_in_ms
blockWhenSyncLagsMillis = (int) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5);
为什么是1.5倍呢,我的理解是假设flush刷盘的时间是0.5个commitlog_sync_period,但是这个其实是不一定的,可能大于0.5,可能小于0.5,这里就潜在数据丢失了,假设这个确实flush一次不止0.5个commitlog_sync_period,那写完的数据其实是不确定一定刷盘了的。
具体的flush代码,位于AbstractCommitLogService中的start()方法中
long syncStarted = System.currentTimeMillis();
commitLog.sync(shutdown);
lastSyncedAt = syncStarted;
syncComplete.signalAll();
commitLog.sync()->segment.sync()->write(startMarker, sectionEnd),write在CompressedSegment与MemoryMappedSegment实现,最终都是调用的channel.force()
转载于:https://www.cnblogs.com/donganwangshi/p/4530841.html
cassandra写数据CommitLog相关推荐
- Cassandra 的数据存储结构——本质是SortedMapRowKey, SortedMapColumnKey, ColumnValue
Cassandra 的数据存储结构 Cassandra 的数据模型是基于列族(Column Family)的四维或五维模型.它借鉴了 Amazon 的 Dynamo 和 Google's BigTab ...
- cassandra读写数据
cassandra读写数据 写 写一致性级别 cassandra的可调一致性级别意味着你可以在查询中指定所需的写操作一致性,一致性级别越高,说明需要更多的副本节点相应才能认为写操作完成,更高的一致性级 ...
- python 往excel 里面写数据
使用的python 的版本为3x 往excel 表格里面写数据使用的是xlwt, 如果电脑上没有 可以使用pip install xlwt 下载一个 步骤如下 1 创建工作薄 xls = xlwt. ...
- 写数据到文件注意事项write方法
[问题1] 使用FileOutputStream类写数据到文件中,本来是很简单的实现,但就是生成的文件与原文件大小不一样,排查了一整天,才找到问题原因所在, writer = new FileOutp ...
- 关于MATLAB中xlswrite函数写数据出现服务器异常情况的解决办法
关于MATLAB中xlswrite函数写数据出现服务器异常情况的解决办法 参考文章: (1)关于MATLAB中xlswrite函数写数据出现服务器异常情况的解决办法 (2)https://www.cn ...
- 配置 influxDB 鉴权及 HTTP API 写数据的方法
本文简要描述如何为 InfluxDB 开启鉴权和配置用户管理权限(安装后默认不需要登录),以及开启鉴权后如何使用 HTTP API 写数据. 创建 InfluxDB 管理员账号 创建 admin 帐号 ...
- 往hdfs写数据无权限
调用jar包把风电数据往hdfs写数据无权限问题 错误信息:hadoop.security.AccessControlException:Permission denied:user=gcl,acce ...
- Netty源码分析第7章(编码器和写数据)----第2节: MessageToByteEncoder
Netty源码分析第7章(编码器和写数据)---->第2节: MessageToByteEncoder Netty源码分析第七章: Netty源码分析 第二节: MessageToByteEnc ...
- Java17-day08【File(创建和删除文件、判断和获取功能、遍历目录)、IO流(字节流写数据、异常处理、字节流读数据、复制文本文件、复制图片)】
视频+资料(工程源码.笔记)[链接:https://pan.baidu.com/s/1MdFNUADVSFf-lVw3SJRvtg 提取码:zjxs] Java基础--学习笔记(零起点打开java ...
最新文章
- 只在必要时保存服务器控件视图状态
- BIND日志相关(二)
- 近亿台物联网设备或遭劫持,这家IoT云平台遭遇“灾难性”入侵事件
- Openstack在controller节点 nova image-list HTTP500
- 转 MySQL问题排查工具介绍
- 云上建站快速入门:博客、论坛、CMS、电子商务网站统统搞定
- Keras-训练可视化
- linux查看帮助信息,命令帮助信息的获取
- IBatis.net动态SQL语句(六)
- jsf入门实例_JSF错误消息示例教程
- Java面试题:热情盛夏,分享Java大厂面试百题
- SQL递归查询知多少
- windows完全卸载MySql数据库
- 数据库系统及应用——班级管理系统
- deepin V20.2版本安装MySQL
- mac sublime中文乱码问题解决
- 华为核心交换机绑定IP+MAC+端口案例
- deepin firewall
- LeetCode之Shortest Unsorted Continous Subarray
- 【对象存储】关于阿里云OSS踩坑记录