主要参考hollowworld以及route_guide程序示例。同步方式由于官方给的例子里面有,所以相对容易。为了增加点难度顺便学习一下protobuf,所以接口定义文件中用了比较复杂的字典和枚举数据结构。

1. proto文件:

// image transmission server
syntax = "proto3";option java_multiple_files = true;
option java_package = "io.grpc.examples.ImgTransmit";
option java_outer_classname = "ImgTransmit";
option objc_class_prefix = "IMT";package ImgTransmit;// service definition.
service ImgDemo {//simple RPCrpc resDescFetched(BaseName) returns (Description) {}// A client-to-server streaming RPC. // stream type means a group of ImgInfo will be sent orderly from clientrpc ImgUpload (stream ImgInfo) returns (Status) {}// A server-to-client streaming RPC. send result img to clientrpc resImgFetched (BaseName) returns (stream ImgInfo) {}
}message ImgInfo {string name = 1;enum ImgType {JPG = 0;PNG = 1;}message Img {bytes data = 1;ImgType type = 2;int32 height=3;int32 width=4;int32 channel=5;}// int32 indicates original img id map<int32,Img> maps = 2;
}message Status {int32 code = 1;
}message BaseName {repeated string name=1;
}
message Description {repeated string desc=1;
}

由proto文件生成demo.grpc.pb.h demo.grpc.pb.cc demo.pb.h demo.pb.cc文件就不放了,可以用grpc提供的工具以及protobuf工具自行生成。

2. server服务端程序

