整个写文件的总体流程这里有介绍

主要分析了写文件时,NameServer端的源码分析

这篇文章介绍写文件时,Client端的源码分析

本文描述的内容涉及TFS写入流程图中的step1, step2, step3, step7

这里有介绍,本文主要分析源码

和通常的Linux文件操作API一样,写文件时包括三个步骤:open->write->close

TfsFile::tfs_open():

1. 读模式,发送block_id给NS,从NS查询相关ds_list返回给client

2. 写模式,发送block_id给NS,NS查询元数据,选择一个可写的block_id,以及相关ds_list返回给clientint TfsFile::tfs_open(const char *file_name, const char *suffix, const int32_t mode)

{

mode_ = mode;

//利用文件名计算得到block_id和file_id

conv_name(file_name, suffix);

int32_t iret = tfs_open(block_id_, file_id_, mode_);

if (iret != TFS_SUCCESS)

{

return iret;

}

FSName fsname;

fsname.set_cluster_id(session_->get_cluster_id());

fsname.set_block_id(block_id_);

fsname.set_file_id(file_id_);

//这个有什么用

fsname.set_prefix(suffix);

file_id_ = fsname.get_file_id();

strcpy(file_name_, fsname.get_name());

return TFS_SUCCESS;

}

int TfsFile::tfs_open(const uint32_t block_id, const uint64_t file_id, const int32_t mode)

{

if (session_ == NULL)

{

snprintf(error_message_, ERR_MSG_SIZE, "not set session, session is null");

return TFS_ERROR;

}

//读模式的话,必须指明block_id

if ((block_id == 0) && (mode == READ_MODE || mode == UNLINK_MODE))

{

snprintf(error_message_, ERR_MSG_SIZE, "%s no block, block_id(%u) file_id(%" PRI64_PREFIX "u)",

file_name_, block_id, file_id);

return TFS_ERROR;

}

mode_ = mode;

block_id_ = block_id;

file_id_ = file_id;

ds_list_.clear();

if (mode == READ_MODE)

{

//跟NameServer通信,主要是获取ds_list信息,主要是调用

//TfsSession::get_block_info_ex()函数,加上查询cache的信息

if (session_->get_block_info(block_id_, ds_list_) != TFS_SUCCESS)

{

//没有可读的DS,返回错误

if (ds_list_.size() == 0)

{

snprintf(error_message_, ERR_MSG_SIZE, "tfs open fail, block(%u) no exist in nameserver", block_id_);

return TFS_ERROR;

}

}

}

else if (mode == UNLINK_MODE)

{

if (session_->get_unlink_block_info(block_id_, ds_list_) != TFS_SUCCESS)

{

snprintf(error_message_, ERR_MSG_SIZE, "tfs open fail, block(%u) no exist in nameserver", block_id_);

return TFS_ERROR;

}

}

//写模式

else

{

uint32_t current_block_id = block_id_;

int32_t flag = BLOCK_WRITE | BLOCK_CREATE;

if ((mode_ & NEWBLK_MODE))

{

flag |= BLOCK_NEWBLK;

}

if ((mode_ & NOLEASE_MODE))

{

flag |= BLOCK_NOLEASE;

}

//这里也调用TfsSession::get_block_info_ex,如果对应的文件不存在,则

//NameServer会找到一个可用的block_id返回

if (session_->create_block_info(current_block_id, ds_list_, flag, fail_servers_) != TFS_SUCCESS)

{

if (ds_list_.size() == 0 || current_block_id == 0)

{

snprintf(error_message_, ERR_MSG_SIZE, "create block(%u) fail in nameserver", block_id_);

return TFS_ERROR;

}

}

//这里的APPED_MODE是如何来判断的,跟block_id==0有什么关系

if (block_id_ == 0)

{

block_id_ = current_block_id;

mode_ |= APPEND_MODE;

}

}

if (block_id_ == 0 || ds_list_.size() == 0)

{

snprintf(error_message_, ERR_MSG_SIZE, "block(%u),is invalid, dataserver size(%u)", block_id_,

static_cast (ds_list_.size()));

return TFS_ERROR;

}

if ((mode_ & READ_MODE))

{

if (file_id_ == 0)

{

//TBSYS_LOG(WARN, "blockId(%u) read fileid is 0.", block_id_);

}

//读模式通过这个方法来选择DS

pri_ds_index_ = static_cast (file_id_ % ds_list_.size());

}

else

{

pri_ds_index_ = 0;

}

//尝试连接ds_list中的DS,并且选出第一个可连接的DS作为primary DS

if (connect_ds() != TFS_SUCCESS)

{

snprintf(error_message_, ERR_MSG_SIZE, "connect to dataserver fail.");

if ((mode_ & APPEND_MODE))

{

fail_servers_.push_back(ds_list_[0]);

}

return TFS_ERROR;

}

fail_servers_.clear();

if ((mode_ & WRITE_MODE))

{

//这个函数调用client_->call(CreateFileMessage),经过上面connect_ds()的调用,

//client_是连到primary DS的客户端,所以这个消息不是发送给NS的

//通过此消息,去primary DS上获取file_numberint32_tret=create_filename();

if (ret!=TFS_SUCCESS)

{

CLIENT_POOL.release_client(client_);

TBSYS_LOG(ERROR, "create file name faile(%d)",ret);

returnret;

}

if (file_id_==0)

{

snprintf(error_message_,ERR_MSG_SIZE, "create file name fail,fileid == 0");

CLIENT_POOL.release_client(client_);

returnTFS_ERROR;

}

}

offset_=0;

eof_=TFS_FILE_EOF_FLAG_NO;

crc_=0;

is_open_flag_=TFS_FILE_OPEN_FLAG_YES;

returnTFS_SUCCESS;

}

