Java5中的线程池实例讲解
Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了 Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。
简介
本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:
1. 建立监听端口。
2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。
这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销 毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器 模型将如下:
1. 建立监听端口,创建线程池。
2. 发现有新连接,使用线程池来执行服务任务。
3. 服务完毕,释放线程到线程池。
下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。
初始化
初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态 方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个 java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方 法来建立线程池。
ExecutorService pool = Executors.newFixedThreadPool(10);
表示新建了一个线程池,线程池里面有10个线程为任务队列服务。
使用ServerSocket对象来初始化监听端口。
private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);
服务新连接
当有新连接建立时,accept返回时,将服务任务提交给线程池执行。
while(true){
Socket socket = serverListenSocket.accept();
pool.execute(new ServiceThread(socket));
}
这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。
服务任务
服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此 ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个 线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:
private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
int ret = 0;
try{
lock.lock();
ret = count;
}finally{
lock.unlock();
}
return ret;
}
private void increaseCount(){
try{
lock.lock();
++count;
}finally{
lock.unlock();
}
}
服务线程在开始给客户端打印一个欢迎信息,
increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount+"\r\n";
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());
然后使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费 时的任务非常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执 行完毕,则无需等待即可获得结果,如果还在执行,则等待到运行完毕。
ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other".getBytes());
String result = future.get();
dos.write(result.getBytes());
其中TimeConsumingTask实现了Callable接口
class TimeConsumingTask implements Callable {
public String call() throws Exception {
System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
return "ok, here's the result: It takes me lots of time to produce this result";
}
}
这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数, 其作用类似与Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。在这段程序中,我们提交了 一个Callable的任务,然后程序不会堵塞,而是继续执行dos.write("let's do soemthing other".getBytes());当程序执行到String result = future.get()时如果call函数已经执行完毕,则取得返回值,如果还在执行,则等待其执行完毕。
服务器端的完整实现
服务器端的完整实现代码如下:
- package demo;
- import java.io.DataOutputStream;
- import java.io.IOException;
- import java.io.Serializable;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.RejectedExecutionHandler;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.ReentrantLock;
- public class Server
- {
- private static int produceTaskSleepTime = 100;
- private static int consumeTaskSleepTime = 1200;
- private static int produceTaskMaxNumber = 100;
- private static final int CORE_POOL_SIZE = 2;
- private static final int MAX_POOL_SIZE = 100;
- private static final int KEEPALIVE_TIME = 3;
- private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
- private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
- private static final String HOST = "127.0.0.1";
- private static final int PORT = 19527;
- private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
- // private ThreadPoolExecutor serverThreadPool = null;
- private ExecutorService pool = null;
- private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
- private ServerSocket serverListenSocket = null;
- private int times = 5;
- public void start()
- {
- // You can also init thread pool in this way.
- /*
- * serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue, rejectedExecutionHandler);
- */
- pool = Executors.newFixedThreadPool(10);
- try
- {
- serverListenSocket = new ServerSocket(PORT);
- serverListenSocket.setReuseAddress(true);
- System.out.println("I'm listening");
- while (times-- > 0)
- {
- Socket socket = serverListenSocket.accept();
- String welcomeString = "hello";
- // serverThreadPool.execute(new ServiceThread(socket, welcomeString));
- pool.execute(new ServiceThread(socket));
- }
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- cleanup();
- }
- public void cleanup()
- {
- if (null != serverListenSocket)
- {
- try
- {
- serverListenSocket.close();
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- // serverThreadPool.shutdown();
- pool.shutdown();
- // 调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法, 那么线程池会一直保持下去,以便随时添加新的任务。interrupt():只有阻塞(sleep,wait,join的线程调用他们的 interrupt()才起作用,正在运行的线程不起作用也不抛异常)
- }
- public static void main(String args[])
- {
- Server server = new Server();
- server.start();
- }
- }
- class ServiceThread implements Runnable, Serializable
- {
- private static final long serialVersionUID = 0;
- private Socket connectedSocket = null;
- private String helloString = null;
- private static int count = 0;
- private static ReentrantLock lock = new ReentrantLock();
- ServiceThread(Socket socket)
- {
- connectedSocket = socket;
- }
- public void run()
- {
- increaseCount();
- int curCount = getCount();
- helloString = "hello, id = " + curCount + "\r\n";
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<String> future = executor.submit(new TimeConsumingTask());
- DataOutputStream dos = null;
- try
- {
- dos = new DataOutputStream(connectedSocket.getOutputStream());
- dos.write(helloString.getBytes());
- try
- {
- dos.write("let's do soemthing other.\r\n".getBytes());
- String result = future.get();
- dos.write(result.getBytes());
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- catch (ExecutionException e)
- {
- e.printStackTrace();
- }
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- finally
- {
- if (null != connectedSocket)
- {
- try
- {
- connectedSocket.close();
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- if (null != dos)
- {
- try
- {
- dos.close();
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- executor.shutdown();
- }
- }
- private int getCount()
- {
- int ret = 0;
- try
- {
- lock.lock();
- ret = count;
- }
- finally
- {
- lock.unlock();
- }
- return ret;
- }
- private void increaseCount()
- {
- try
- {
- lock.lock();
- ++count;
- }
- finally
- {
- lock.unlock();
- }
- }
- }
- class TimeConsumingTask implements Callable<String>
- {
- public String call() throws Exception
- {
- System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
- return "ok, here's the result: It takes me lots of time to produce this result";
- }
- }
转载于:https://www.cnblogs.com/sand-tiny/p/3511722.html
Java5中的线程池实例讲解相关推荐
- java mina多线程_mina2中的线程池
一.Mina中的线程池模型 前面介绍了Mina总体的层次结构,那么在Mina里面是怎么使用Java NIO和进行线程调度的呢?这是提高IO处理性能的关键所在.Mina的线程调度原理主要如下图所示: A ...
- async spring 默认线程池_Spring boot注解@Async线程池实例详解
这篇文章主要介绍了Spring boot注解@Async线程池实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 从Spring3开始提供了@A ...
- Java-Java中的线程池原理分析及使用
文章目录 概述 线程池的优点 线程池的实现原理 线程池的使用 创建线程池 向线程池中提交任务 关闭线程池 合理的配置线程池 线程池的监控 概述 我们在上篇博文 Java-多线程框架Executor解读 ...
- 万字图文 | 学会Java中的线程池,这一篇也许就够了!
来源:一枝花算不算浪漫 线程池原理思维导图.png 前言 Java中的线程池已经不是什么神秘的技术了,相信在看的读者在项目中也都有使用过.关于线程池的文章也是数不胜数,我们站在巨人的肩膀上来再次梳理一 ...
- C#中的线程池使用(一)
1 线程池的概念 许多应用程序使用多个线程,但这些线程经常在休眠状态中耗费大量的时间来等待事件发生.其他线程可能进入休眠状态,并且仅定期被唤醒以轮询更改或更新状态信息,然后再次进入休眠状态.为了简化 ...
- 在Python网络爬虫程序中使用线程池
在Python网络爬虫程序中使用线程池 一.为什么需要使用线程池 二.线程池的使用 2.1 线程池的类与方法 2.2 使用线程池的一般步骤 三.在爬虫程序中使用线程池的实例 一.为什么需要使用线程池 ...
- python 在主线程开线程_Python开启线程,在函数中开线程的实例
逻辑处理上分成了多个模块,为了提高效率,前一个模块处理完调用后一个模块操作时使用多线程 我这里遇到的情形是前面取数据后面存到mysql,发现单线程效率很低,改为取数据后开线程存到mysql 开启线程之 ...
- python停止线程池_详解python中Threadpool线程池任务终止示例代码
需求 加入我们需要处理一串个位数(0~9),奇数时需要循环打印它:偶数则等待对应时长并完成所有任务:0则是错误,但不需要终止任务,可以自定义一些处理. 关键点 定义func函数处理需求 callbac ...
- python可以开多少线程_Python开启线程,在函数中开线程的实例
逻辑处理上分成了多个模块,为了提高效率,前一个模块处理完调用后一个模块操作时使用多线程 我这里遇到的情形是前面取数据后面存到mysql,发现单线程效率很低,改为取数据后开线程存到mysql 开启线程之 ...
最新文章
- MOOON-scheduler核心设计图(初稿)
- Hadoop历代版本及其特点
- github加速-解决GitHub访问速度很慢的问题
- .Net Core应用框架Util介绍(二)
- java注解编程_Java注解编程原理
- layui 渲染select下拉选项 ,日期控件的用法
- 软件工程形式化技术简介
- Cinchoo ETL-对大型CSV文件进行排序
- android 添加新用户,华为手机怎么添加新用户?华为手机添加新用户的方法
- 自动驾驶——驾驶员反应时间的文献调研
- 输出矩阵的左下半三角
- java远程方法调用(rmi)--好_java 远程方法调用(RMI)
- Android 动画分类
- 2022-2027年中国电容器行业市场全景评估及发展战略规划报告
- java编程语言怎么学习,详细说明
- oracle按序号排序,Oracle排序以及序号的显示
- ESP32开发路程——环境搭建、引脚、烧录、UART、ADC、WS2812、RFID、DAC、FreeRTOS、CJSON
- py实战绘制人口金字塔图
- Sky Computing
- Mac 设置 xdebug + Sublime 方法整理
热门文章
- esl8266开发之旅_从ESL老师到越南软件开发人员的旅程
- 物联网与互联网的6大区别
- java 判断是否为邮箱_Java判断邮箱是否存在 有返回值
- Python培训班线上线下哪种靠谱
- 2021年web前端发展方向有哪些
- Chameleon跨端框架——壹个理想主义团队的开源作品
- Java图形化:布局方式
- docker-dockerfile
- windows下配置redis集群,启动节点报错:createing server TCP listening socket *:7000:listen:Unknown error...
- 倍福TwinCAT(贝福Beckhoff)基础教程5.1 TwinCAT-2 运行可执行文件