47 工作实战:Socket 结合线程池的使用

立志是事业的大门,工作是登堂入室的旅程。

——巴斯德

引导语

Socket 面试最终题一般都是让你写一个简单的客户端和服务端通信的例子,本文就带大家一起来写这个 demo。

1 要求

  1. 可以使用 Socket 和 ServiceSocket 以及其它 API;
  2. 写一个客户端和服务端之间 TCP 通信的例子;
  3. 服务端处理任务需要异步处理;
  4. 因为服务端处理能力很弱,只能同时处理 5 个请求,当第六个请求到达服务器时,需要服务器返回明确的错误信息:服务器太忙了,请稍后重试~。

需求比较简单,唯一复杂的地方在于第四点,我们需要对客户端的请求量进行控制,首先我们需要确认的是,我们是无法控制客户端发送的请求数的,所以我们只能从服务端进行改造,比如从服务端进行限流。

有的同学可能很快想到,我们应该使用 ServerSocket 的 backlog 的属性,把其设置成 5,但我们在上一章中说到 backlog 并不能准确代表限制的客户端连接数,而且我们还要求服务端返回具体的错误信息,即使 backlog 生效,也只会返回固定的错误信息,不是我们定制的错误信息。

我们好好想想,线程池似乎可以做这个事情,我们可以把线程池的 coreSize 和 maxSize 都设置成 4,把队列大小设置成 1,这样服务端每次收到请求后,会先判断一下线程池中的队列有没有数据,如果有的话,说明当前服务器已经马上就要处理第五个请求了,当前请求就是第六个请求,应该被拒绝。

正好线程池的加入也可以满足第三点,服务端的任务可以异步执行。

2 客户端代码

客户端的代码比较简单,直接向服务器请求数据即可,代码如下:

