背景:服务端实现一个多对多的生产者消费者模式,监听某个端口,一旦有client连入,将socket存入队列。通知消费者进程进行消费。在消费者进程中,拿到客户端的socket,接收客户端的信息,并将接收到的数据返回服务端。

难点:锁,server main函数如何生成多对多的线程(这是个大坑,放的位置或逻辑不对极易退化成一对一模式,在实践中,本人将监听放入生产者函数中,进行循环监听,main函数类比网上的设计,结果导致多个客户端链接时,当第一个client处在输入状态,另起client时,即使后面的client输入完成也得不到响应,得等待第一个client输入处理结束,改了许久才实现,仅以此记录自己踩过的坑)

客户端代码【简单的socket编程】

#include <iostream>
#include <unistd.h>
#include <strings.h>
#include<string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netdb.h>#define PORT 9090
#define MAXDATASIZE 100char receiveM[100];
char sendM[100];int main(int argc, char *argv[]) {int fd, numbytes;struct hostent *he;struct sockaddr_in server;if (argc != 2) {std::cout<<"Usage args <IP Address>"<<std::endl;exit(1);}// 通过函数 gethostbyname()获得字符串形式的ip地址,并赋给heif ((he = gethostbyname(argv[1])) == NULL) {std::cout<<"gethostbyname() error"<<std::endl;exit(1);}// 产生套接字fdif ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {std::cout<<"socket() error"<<std::endl;exit(1);}bzero(&server, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(PORT);server.sin_addr = *((struct in_addr *) he->h_addr);if (connect(fd, (struct sockaddr *) &server, sizeof(struct sockaddr)) == -1) {std::cout<<"connect() error"<<std::endl;exit(1);}// 向服务器发送数据std::cout<<"send message to server:";fgets(sendM, 100, stdin);int send_le;send_le = strlen(sendM);sendM[send_le - 1] = '\0';send(fd, sendM, strlen(sendM), 0);// 从服务器接收数据if ((numbytes = recv(fd, receiveM, MAXDATASIZE, 0)) == -1) {std::cout<<"recv() error";exit(1);}std::cout<<"receive message from server:"<<receiveM<<std::endl;close(fd);
}

多对多生产者消费者代码

