线程模型

生产者Provider线程为一,主要进行深搜目录文件;、

消费者Consumer线程多个, 因为RPC服务调用时延较长, 启用多个线程请求服务。

持久化线程Persist 将已经消费的消息存放在writeQueue, 启用一个线程从writeQueue取数据进行持久化到log.pic,这样每次启动压缩的时候,可以避免重复消费。 进而避免同一目录进行多次压缩

api_key.properties 为申请的https://tinypng.com/ 的key ,每个key一个月可以压缩500张, 采用线程名的hashCode对key的个数取模运算,选择所要使用的key。

可扩展性

doCompress方法可以进行任意业务逻辑。只是我的实现是用来压缩图片了。

代码已push到github, 已经打好的jar也已上传。需要的可以clone一下。源码已打入jar包

如何使用

1、下载Compress.jar包

2、maven手动安装jar

mvn install:install-file -DgroupId=com.hearglobal -DartifactId=multi -Dversion=2.0 -Dpackaging=jar -Dfile=C:/Users/cbam/Desktop/Compress.jar

3、公共接口说明:

compress.setApi_key_location("/api_key.properties"); // 指定api_key文件位置 key需要申请 可选设置 默认为项目根路径 如E:/work/{project}/api_key.properties或/work/{project}/api_key.properties

compress.setPic_log_location("/log.pic"); //指定log.pic路径 可选配置 默认为项目根路径 如/work/{project}/log.pic 或 /work/{project}/log.pic

compress.compress("E:/upan/test", "E:/upan/test_bak", 3);//压缩调用 第一个参数为要压缩的目录 第二个参数为 压缩输出目录 第三个参数 启动的线程数

3、程序调用jar示例一

建议指定绝对路径

public static void main(String[] args) throws Exception {

Starter compress = new Starter();

compress.setApi_key_location("/api_key.properties");

compress.setPic_log_location("/log.pic");

compress.compress("E:/upan/test", "E:/upan/test_bak", 3);

}

4、程序调用jar示例二

打开win控制台或 在Linux shell下jar包当前目录 运行如下命令

java -jar Compress.jar E:/upan/test E:/upan/test_bak 3 //压缩调用 第一个参数为要压缩的目录 第二个参数为 压缩输出目录 第三个参数 启动的线程数

运行截图:

注意:此方式无法设置配置文件位置, 故请将文件api_key.properties默认放置在 jar包所在目录

生产者Provider代码:

package com.hearglobal.multi;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.io.BufferedReader;

import java.io.File;

import java.io.FileReader;

import java.io.IOException;

import java.util.LinkedList;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.Callable;

import java.util.concurrent.TimeUnit;

/**

* CopyRright (c)2014-2016 Haerbin Hearglobal Co.,Ltd

* Project: tinypngThread

* Comments:

* Author:cbam

* Create Date:2017/3/21

* Modified By:

* Modified Date:

* Modified Reason:

*/

public class Provider implements Callable{

private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

//消息队列

private BlockingQueue messageQueue;

private static LinkedList logList ;

//生产东西来源

private static File src;

/**

* Instantiates a new Provider.

*

*@param messageQueue the message queue

*@param src the src

*/

public Provider(BlockingQueue messageQueue, File src){

this.messageQueue = messageQueue;

this.src = src;

}

public Object call() throws Exception {

try {

load(src);

} catch (InterruptedException e) {

e.printStackTrace();

return false;

} finally {

return true;

}

}

private void load(File src) throws InterruptedException {

LOGGER.info("Provider load src:{}", src.getAbsolutePath());

// 当找到目录时,创建目录

if (src.isDirectory()) {

if(!logList.contains(src.getAbsolutePath())) {

if(!this.messageQueue.offer(src, 2, TimeUnit.SECONDS)) {

System.out.println("目录提交队列失败....");

};

}

File[] files = src.listFiles();

for(File file : files) {

load(file);

}

//当找到文件时

} else if (src.isFile()) {

if(validatePic(src) && !logList.contains(src.getAbsolutePath())) {

if(!this.messageQueue.offer(src, 2, TimeUnit.SECONDS)) {

System.out.println("文件提交队列失败....");

}

}

}

}

private boolean validatePic(File file) {

int loc = file.getAbsolutePath().lastIndexOf(".");

String suffix = file.getAbsolutePath().substring(++loc);

return suffix.equals("jpg") || suffix.equals("png");

}

/**

* Sets log list.

*

*@param logList the log list

*/

public static void setLogList(LinkedList logList) {

Provider.logList = logList;

}

}