public class SocketClient {private static final Integer SIZE = 1024;private static final ThreadPoolExecutor socketPoll = new ThreadPoolExecutor(50, 50,365L,TimeUnit.DAYS,new LinkedBlockingQueue<>(400));@Testpublic void test() throws InterruptedException {// 模拟客户端同时向服务端发送 6 条消息for (int i = 0; i < 6; i++) {socketPoll.submit(() -> {send("localhost", 7007, "nihao");});}Thread.sleep(1000000000);}/*** 发送tcp** @param domainName 域名* @param port       端口* @param content    发送内容*/public static String send(String domainName, int port, String content) {log.info("客户端开始运行");Socket socket = null;OutputStream outputStream = null;InputStreamReader isr = null;BufferedReader br = null;InputStream is = null;StringBuffer response = null;try {if (StringUtils.isBlank(domainName)) {return null;}// 无参构造器初始化 Socket,默认底层协议是 TCPsocket = new Socket();socket.setReuseAddress(true);// 客户端准备连接服务端,设置超时时间 10 秒socket.connect(new InetSocketAddress(domainName, port), 10000);log.info("TCPClient 成功和服务端建立连接");// 准备发送消息给服务端outputStream = socket.getOutputStream();// 设置 UTF 编码,防止乱码byte[] bytes = content.getBytes(Charset.forName("UTF-8"));// 输出字节码segmentWrite(bytes, outputStream);// 关闭输出socket.shutdownOutput();log.info("TCPClient 发送内容为 {}",content);// 等待服务端的返回socket.setSoTimeout(50000);//50秒还没有得到数据,直接断开连接// 得到服务端的返回流is = socket.getInputStream();isr = new InputStreamReader(is);br = new BufferedReader(isr);// 从流中读取返回值response = segmentRead(br);// 关闭输入流socket.shutdownInput();//关闭各种流和套接字close(socket, outputStream, isr, br, is);log.info("TCPClient 接受到服务端返回的内容为 {}",response);return response.toString();} catch (ConnectException e) {log.error("TCPClient-send socket连接失败", e);throw new RuntimeException("socket连接失败");} catch (Exception e) {log.error("TCPClient-send unkown errror", e);throw new RuntimeException("socket连接失败");} finally {try {close(socket, outputStream, isr, br, is);} catch (Exception e) {// do nothing}}}/*** 关闭各种流** @param socket* @param outputStream* @param isr* @param br* @param is* @throws IOException*/public static void close(Socket socket, OutputStream outputStream, InputStreamReader isr,BufferedReader br, InputStream is) throws IOException {if (null != socket && !socket.isClosed()) {try {socket.shutdownOutput();} catch (Exception e) {}try {socket.shutdownInput();} catch (Exception e) {}try {socket.close();} catch (Exception e) {}}if (null != outputStream) {outputStream.close();}if (null != br) {br.close();}if (null != isr) {isr.close();}if (null != is) {is.close();}}/*** 分段读** @param br* @throws IOException*/public static StringBuffer segmentRead(BufferedReader br) throws IOException {StringBuffer sb = new StringBuffer();String line;while ((line = br.readLine()) != null) {sb.append(line);}return sb;}/*** 分段写** @param bytes* @param outputStream* @throws IOException*/public static void segmentWrite(byte[] bytes, OutputStream outputStream) throws IOException {int length = bytes.length;int start, end = 0;for (int i = 0; end != bytes.length; i++) {start = i == 0 ? 0 : i * SIZE;end = length > SIZE ? start + SIZE : bytes.length;length -= SIZE;outputStream.write(bytes, start, end - start);outputStream.flush();}}}

客户端代码中我们也用到了线程池,主要是为了并发模拟客户端一次性发送 6 个请求,按照预期服务端在处理第六个请求的时候,会返回特定的错误信息给客户端。

以上代码主要方法是 send 方法,主要处理像服务端发送数据,并处理服务端的响应。

3 服务端代码

服务端的逻辑分成两个部分,第一部分是控制客户端的请求个数,当超过服务端的能力时,拒绝新的请求,当服务端能力可响应时,放入新的请求,第二部分是服务端任务的执行逻辑。

3.1 对客户端请求进行控制

public class SocketServiceStart {/*** 服务端的线程池,两个作用* 1:让服务端的任务可以异步执行* 2:管理可同时处理的服务端的请求数*/private static final ThreadPoolExecutor collectPoll = new ThreadPoolExecutor(4, 4,365L,TimeUnit.DAYS,new LinkedBlockingQueue<>(1));@Testpublic void test(){start();}/*** 启动服务端*/public static final void start() {log.info("SocketServiceStart 服务端开始启动");try {// backlog  serviceSocket处理阻塞时,客户端最大的可创建连接数,超过客户端连接不上// 当线程池能力处理满了之后,我们希望尽量阻塞客户端的连接
//      ServerSocket serverSocket = new ServerSocket(7007,1,null);// 初始化服务端ServerSocket serverSocket = new ServerSocket();serverSocket.setReuseAddress(true);
//      serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 80));serverSocket.bind(new InetSocketAddress("localhost", 7007));log.info("SocketServiceStart 服务端启动成功");// 自旋,让客户端一直在取客户端的请求,如果客户端暂时没有请求,会一直阻塞while (true) {// 接受客户端的请求Socket socket = serverSocket.accept();// 如果队列中有数据了,说明服务端已经到了并发处理的极限了,此时需要返回客户端有意义的信息if (collectPoll.getQueue().size() >= 1) {log.info("SocketServiceStart 服务端处理能力到顶,需要控制客户端的请求");//返回处理结果给客户端rejectRequest(socket);continue;}try {// 异步处理客户端提交上来的任务collectPoll.submit(new SocketService(socket));} catch (Exception e) {socket.close();}}} catch (Exception e) {log.error("SocketServiceStart - start error", e);throw new RuntimeException(e);} catch (Throwable e) {log.error("SocketServiceStart - start error", e);throw new RuntimeException(e);}}// 返回特定的错误码给客户端public static void rejectRequest(Socket socket) throws IOException {OutputStream outputStream = null;try{outputStream = socket.getOutputStream();byte[] bytes = "服务器太忙了,请稍后重试~".getBytes(Charset.forName("UTF-8"));SocketClient.segmentWrite(bytes, outputStream);socket.shutdownOutput();}finally {//关闭流SocketClient.close(socket,outputStream,null,null,null);}}}

我们使用 collectPoll.getQueue().size() >= 1 来判断目前服务端是否已经到达处理的极限了,如果队列中有一个任务正在排队,说明当前服务端已经超负荷运行了,新的请求应该拒绝掉,如果队列中没有数据,说明服务端还可以接受新的请求。

以上代码注释详细,就不累赘说了。

3.2 服务端任务的处理逻辑

服务端的处理逻辑比较简单,主要步骤是:从客户端的 Socket 中读取输入,进行处理,把响应返回给客户端。

我们使用线程沉睡 2 秒来模拟服务端的处理逻辑,代码如下:

public class SocketService implements Runnable {private Socket socket;public SocketService() {}public SocketService(Socket socket) {this.socket = socket;}@Overridepublic void run() {log.info("SocketService 服务端任务开始执行");OutputStream outputStream = null;InputStream is = null;InputStreamReader isr = null;BufferedReader br = null;try {//接受消息socket.setSoTimeout(10000);// 10秒还没有得到数据,直接断开连接is = socket.getInputStream();isr = new InputStreamReader(is,"UTF-8");br = new BufferedReader(isr);StringBuffer sb = SocketClient.segmentRead(br);socket.shutdownInput();log.info("SocketService accept info is {}", sb.toString());//服务端处理 模拟服务端处理耗时Thread.sleep(2000);String response  = sb.toString();//返回处理结果给客户端outputStream = socket.getOutputStream();byte[] bytes = response.getBytes(Charset.forName("UTF-8"));SocketClient.segmentWrite(bytes, outputStream);socket.shutdownOutput();//关闭流SocketClient.close(socket,outputStream,isr,br,is);log.info("SocketService 服务端任务执行完成");} catch (IOException e) {log.error("SocketService IOException", e);} catch (Exception e) {log.error("SocketService Exception", e);} finally {try {SocketClient.close(socket,outputStream,isr,br,is);} catch (IOException e) {log.error("SocketService IOException", e);}}}
}

4 测试

测试的时候,我们必须先启动服务端,然后再启动客户端,首先我们启动服务端,打印日志如下:

接着我们启动客户端,打印日志如下:

我们最后看一下服务端的运行日志:

从以上运行结果中,我们可以看出得出的结果是符合我们预期的,服务端在请求高峰时,能够并发处理5个请求,其余请求可以用正确的提示进行拒绝。

5 总结

所以代码集中在 SocketClient、SocketServiceStart、SocketService 中,启动的顺序为先启动 SocketServiceStart,后运行 SocketClient,感兴趣的同学可以自己 debug 下,加深印象。

面试官系统精讲Java源码及大厂真题 - 47 工作实战:Socket 结合线程池的使用相关推荐