// producerConsumerModel.h
#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>static const int bufferSize = 4; // Item buffer size.
//static const int totalProducts = 10;   // How many items we plan to produce.struct ItemRepository {void *item_buffer[bufferSize];size_t read_position;size_t write_position;size_t produced_item_counter;size_t consumed_item_counter;std::mutex mtx;std::mutex produced_item_counter_mtx;std::mutex consumed_item_counter_mtx;std::condition_variable repo_not_full;std::condition_variable repo_not_empty;
} gItemRepository;typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, void *item) {std::unique_lock <std::mutex> lock(ir->mtx);while (((ir->write_position + 1) % bufferSize) == ir->read_position) {std::cout << "Producer is waiting for an empty slot...\n";(ir->repo_not_full).wait(lock);}(ir->item_buffer)[ir->write_position] = item;(ir->write_position)++;if (ir->write_position == bufferSize)ir->write_position = 0;(ir->repo_not_empty).notify_all();lock.unlock();
}void *ConsumeItem(ItemRepository *ir) {void *data;std::unique_lock <std::mutex> lock(ir->mtx);while (ir->write_position == ir->read_position) {std::cout << "Consumer is waiting for items...\n";(ir->repo_not_empty).wait(lock);}data = (ir->item_buffer)[ir->read_position];(ir->read_position)++;if (ir->read_position >= bufferSize)ir->read_position = 0;(ir->repo_not_full).notify_all();lock.unlock();return data;
}void *ProducerTask(void *args) {std::unique_lock <std::mutex> lock(gItemRepository.produced_item_counter_mtx);if (gItemRepository.produced_item_counter < totalProducts) {++(gItemRepository.produced_item_counter);ProduceItem(&gItemRepository, args);std::cout << "Producer thread  is producing the " << gItemRepository.produced_item_counter<< "^th item" << std::endl;}lock.unlock();
}void *ConsumerTask(void *args) {void *item;std::unique_lock <std::mutex> lock(gItemRepository.consumed_item_counter_mtx);if (gItemRepository.consumed_item_counter < totalProducts) {item = ConsumeItem(&gItemRepository);++(gItemRepository.consumed_item_counter);std::cout << "Consumer thread  is consuming the " << gItemRepository.consumed_item_counter << "^th item"<< std::endl;}lock.unlock();int fd = *(int *) item;char buf[100];if (recv(fd, buf, 100, 0) == -1) {//receive dataexit(1);}std::cout << "receive msg**************" << buf << std::endl;std::string res = buf;send(fd, res.c_str(), res.length(), 0);std::cout << "send finished..." << std::endl;
}void InitItemRepository(ItemRepository *ir) {ir->write_position = 0;ir->read_position = 0;ir->produced_item_counter = 0;ir->consumed_item_counter = 0;
}

server函数端

#include <iostream>
#include <string.h>
#include <strings.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include<pthread.h>
#include "configParams.h"
#include "producerConsumerModel.h"#define BACKLOG 1
using std::cout;
using std::endl;
using std::string;
using std::exception;static const int THREAD_NUM = 4;
int main(int argc, char *argv[]) {string ip = "*.*.*.*";// change itint port = 9090;cout << ip << "  " << port << endl;int listenfd;struct sockaddr_in server;int connectfd;struct sockaddr_in client;socklen_t sin_size;sin_size = sizeof(struct sockaddr_in);if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {exit(1);}int opt = SO_REUSEADDR;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));bzero(&server, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(port);server.sin_addr.s_addr = htonl(INADDR_ANY);//local any ip(one machine has many network card,one card has many ip)// bindif (bind(listenfd, (struct sockaddr *) &server, sizeof(struct sockaddr)) == -1) {exit(1);}// listenif (listen(listenfd, BACKLOG) == -1) {exit(1);}//----------must notice as follow way------------------------InitItemRepository(&gItemRepository);while (1) {// acceptpthread_t pdthread, cmthread; //define a pthreadif ((connectfd = accept(listenfd, (struct sockaddr *) &client, &sin_size)) == -1) {exit(1);}pthread_create(&pdthread, NULL, ProducerTask, (void *) &connectfd);pthread_create(&cmthread, NULL, ConsumerTask, NULL);}close(listenfd);}
// configParams.h#define SERVERADDR  "10.142.32.164"
#define PORT    9090

参考文献

http://www.cnblogs.com/haippy/p/3252092.html
https://github.com/forhappy/Cplusplus-Concurrency-In-Practice/blob/master/zh/chapter11-Application/11.1%20Producer-Consumer-solution.md

c++实现多对多生产者消费者和socket连用相关推荐

  1. java多线程并发之旅-09-java 生产者消费者 Producer/Consumer 模式

    生产者消费者模式 在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产 ...

  2. 生产者/消费者模式之深入理解

    ★简介 生产者消费者模式并不是GOF提出的23种设计模式之一,23种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是我们编程过程 ...

  3. 进程同步之生产者消费者模型

    一.概念引入 例如寄信 我(生产者) 邮递员(消费者) 我把信件写好 -----相当于生产者生产数据 我把信放到邮筒-----相当于把生产者的数据放入到缓冲区 邮递员把信从邮筒取出-----相当于消费 ...

  4. 消息队列:生产者/消费者模式

    1.什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接 ...

  5. 生产者/消费者模式的理解及实现

    ★简介 生产者消费者模式并不是GOF提出的23种设计模式之一,23种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是我们编程过程 ...

  6. 设计模式——生产者消费者模式

    1 基本概括 2 主要介绍 2.1 概念 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消 ...

  7. 生产者消费者模式详解(转载)

    ★简介 在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理 ...

  8. 互斥锁、共享内存方式以及生产者消费者模型

    守护进程 1.守护进程的概念 进程指的是一个正在运行的程序,守护进程也是一个普通进程 意思就是一个进程可以守护另一个进程 import time from multiprocessing import ...

  9. 多线程实现生产者消费者

    1. Python多线程介绍 Python提供了两个有关多线程的标准库,thread和threading.thread提供了低级别的,原始的线程和一个锁.threading则是一个高级模块,提供了对t ...

  10. 线程同步之 生产者消费者模型详解

    前言 博主本来没打算讲这个比较前面的知识的(博主socket编程还有两个部分没讲,进程也才写完回收僵尸进程的三种方法,信号捕捉器也才完结),但是今天有朋友来问博主,什么是生产者消费者模型,所以博主就先 ...

最新文章

  1. 一家专业做SEO的公司介绍给大家|利槿网络
  2. java jar 和 war 包的区别
  3. 远程桌面与本地桌面实现文件传输
  4. 为什么用redis?
  5. 关于报工和生产订单的一些状态解释
  6. Pandas数据分析常用数据操作(3年总结)
  7. 万万没想到,刷1000道题目,还不如搞懂这几个机械动图!
  8. 真正的问题应该在我身上……
  9. Ubuntu16.04下,Firefox每次打开新网页都是以新建Windows而不是Tab的解决方案:
  10. 玩转 SpringBoot 2 快速整合 | RESTful Api 篇
  11. 互联网公司分批返岗;Safari 将封杀超过398天的 HTTPS 证书;TypeScript 3.8 发布 | 极客头条...
  12. 微信公众号开发相关流程及功能介绍
  13. plsql汉化包下载
  14. WebSockt面试题
  15. 计算机类单位换算,计算机单位换算大全
  16. 双均线策略 ------优矿学习
  17. 【全】可供选择的软件开源协议的罗列
  18. C/C++探秘(1)
  19. 分享 6 个 Vue3 开发必备的 VSCode 插件
  20. sql中intersect_INTERSECT –谓词中被低估的双向

热门文章

  1. 【学习笔记】信息系统项目管理-项目采购管理-合同分类
  2. Java模拟醉汉行走问题_醉汉随机行走/随机漫步问题(Random Walk Drunk Python)
  3. 我们可能会遇到的距离量算方法
  4. 三角函数π/2转化_三角函数不会做?看这里,带你搞定
  5. 记一次kubernetes的搭建遇坑coredns状态为CrashLoopBackOff并不断重启
  6. java cim客户端_高效使用 SBLIM CIM Client
  7. python求均值方差不用numpy_【Python】不用numpy用纯python求极差、平均数、中位数、众数与方差,python的打印到控制台...
  8. java鼠标乱跑_光标乱跑怎么办 光标乱跑解决方法【图文】
  9. LeetCode 打家劫舍题型 解析
  10. 使用js完成一个类似于小广告的功能,斜着运动,遇到边界弹回