TfsFile::tfs_write():

1. 传入数据,进行数据写入int TfsFile::tfs_write(char *data, const int32_t len)

{

if (is_open_flag_ == TFS_FILE_OPEN_FLAG_NO)

{

snprintf(error_message_, ERR_MSG_SIZE, "tfs file(%s) don't open file", file_name_);

return -2;

}

if (!(mode_ & WRITE_MODE))

{

snprintf(error_message_, ERR_MSG_SIZE, "open file mode isn't write.");

return -1;

}

WriteDataMessage dsmessage;

//file_number用来干什么??

dsmessage.set_file_number(file_number_);

dsmessage.set_block_id(block_id_);

dsmessage.set_file_id(file_id_);

//小文件写入,需要找到这个block_id对应的块的offset进行写入

dsmessage.set_offset(offset_);

dsmessage.set_length(len);

//注意这里,讲ds_list_放入message中,发送给primary DS,让它去给其他DS发送数据

dsmessage.set_ds_list(ds_list_);

dsmessage.set_data(data);

//给primary DS发送写入消息,这里是同步等待,还是异步等待??

Message *message = client_->call(&dsmessage);

int32_t iret = TFS_ERROR;

if (message != NULL)

{

if (message->get_message_type() == STATUS_MESSAGE)

{

StatusMessage *msg = dynamic_cast (message);

if (msg->get_status() == STATUS_MESSAGE_OK)

{

//记录数据crc和文件偏移量

crc_ = Func::crc(crc_, data, len);

offset_ += len;

iret = TFS_SUCCESS;

}

else

{

#ifdef __CLIENT_METRICS__

write_metrics_.incr_failed_count();

#endif

snprintf(error_message_, ERR_MSG_SIZE, "tfs write, get error msg(%s)", msg->get_error());

}

}

tbsys::gDelete(message);

}

else

{

#ifdef __CLIENT_METRICS__

write_metrics_.incr_timeout_count();

#endif

}

if (iret != TFS_SUCCESS)

{

snprintf(error_message_, ERR_MSG_SIZE, "tfs write, get error msg from dataserver(%s)",

tbsys::CNetUtil::addrToString(client_->get_mip()).c_str());

client_->disconnect();

is_open_flag_ = TFS_FILE_OPEN_FLAG_NO;

CLIENT_POOL.release_client(client_);

return -1;

}

return len;

}