  1. 面试官系统精讲Java源码及大厂真题 - 44 场景实战:ThreadLocal 在上下文传值场景下的实践

    44 场景实战:ThreadLocal 在上下文传值场景下的实践 开篇语 我们在 <打动面试官:线程池流程编排中的运用实战>一文中将流程引擎简单地完善了一下,本文在其基础上继续进行改造,建 ...

  2. 面试官系统精讲Java源码及大厂真题 - 24 举一反三:队列在 Java 其它源码中的应用

    24 举一反三:队列在 Java 其它源码中的应用 世上无难事,只要肯登攀. 引导语 队列除了提供 API 供开发者使用外,自身也和 Java 中其他 API 紧密结合,比如线程池和锁,线程池直接使用 ...

  3. 面试官系统精讲Java源码及大厂真题 - 07 List 源码会问哪些面试题

    07 List 源码会问哪些面试题 勤学如春起之苗,不见其增,日有所长. --陶潜 引导语 List 作为工作中最常见的集合类型,在面试过程中,也是经常会被问到各种各样的面试题,一般来说,只要你看过源 ...

  4. 面试官系统精讲Java源码及大厂真题 - 01 开篇词:为什么学习本专栏

    01 开篇词:为什么学习本专栏 更新时间:2019-10-30 10:08:31 才能一旦让懒惰支配,它就一无可为. --克雷洛夫 不为了源码而读源码,只为了更好的实践 你好,我是文贺,Java 技术 ...