#include <algorithm>
#include <chrono>
#include <cmath>
#include <iostream>
#include <fstream>
#include <memory>
#include <string>
#include<map>
#include <exception> #include <grpcpp/grpcpp.h>
#include "demo.grpc.pb.h"
#include "demo.pb.h"using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using ImgTransmit::ImgInfo;
using ImgTransmit::ImgInfo_Img;
typedef  ImgTransmit::Status My_Status;
using ImgTransmit::ImgDemo;using Ms = std::chrono::milliseconds;
using Sec = std::chrono::seconds;
template <class UnitType=Ms>
using TimePoint = std::chrono::time_point<std::chrono::high_resolution_clock, UnitType>;
/*客户端,直接操作stub客户存根类对象进行通讯服务端,继承ImgDemo::Service,并实现相应的接口本例客户端,服务端均是同步(sync)实现
*/
// Logic and data behind the server's behavior.
class ImageServiceImpl final : public ImgDemo::Service {
public:::grpc::Status ImgUpload(ServerContext* context, ::grpc::ServerReader< ::ImgTransmit::ImgInfo>* reader, My_Status* response) override {::ImgTransmit::ImgInfo info;int point_count = 0;int feature_count = 0;float distance = 0.0;// reader 接收客户端传来的一组图片if (resultList.size() > 0)resultList.clear();int error = -1;int count = 0;std::chrono::system_clock::time_point start_time = std::chrono::system_clock::now();while (reader->Read(&info)) {//挨个处理图片try {std::string name = info.name();google::protobuf::Map<google::protobuf::int32, ImgInfo_Img> maps = info.maps();int size = maps.size();google::protobuf::Map<google::protobuf::int32, ImgInfo_Img>::iterator it = maps.begin();for (; it != maps.end(); it++) {ImgInfo_Img img = it->second;int c = img.channel();int h = img.height();int w = img.width();std::string rowData = img.data();std::cout << "img size :" << h << "," << w << "," << c<<", data length:"<< rowData.size()<< std::endl;resultList[name] = info;/*//可将收到的图片保存到指定目录std::string savePath("/img/img-server/recevied_img/");std::ofstream  out(savePath+name, std::ios::out | std::ios::binary | std::ios::ate);out.write(rowData.c_str(), sizeof(char) * (rowData.size()));out.close();*/}count++;}catch (std::exception& e) {std::cout << "exception: " << e.what() << std::endl;error += 1;}}//告知客户端,消息是否收到std::chrono::system_clock::time_point end_time = std::chrono::system_clock::now();Sec secs =std::chrono::duration_cast<Sec>(end_time - start_time);Ms ms = std::chrono::duration_cast<Ms>(end_time - start_time);TimePoint<Sec> sec_time_point(secs);TimePoint<Ms> ms_time_point(ms);std::cout << count <<" images received success."<<std::endl;std::cout << "time cost is: "<< ms_time_point.time_since_epoch().count() <<" ms,  "<< sec_time_point.time_since_epoch().count()<<" seconds"<<std::endl;response->set_code(1);if (error > 0) {return ::grpc::Status(::grpc::StatusCode::DATA_LOSS, "data received uncompletely.");}elsereturn ::grpc::Status::OK;}::grpc::Status resImgFetched(::grpc::ServerContext* context, const ::ImgTransmit::BaseName* request, ::grpc::ServerWriter< ::ImgTransmit::ImgInfo>* writer)override {//const std::string& name = request->name();const google::protobuf::RepeatedPtrField<std::string>& names=request->name();int count=0, size = names.size();for (int i = 0; i < size;i++) {const std::string& singleName = names.Get(i);if (resultList.count(singleName) == 1) {::ImgTransmit::ImgInfo res = resultList[singleName];writer->Write(res);count--;}count++;}if(count<size)return ::grpc::Status::OK;else return ::grpc::Status(::grpc::StatusCode::NOT_FOUND, "uncompleted reponse.");}::grpc::Status resDescFetched(::grpc::ServerContext* context, const ::ImgTransmit::BaseName* request, ::ImgTransmit::Description* response)override {const google::protobuf::RepeatedPtrField<std::string>& names = request->name();const google::protobuf::RepeatedPtrField<std::string>* rsp = response->mutable_desc();int count = 0, size = names.size();for (int i = 0; i < size; i++) {const std::string& singleName = names.Get(i);if (resultList.count(singleName) == 1) {std::string ss;const ::ImgTransmit::ImgInfo& singleInfo = resultList[singleName];response->add_desc(singleInfo.name());count--;}count++;}if (count < size)return ::grpc::Status::OK;elsereturn ::grpc::Status(::grpc::StatusCode::NOT_FOUND, "uncompleted reponse.");}
private:std::map<std::string,::ImgTransmit::ImgInfo> resultList;
};void RunServer() {std::string server_address("0.0.0.0:50057");ImageServiceImpl service;ServerBuilder builder;//Server-side Connection Managementbuilder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);//默认为0,在没有rpc待处理的情况下,不允许发PING帧builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS,10000);//默认7200000,两个小时后发送PING帧builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);//默认20000,20秒后如果没收到PING ACK,就重发PINGbuilder.AddChannelArgument(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 10);//默认累计发了2个PING帧之后,必须发送一次带数据的帧才能继续发PING帧builder.AddChannelArgument(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 5);//默认为2,最多重发2次如果对方不响应,就断开连接// Listen on the given address without any authentication mechanism.builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());// Register "service" as the instance through which we'll communicate with// clients. In this case it corresponds to an *synchronous* service.builder.RegisterService(&service);// Finally assemble the server.std::unique_ptr<Server> server(builder.BuildAndStart());std::cout << "Server listening on " << server_address << std::endl;// Wait for the server to shutdown. Note that some other thread must be// responsible for shutting down the server for this call to ever return.server->Wait();
}int main(int argc, char** argv) {std::cout << "ready go to sleeping" << std::endl;RunServer();return 0;
}

3.client 客户端程序

#include <chrono>
#include <iostream>
#include <fstream>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include <vector>#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include "demo.grpc.pb.h"using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReader;
using grpc::ClientReaderWriter;
using grpc::ClientWriter;
using grpc::Status;
using ImgTransmit::ImgInfo_Img;
using ImgTransmit::ImgInfo_ImgType;
using ImgTransmit::ImgInfo;
using ImgTransmit::BaseName;
using ImgTransmit::Description;
typedef  ImgTransmit::Status My_Status;
using ImgTransmit::ImgDemo;using Ms = std::chrono::milliseconds;
using Sec = std::chrono::seconds;
template <class UnitType = Ms>
using TimePoint = std::chrono::time_point<std::chrono::high_resolution_clock, UnitType>;void parse_txt(const std::string & txt_path,std::vector<std::string>& content) {std::ifstream fp(txt_path);if (!fp.is_open()){std::cout << "can not open this file" << std::endl;return;}std::string line;while (getline(fp, line)) {// using printf() in all tests for consistencyif (line.front() == '#'|| line.front() == ';')continue;printf("%s\n", line.c_str());content.push_back(line);}fp.close();return;
}class ImageClient {
public:ImageClient(std::shared_ptr<Channel> channel): stub_(ImgDemo::NewStub(channel)) {//stub底层对应着一个tcp socket,销毁stub就会断开tcp连接}void resImgFetched(const std::string& imgname) {BaseName query;ClientContext context;query.add_name(imgname);std::unique_ptr<ClientReader<ImgTransmit::ImgInfo> > reader(stub_->resImgFetched(&context, query));ImgTransmit::ImgInfo info;//info 将会在服务端的resImgFetched中被处理while (reader->Read(&info)) {//将流读完std::cout << "Found feature " << info.name() << std::endl;}Status status = reader->Finish();if (status.ok()) {std::cout << " result fetch rpc succeeded."<< std::endl;}else {std::cout << "result fetch rpc failed." << std::endl;}}void resDescFetched(const std::string& imgname) {BaseName query;Description res;ClientContext context;query.add_name(imgname);Status status = stub_->resDescFetched(&context, query, &res);if (!status.ok()) {std::cout << "GetFeature rpc failed." << std::endl;return;}auto descs = res.desc();int size = descs.size();for (int i = 0; i < size; i++) {const std::string& description = descs.Get(i);std::cout << "get success: " << description << std::endl;}return;}void ImgUpload(const std::vector<std::string>& img_list) {My_Status stats;ClientContext context;const int kPoints = 10;unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();std::default_random_engine generator(seed);std::uniform_int_distribution<int> delay_distribution(500, 1500);//这个stats将会在服务端的ImgUpload被处理std::unique_ptr<ClientWriter<::ImgTransmit::ImgInfo> > writer(stub_->ImgUpload(&context, &stats));std::chrono::system_clock::time_point start_time = std::chrono::system_clock::now();for (int i = 0; i < img_list.size(); i++) {std::string path = img_list.at(i);std::ifstream imgreader(path, std::ifstream::in | std::ios::binary);if(!imgreader.is_open())continue;imgreader.seekg(0, imgreader.end);  //将文件流指针定位到流的末尾int length = imgreader.tellg();imgreader.seekg(0, imgreader.beg);  //将文件流指针重新定位到流的开始char* buffer = (char*)malloc(sizeof(char) * length);imgreader.read(buffer, length);imgreader.close();int crest = path.find_last_of('\\');ImgInfo_Img img_detail;ImgInfo_ImgType type = ImgInfo_ImgType::ImgInfo_ImgType_JPG;img_detail.set_height(224);img_detail.set_width(224);img_detail.set_channel(3);img_detail.set_type(type);img_detail.set_data(buffer, length);free(buffer);      ImgTransmit::ImgInfo info;info.set_name(path.substr(crest + 1, -1));google::protobuf::Map<google::protobuf::int32, ImgInfo_Img>* maps =info.mutable_maps();google::protobuf::MapPair<google::protobuf::int32, ImgInfo_Img> item(i, img_detail);//maps->operator[](i)= img_detail;maps->insert(item);if (!writer->Write(info)) {// Broken stream.break;}std::this_thread::sleep_for(std::chrono::milliseconds(delay_distribution(generator)));}writer->WritesDone();std::chrono::system_clock::time_point end_time = std::chrono::system_clock::now();Status status = writer->Finish();if (status.ok()) {std::cout << "ImgUpload finished  with code: " << stats.code() <<  std::endl;Ms ms = std::chrono::duration_cast<Ms>(end_time - start_time);TimePoint<Ms> ms_time_point(ms);std::cout << "time cost is: " << ms_time_point.time_since_epoch().count() << " ms."<< std::endl;}else {std::cout << "ImgUpload rpc failed with code: "<<stats.code()<< std::endl;}}private:std::unique_ptr<ImgDemo::Stub> stub_;
};int main(int argc, char** argv) {ImageClient client(grpc::CreateChannel("localhost:50057",grpc::InsecureChannelCredentials()));std::cout << "-------------- upload image --------------" << std::endl;std::cout << "please assign the img list file:" << std::endl;std::vector<std::string> imglist;imglist.push_back("/path/to/image/275eef8456311c49af5e6137d959e1de.jpg");imglist.push_back("/path/to/image/98bb90c44f55aaeeae417d8233226785.jpg");client.ImgUpload(imglist);std::cout << "-------------- fetch image result --------------" << std::endl;client.resImgFetched("98bb90c44f55aaeeae417d8233226785.jpg");std::cout << "-------------- fetch description result --------------" << std::endl;client.resDescFetched("98bb90c44f55aaeeae417d8233226785.jpg");std::cin.get();return 0;
}

c++ grpc 实现一个传图服务(同步方式,流式传输)相关推荐

  1. c++ grpc 实现一个传图服务(异步方式,流式接收与发送)

    ~!转载请注明出处 异步传输官方示例只给了普通Unary元对象的传输,没有流式传输示例,经过摸索调试,实现了grpc的异步流式传输(目前只是单向流,服务端推流至客户端,或者客户端上送流至服务端). 1 ...

  2. grpc流式传输示例(c++)

    目录 grpc理解 流式传输方式 Attention grpc理解 grpc是结合protobuf的远程调用框架,服务端和客户端均支持同步和异步模式.同步模式下,服务器的service函数会阻塞,且当 ...

  3. grpc 流式传输_编写下载服务器。 第一部分:始终流式传输,永远不要完全保留在内存中...

    grpc 流式传输 下载各种文件(文本或二进制文件)是每个企业应用程序的生死攸关的事情. PDF文档,附件,媒体,可执行文件,CSV,超大文件等.几乎每个应用程序迟早都必须提供某种形式的下载. 下载是 ...

  4. 一个包含30行代码的Python项目:如何在您最喜欢的Twitcher流式传输时设置SMS通知...

    Hi everyone :) Today I am beginning a new series of posts specifically aimed at Python beginners. Th ...

  5. java流式上传下载_精讲RestTemplate第6篇-文件上传下载与大文件流式下载

    C++Templates(第2版英文版) 123.24元 (需用券) 去购买 > 本文是精讲RestTemplate第6篇,前篇的blog访问地址如下: 精讲RestTemplate第1篇-在S ...

  6. curl上传文件linux,在Linux中如何使用curl从一个服务器流式传输文件到另一个服务器(有限的服务器资源)...

    我的API服务器具有非常有限的磁盘空间(500MB)和内存(1GB).它获得的API调用之一是接收文件.消费者调用API并传递要下载的URL. 我的服务器的"目标"是将此文件上传到 ...

  7. OKHTTP 实现流式传输上传文件

    1. 引入okhttp依赖 implementation 'com.squareup.okhttp3:okhttp:4.10.0' 2. 编写工具类 object HttpUtils {private ...

  8. gRPC的通信方式-客户端流式、服务端流式、双向流式在Java的调用示例

    场景 gPRC简介以及Java中使用gPRC实现客户端与服务端通信(附代码下载): https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/ ...

  9. html 前端传数据流,jquery – 使用Node.js流式传输数据

    我想知道是否可以使用Node.js将数据从服务器流式传输到客户端.从我可以理解的所有在互联网上,这是必须是可能的,但我没有找到一个正确的例子或解决方案. 我想要的是一个单独的http post到nod ...

最新文章

  1. JavaScript 的 Promise  和  C# 的 waitone 一样吗?请大家讨论i两句。
  2. php 链接redis 实际例子
  3. Linux系统proc详解
  4. 字符串转换到double数组
  5. python生成器函数的使用(模拟cycle函数)
  6. 状态(State)模式
  7. Python-1-基础
  8. 打印服务Print Spooler自动停止解决方案
  9. 2022低压电工操作证考试题模拟考试平台操作
  10. 用企业微信管理微信客户有哪些好处?
  11. 欧洲计算机专业排名,最新!2021年QS世界大学学科排名发布!欧洲各国各学科专业排名表现抢眼!...
  12. 洛谷·[HNOI2015]落忆枫音
  13. arcgis实现cad图斑批量导入后,图斑颜色设置cad图层颜色保持一致
  14. Python访问剪贴板
  15. arduino期末考试题
  16. python正版软件多少钱_正版数据库软件需要多少钱
  17. Orion2 CDM 操作系统-操作部分20211206
  18. Qt--模拟按下按键(键盘)
  19. 微信H5公众号获取openid爬坑记
  20. securecrt连接虚拟机提示账号密码错误

热门文章

  1. 苹果录屏怎么设置_电脑怎么设置录屏?怎么设置电脑自动录屏?
  2. 微信裂变涨粉如何实操效果最好
  3. Arduino开发实例-DS3231实时时钟+LCD显示
  4. 如何用C语言编写简单的日历查询程序
  5. Dockerfile构建apache镜像
  6. linux日志系统介绍 —— syslog(),openlog(),closelog()
  7. 什么是网站安全?网站安全保障有哪些方式?
  8. php 实现购物车的案例,PHP实现的购物车类实例
  9. WORKNC 2022软件安装说明视频教程
  10. 2019弘康人寿值得入手的保险清单出炉!