关闭文件:

1. 由client向primary DS发送closeFileMessage消息,由primary DS通知其他DS关闭文件(如果不关闭文件的话,是否该文件是不可读的????)

int TfsFile::tfs_close()

{

if (is_open_flag_ == TFS_FILE_OPEN_FLAG_NO)

{

TBSYS_LOG(INFO, "tfs close successful , buf tfs file not open");

return TFS_SUCCESS;

}

//读模式,直接关闭client,设置is_open_flag就行

if (mode_ & READ_MODE)

{

is_open_flag_ = TFS_FILE_OPEN_FLAG_NO;

if (client_ != NULL)

CLIENT_POOL.release_client(client_);

TBSYS_LOG(DEBUG, "tfs close successful");

return TFS_SUCCESS;

}

if (!(mode_ & WRITE_MODE))

{

TBSYS_LOG(INFO, "tfs close successful");

return TFS_SUCCESS;

}

if (offset_ == 0)

{

is_open_flag_ = TFS_FILE_OPEN_FLAG_NO;

if (client_ != NULL)

{

CLIENT_POOL.release_client(client_);

}

TBSYS_LOG(INFO, "tfs close successful, offset(%d)", offset_);

return TFS_ERROR;

}

if (client_ == NULL)

{

TBSYS_LOG(ERROR, "tfs close fail, client is null");

return TFS_ERROR;

}

CloseFileMessage dsmessage;

dsmessage.set_file_number(file_number_);

dsmessage.set_block_id(block_id_);

dsmessage.set_file_id(file_id_);

dsmessage.set_option_flag(option_flag_);

//发送closeFileMessage给primary,让primary通知其他DS关闭文件

dsmessage.set_ds_list(ds_list_);

dsmessage.set_crc(crc_);

Message *message = client_->call(&dsmessage);

int32_t iret = TFS_ERROR;

if (message != NULL)

{

if (message->get_message_type() == STATUS_MESSAGE)

{

StatusMessage* msg = dynamic_cast (message);

if (msg->get_status() == STATUS_MESSAGE_OK)

{

iret = TFS_SUCCESS;

}

else

{

#ifdef __CLIENT_METRICS__

close_metrics_.incr_failed_count();

#endif

snprintf(error_message_, ERR_MSG_SIZE, "tfs file close, get errow msg(%s)", msg->get_error());

client_->disconnect();

}

}

tbsys::gDelete(message);

}

else

{

snprintf (error_message_, ERR_MSG_SIZE, "tfs file close,send msg to dataserver fail, errors(time out, blockid(%u) fileid(%" PRI64_PREFIX "u)",

block_id_, file_id_);

#ifdef __CLIENT_METRICS__

close_metrics_.incr_failed_count();

#endif

}

#ifdef __CLIENT_METRICS__

if (close_metrics_.total_count % 50 == 0)

{

TBSYS_LOG(DEBUG, "write file (%s) on server(%s)", file_name_, tbsys::CNetUtil::addrToString(last_elect_ds_id_).c_str());

}

#endif

is_open_flag_ = TFS_FILE_OPEN_FLAG_NO;

CLIENT_POOL.release_client(client_);

return iret;

}