  5. java源码pdf_面试官系统精讲Java源码及大厂真题 PDF 下载

    主要内容: 第 1 章 基础 01 开篇词:为什么学习本专栏 不为了源码而读源码,只为了更好的实践 你好,我是文贺,Java 技术专家,DDD 和业务中台的资深实践者,一周面试 2-3 次的面试官. ...

  6. 面试官系统精讲Java源码及大厂真题 - 48 一起看过的 Java 源码和面试真题

    48 一起看过的 Java 源码和面试真题 不为了源码而读源码,只为了更好的实践 持续几个月,我们的专栏终于结束了,这篇总结篇,我们又想回到当初写这篇专栏的初心:我们不为读源码而读源码,只是为了更好的 ...

  7. 面试官系统精讲Java源码及大厂真题 - 45 Socket 源码及面试题

    45 Socket 源码及面试题 引导语 Socket 中文翻译叫套接字,可能很多工作四五年的同学都没有用过这个 API,但只要用到这个 API 时,必然是在重要的工程的核心代码处. 大家平时基本都在 ...

  8. 面试官系统精讲Java源码及大厂真题 - 30 AbstractQueuedSynchronizer 源码解析(上)

    30 AbstractQueuedSynchronizer 源码解析(上) 不想当将军的士兵,不是好士兵. 引导语 AbstractQueuedSynchronizer 中文翻译叫做同步器,简称 AQ ...

  9. 面试官系统精讲Java源码及大厂真题 - 27 Thread 源码解析

    27 Thread 源码解析 书籍乃世人积累智慧之长明灯. 引导语 从本章开始我们开始学习线程的知识,线程是非常有趣的一个章节,大多数同学对于线程 API,属于不用就忘,到用时需要百度的情况,希望通过 ...

最新文章

  1. 淘宝装修:第一日 —— 图片轮播
  2. R使用neuralnet包构建神经网络回归模型并与线性回归模型对比实战
  3. 网站推广专员浅析网站推广运营如何提升企业网站转化率?
  4. 【一步到胃解决】Several ports (8005, 8080, 8009) required by Tomcat v9.0 Server at localhost are already
  5. 如何在Ubuntu中使用root账号登录(新增Ubuntu12.04/14.04)?
  6. MDT2012部署问题,USMT5.0 迁移哪些内容
  7. LUA 协程 Coroutine
  8. 机器人操作系统——ROS,Robot Operating System
  9. 学python需要c-为什么我建议每个开发人员都需要学 Python ?
  10. java后台怎么解密md5,Java md5 密码加解密
  11. 【干货】Chrome插件(扩展)开发全攻略
  12. Everything+Wox
  13. CTFSHOW 萌新赛 萌新记忆
  14. 滤波器的知识点(一)
  15. poj2248 DFS+剪枝 or BFS
  16. 微信小程序 使用canvas绘图
  17. Redis数据库中Hash哈希的介绍,常用命令和应用场景
  18. python爬取付费漫画_Python爬虫---爬取腾讯动漫全站漫画
  19. 挖洞经验 | 利用Slack应用程序窃取Slack用户的下载文件
  20. 使用jquery控制只能输入数字,并且关闭输入法

热门文章

  1. c# 智能升级程序代码(2)
  2. FCKEditor在.net的使用 cnblogs
  3. Magento: 获取客户信息 Get Customer’s Full Name, First Name, Last Name and Email Addres
  4. Bootstrap 3 移除输入框聚焦线 How to remove border (outline) around text/input boxes?
  5. html5游戏制作入门系列教程(四)
  6. node.js——麻将算法(七)简易版麻将出牌AI2.0
  7. [【转载】 linux进程间通信方式
  8. 解决nginx无法启动的问题——端口被占用
  9. C/C++——一个因为unsigned引发的大问题
  10. LeetCode 540. Single Element in a Sorted Array