消费者Consumer代码:

package com.hearglobal.multi;

import com.tinify.Options;

import com.tinify.Source;

import com.tinify.Tinify;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import javax.imageio.ImageIO;

import java.awt.image.BufferedImage;

import java.io.File;

import java.io.IOException;

import java.nio.file.Files;

import java.util.List;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.CopyOnWriteArrayList;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

/**

* CopyRright (c)2014-2016 Haerbin Hearglobal Co.,Ltd

* Project: tinypngThread

* Comments:

* Author:cbam

* Create Date:2017/3/21

* Modified By:

* Modified Date:

* Modified Reason:

*/

public class Consumer implements Runnable{

private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

//多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态

private volatile boolean isRunning = true;

private static String srcBase;

private static String destBase;

//已消费队列

private BlockingQueue writeQueue;

//消息队列

private BlockingQueue messageQueue;

private static Lock lock = new ReentrantLock();

private static List API_key = new CopyOnWriteArrayList<>();

private static File dest;

/**

* Instantiates a new Consumer.

*

*@param messageQueue the message queue

*@param writeQueue the write queue

*@param dest the dest

*@param srcBase the src base

*@param destBase the dest base

*/

public Consumer(BlockingQueue messageQueue,BlockingQueue writeQueue, File dest, String srcBase, String destBase){

this.messageQueue = messageQueue;

this.writeQueue = writeQueue;

this.dest = dest;

this.srcBase = srcBase;

this.destBase = destBase;

}

public void run() {

while(isRunning){

try {

lock.lock();

//获取数据

File file = this.messageQueue.poll(2, TimeUnit.SECONDS);

if(file == null) {

stop();

LOGGER.info("Current Consumer - {} - consume faild, messageQueue empty, thread is stopping...", Thread.currentThread().getName());

lock.unlock();

continue;

}

dest = new File(destBase + file.getAbsolutePath().substring(srcBase.length()));

lock.unlock();

//进行数据处理

doCompress(file, dest);

LOGGER.info("Current Consumer - {} - consume success, messageId : {} ", Thread.currentThread().getName(), file.getAbsolutePath());

writeQueue.add(file.getAbsolutePath());

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

private void doCompress(File src, File dest) {

LOGGER.info("Current Consumer - {} - Comsumer doCompress src:{}",Thread.currentThread().getName(), src.getAbsolutePath());

if(src.isDirectory()) {

dest.mkdirs();

} else {

Tinify.setKey(API_key.get(Thread.currentThread().getName().hashCode() % API_key.size()));

try {

Source source = Tinify.fromFile(src.getAbsolutePath());

BufferedImage bufferedImage = ImageIO.read(src);

if(bufferedImage.getWidth() > 800) {

Options options = new Options()

.with("method", "scale")

.with("width", 800);

Source resized = source.resize(options);

resized.toFile(dest.getAbsolutePath());

} else {

source.toFile(dest.getAbsolutePath());

}

} catch (Exception e) {

e.printStackTrace();

LOGGER.error("Current Consumer - {} - Consumer doCompress exception error:{}, src.path:{}", Thread.currentThread().getName(), e.getMessage(), src.getAbsolutePath());

copyFile(src, dest);

}

}

}

private void copyFile(File src, File dest) {

try {

LOGGER.info("Current Consumer - {} - src.path:{}, dest.path:{}", Thread.currentThread().getName(), src.getAbsolutePath(), dest.getAbsolutePath());

while(!dest.getParentFile().exists()) {

try {

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

// lock.lock();

Files.copy(src.toPath(), dest.toPath());

} catch (IOException e) {

e.printStackTrace();

LOGGER.error("Consumer copyFile cause error:{}, currentThread.getName:{}", e.getMessage(), Thread.currentThread().getName());

} finally {

// lock.unlock();

}

}

/**

* Stop.

*/

public void stop() {

isRunning = false;

}

/**

* Gets src base.

*

*@return the src base

*/

public static String getSrcBase() {

return srcBase;

}

/**

* Sets src base.

*

*@param srcBase the src base

*/

public static void setSrcBase(String srcBase) {

Consumer.srcBase = srcBase;

}

/**

* Gets dest base.

*

*@return the dest base

*/

public static String getDestBase() {

return destBase;

}

/**

* Sets dest base.

*

*@param destBase the dest base

*/

public static void setDestBase(String destBase) {

Consumer.destBase = destBase;

}

/**

* Gets api key.

*

*@return the api key

*/

public static List getAPI_key() {

return API_key;

}

/**

* Sets api key.

*

*@param API_key the api key

*/

public static void setAPI_key(List API_key) {

Consumer.API_key = API_key;

}

}

持久化线程Persist代码:

package com.hearglobal.multi;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.io.File;

import java.io.FileWriter;

import java.io.IOException;

import java.util.LinkedList;

import java.util.List;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.TimeUnit;

/**

* CopyRright (c)2014-2016 Haerbin Hearglobal Co.,Ltd

* Project: tinypngThread

* Comments:

* Author:cbam

* Create Date:2017/3/21

* Modified By:

* Modified Date:

* Modified Reason:

*/

public class Persist extends Thread {

private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

private BlockingQueue writeQueue;

private boolean isRunning = true;

private static String pic_log_location = "log.pic";

private long start;

/**

* Instantiates a new Persist.

*

*@param writeQueue the write queue

*@param start the start

*/

public Persist(BlockingQueue writeQueue, long start) {

this.writeQueue = writeQueue;

this.start = start;

}

@Override

public void run() {

try {

while(!shouldStop()) {

Thread.sleep(10 * 1000);

List list = new LinkedList();

while(!writeQueue.isEmpty()) {

list.add(writeQueue.poll(5, TimeUnit.SECONDS));

}

doPersist(list);

}

LOGGER.info("Persist thread work finished");

LOGGER.info("耗时: {} ms",System.currentTimeMillis() - start);

} catch (InterruptedException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

private void doPersist(List messageIds) throws IOException {

FileWriter writer = new FileWriter(pic_log_location, true);

for(String str : messageIds) {

writer.write(str + "\n");

}

writer.close();

}

/**

* Is running boolean.

*

*@return the boolean

*/

public boolean isRunning() {

return isRunning;

}

/**

* Sets running.

*

*@param running the running

*/

public void setRunning(boolean running) {

isRunning = running;

}

//persist线程终止的条件是 所有消费线程已停止 && 当前 已消费消息队列为空

private boolean shouldStop() {

return isRunning == false && writeQueue.size() == 0;

}

public static void setPic_log_location(String pic_log_location) {

Persist.pic_log_location = pic_log_location;

}

}

启动类 Starter代码:

package com.hearglobal.multi;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.io.*;

import java.util.LinkedList;

import java.util.List;

import java.util.concurrent.*;

/**

* CopyRright (c)2014-2016 Haerbin Hearglobal Co.,Ltd

* Project: tinypngThread

* Comments:

* Author:cbam

* Create Date:2017/3/21

* Modified By:

* Modified Date:

* Modified Reason:

*/

public class Starter {

private static CountDownLatch cdl = new CountDownLatch(1);

private static final Logger LOGGER = LoggerFactory.getLogger(Starter.class);

private static int coreNumber = Runtime.getRuntime().availableProcessors();

private static List API_key = new CopyOnWriteArrayList<>();

private static LinkedList logList = new LinkedList<>() ;

private static String api_key_location = "api_key.properties";

private static String pic_log_location = "log.pic";

private static String src;

private static String dest;

private static int threshold;

private static long start;

/**

* Main.

*

*@param args the args

*/

public static void main(String[] args){

String src = args[0];

String dest = args[1];

threshold = args.length < 3 || args[2] == null || args[2].equals("") ? coreNumber : Integer.parseInt(args[2]);

try {

new Starter().compress(src, dest, threshold);

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* Compress.

*

*@param src the src

*@param dest the dest

*@param threshold the threshold

*@throws Exception the exception

*/

public void compress(String src, String dest, int threshold) throws Exception {

start = System.currentTimeMillis();

if(!this.checkParam(src, dest)) {

throw new IllegalArgumentException("参数不正确!");

}

this.src = src;

this.dest = dest;

this.threshold = threshold <= 0 ? coreNumber : threshold;

LOGGER.info("运行参数:{}, {}, {}", src, dest, threshold);

try {

//消息队列

BlockingQueue messageQueue = new LinkedBlockingQueue<>();

//已消费队列 用于已消费消息的持久化 使得单线程对文件读写

final BlockingQueue writeQueue = new LinkedBlockingQueue<>();

ExecutorService cachePool = Executors.newCachedThreadPool();

//生产者提交 最多启动一个

Provider p = new Provider(messageQueue, new File(src));

initProvider();

p.setLogList(logList);

Future future = cachePool.submit(p);

//阻塞到provide 执行完成

while(!future.get()) {

future = cachePool.submit(p);

}

initConsumer();

//消费者提交

for(int i = 0; i < threshold; i++) {

Consumer c = new Consumer(messageQueue, writeQueue, new File(dest), src, dest);

c.setAPI_key(API_key);

cachePool.execute(c);

}

System.out.println(start + "start");

//启动持久化消息线程工作 消费writeQueue

Persist perssit = new Persist(writeQueue, start);

perssit.setPic_log_location(pic_log_location);

perssit.start();

cachePool.shutdown();

while(true) {

if(cachePool.isTerminated()) {

perssit.setRunning(false);

LOGGER.info("compress has finished !");

break;

}

}

} catch (Exception e) {

e.printStackTrace();

throw new Exception("压缩失败");

}

}

/**

* Check param boolean.

*

*@param src the src

*@param dest the dest

*@return the boolean

*/

private boolean checkParam(String src, String dest) {

if(src == null || dest == null || src.equals("") || dest.equals("")) {

LOGGER.error("参数为空!");

return false;

}

int loc_src = src.lastIndexOf(".");

String suffix_src = src.substring(++loc_src);

int loc_dest = dest.lastIndexOf(".");

String suffix_dest = src.substring(++loc_dest);

if(suffix_dest.equals("png") && suffix_src.equals("jpg")) {

LOGGER.error("错误图片类型转换!");

return false;

} else if(suffix_dest.equals("jpg") && suffix_src.equals("png")) {

LOGGER.error("错误图片类型转换!");

return false;

} else if(! new File(src).exists()) {

LOGGER.error("src 路径未找到!");

return false;

} else {

return true;

}

}

private void initConsumer() {

String line = "";

try {

//BufferedReader buff = new BufferedReader(new FileReader("E:\\workplace\\tinypngThread\\target\\api_key.properties"));

BufferedReader buff = new BufferedReader(new FileReader(api_key_location));

while ((line=buff.readLine()) != null) {

LOGGER.info("API_key => {} ",line);

API_key.add(line);

}

buff.close();

} catch (IOException e) {

e.printStackTrace();

LOGGER.error("Consumer initConsumer error:{}", e.getMessage());

}

}

/**

* Init provider.

*

*@throws IOException the io exception

*/

private static void initProvider() throws IOException {

LOGGER.info("Provider initProvider invoked");

File file = new File(pic_log_location);

if(!file.exists()) {

file.createNewFile();

}

FileReader reader = new FileReader(pic_log_location);

BufferedReader br = new BufferedReader(reader);

String str;

while((str = br.readLine()) != null) {

logList.add(str);

}

}

/**

* Sets api key location.

*

*@param api_key_location the api key location

*/

public static void setApi_key_location(String api_key_location) {

Starter.api_key_location = api_key_location;

}

/**

* Sets pic log location.

*

*@param pic_log_location the pic log location

*/

public static void setPic_log_location(String pic_log_location) {

Starter.pic_log_location = pic_log_location;

}

}

java 多线程 压缩_Java 多线程拷贝文件夹并调用tinyPng算法接口压缩图片实现(生产消费变种)...相关推荐

  1. java文件名大小_java 比较指定文件夹内.txt文件名的大小

    我写了一个程序:每过一定时间向指定文件夹内写一个按时间命名的.txt文件现在想实现最新的txt文件和最原先的txt文件内容比对但是不知道如何获得这两个文家的内容.packagetest;import. ...

  2. java创建文件目录_java创建目录或文件夹的方法?

    展开全部 1.File类的createNewFile根据抽象路径e5a48de588b662616964757a686964616f31333337393532创建一个新的空文件,当抽象路径制定的文件 ...

  3. java 打开目录_java 如何打开文件夹(包括文件夹内的文件夹)!

    展开全部 文件太多,过滤了下后缀为.mp3的所有文件: package cn.zhidao.file; import java.io.File; /** * * @author Administrat ...

  4. java 级联删除_java 级联删除文件夹下的所有文件

    public void deletefile(String delpath) throws Exception { try { File file = new File(delpath); // 当且 ...

  5. 使用多线程拷贝文件夹

    import java.io.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Execut ...

  6. java 包含文件_java 文件夹拷贝(文件夹里包含文件和文件夹) 代码

    java代码实现文件夹拷贝,文件夹可能包含文件夹和文件import java.io.BufferedReader; import java.io.File; import java.io.FileIn ...

  7. java中拷贝文件的代码_拷贝文件夹中的所有文件到另外一个文件夹

    [java]代码库/** * * 拷贝文件夹中的所有文件到另外一个文件夹 * * @param srcDirector * 源文件夹 * * @param desDirector * 目标文件夹 * ...

  8. Java io流---拷贝文件夹下的所有文件和目录

    Java io流-拷贝文件夹下的所有文件和目录 代码: package demo01;import java.io.*; import java.util.TreeMap;public class C ...

  9. java 拷贝文件夹的实现

    public class CopyFiles {public static void main(String[] args) throws Exception {String src = " ...

最新文章

  1. 取得cpu核心序号_cpu的性能指标有哪些?
  2. 【前端】:jQuery上
  3. python debug【】
  4. 哥伦比亚大学计算机工程面试题
  5. MySQL:8种SQL典型错误用法,值得收藏!
  6. eclipse导出doc文档
  7. 基于arduino的光控窗帘_分别基于STM32和Arduino的智能窗帘硬件分析与程序设计
  8. 高级函数技巧-函数柯里化
  9. 计算机用户在使用计算机文件时6,201606-计算机基础选择题(含答案)(6页)-原创力文档...
  10. java许愿墙_wishingWall 一个好看的许愿墙板块,功能强大,页面美观 Java Develop 238万源代码下载- www.pudn.com...
  11. html动态线条背景鼠标,动态背景线条鼠标移动线条汇聚---背景特效
  12. Ra-08系列开发板入门教程,标准LoRaWAN对接私有服务器。
  13. Linux系统下挂载Windows分区的方法和技巧
  14. 软件构造(三) 软件构造过程与配置管理
  15. 沈海高速汕尾往深圳服务器维护报价,沈海高速收费
  16. 【Python数据科学】多表关联 merge、join、concat
  17. 浏览器-点击预览视频文件(自动播放、循环播放)
  18. 多线程并发下集合不安全类-ArrayList
  19. 第三章 模糊查询与分组查询 ② 代码
  20. google APP 说明

热门文章

  1. 通用接口开放平台设计与实现——(2)功能架构
  2. 苹果应用商店ASO优化技巧
  3. 老婆:“给我讲讲你们程序员好笑的事情呗?”我扔给她这篇文章,她狂笑不止!
  4. 安卓开发——android8.0应用崩溃,报错: Only fullscreen opaque activities can request orientation
  5. (RN)Region Normalization for Image Inpainting
  6. android studio 如何上传文件到模拟器的根目录
  7. HTML5期末大作业:电影在线网站设计——漫威电影(2页) 免费大学生网页设计制作作业作品下载dreamweaver制作静态html网页设计作业作
  8. FP、FN、TP、TN、精确率(Precision)、召回率(Recall)、准确率(Accuracy)是什么意思
  9. The Lost House POJ - 2057(树形dp+贪心 (双线最优子结构问题))
  10. android 支付宝微信原生以及HTML调用原生SDK