tfs_client php,TFS 源码分析 写文件操作 Client端相关推荐

  1. elasticsearch源码分析之search模块(client端)

    elasticsearch源码分析之search模块(client端) 注意,我这里所说的都是通过rest api来做的搜索,所以对于接收到请求的节点,我姑且将之称之为client端,其主要的功能我们 ...

  2. MyBatis 源码分析 - 映射文件解析过程

    1.简介 在上一篇文章中,我详细分析了 MyBatis 配置文件的解析过程.由于上一篇文章的篇幅比较大,加之映射文件解析过程也比较复杂的原因.所以我将映射文件解析过程的分析内容从上一篇文章中抽取出来, ...

  3. elasticsearch源码分析之search模块(server端)

    elasticsearch源码分析之search模块(server端) 继续接着上一篇的来说啊,当client端将search的请求发送到某一个node之后,剩下的事情就是server端来处理了,具体 ...

  4. Tornado源码分析 --- 静态文件处理模块

    每个web框架都会有对静态文件的处理支持,下面对于Tornado的静态文件的处理模块的源码进行分析,以加强自己对静态文件处理的理解. 先从Tornado的主要模块 web.py 入手,可以看到在App ...

  5. Docker源码分析(二):Docker Client创建与命令执行

    http://www.infoq.com/cn/articles/docker-source-code-analysis-part2 1. 前言 如今,Docker作为业界领先的轻量级虚拟化容器管理引 ...

  6. live555源码分析----mpg文件的处理

    live555支持的文件格式多为单流的文件,仅支持*.mpg.*.mkv.*.webm几种音视频混合类型的文件.其实我的目的是扩展其支持的格式,如avi等, 所以来分析一下mpg文件的处理. mpg文 ...

  7. Yii2 源码分析 - 入口文件执行流程

    2019独角兽企业重金招聘Python工程师标准>>> 以 yii 2.0.14 高级版的 frontend 为例,从 frontend/web/index.php 开始 //引用 ...

  8. rocketmq 消息删除_RocketMQ源码分析之文件过期删除机制

    1.由于RocketMQ操作CommitLog.ConsumeQueue文件,都是基于内存映射方法并在启动的时候,会加载commitlog.ConsumeQueue目录下的所有文件,为了避免内存与磁盘 ...

  9. LDD3源码分析之ioctl操作 .

    http://blog.csdn.net/liuhaoyutz/article/details/7386254 作者:刘昊昱 博客:http://blog.csdn.net/liuhaoyutz 编译 ...

最新文章

  1. python详细安装教程环境配置-如何安装Python(环境设置)?详细安装步骤图解
  2. oracle linux 6.5 安装 virtualbox
  3. ML之LoRBaggingRF:依次利用Bagging、RF算法对泰坦尼克号数据集 (Kaggle经典案例)获救人员进行二分类预测——模型融合
  4. 高人写的浙大简史(转)
  5. [css] 说说visibility属性的collapse属性值有什么作用?在不同浏览器下有什么区别?
  6. echarts vue 柱状图实例_「源码学习」适用于 Vue3 的 ECharts 包装组件
  7. java stream byte_乐字节-Java8新特性之Stream流(上)
  8. 杭电计算机组成原理教材答案,杭电计算机组成原理包建课后作业答案详解.doc...
  9. php动态页面在ie浏览器中css布局板块全缩在中间,CSS网页布局开发时的常见问题及解决方法...
  10. RabbitMQ 交换机、队列、消息持久化
  11. 计算机dos全套教学视频,梦想之路DOS命令系列培训教程(视频打包)
  12. 杭电OJ刷题指南(ACM)
  13. keil 生成三角波dac0832_怎么样利用南方CASS三角网法和方格网法进行土方量计算...
  14. 毕业设计So Easy:基于Java语言西餐厅点餐系统
  15. 开源办公OA开发平台使用说明:用车管理应用
  16. WinRAR 3.51 注册码
  17. 网络安全——数据库基础知识
  18. Excel使用攻略(1)
  19. iOS-将像素绘制到屏幕上
  20. 三维图像专业处理软件Dragonfly 应用-如何计算面孔隙率

热门文章

  1. Android studio真机测试不能直接打开APP
  2. OmniGraffle 中文 - 汉化 6.1版本汉化
  3. 【论文笔记】sigmod15-COMMIT A Scalable Approach to Mining Communication Motifs from Dynamic Networks
  4. 攻防世界-web-simple js
  5. 电子电气架构——ECU升级(Bootloader)A/B分区策略汇总
  6. 利用WinRAR打包免安装绿色C#程序
  7. 基于leaflet编写的经纬线网格绘制react插件
  8. Unreal Engine 4 物理模拟之物理碰撞、重叠与射线检测
  9. 拼多多拼团小程序开发
  10. 智领云彭锋受邀参加2022数字经济领航者峰会,深度探讨“云原生时代的数据赋能”...