java 多线程 压缩_Java 多线程拷贝文件夹并调用tinyPng算法接口压缩图片实现(生产消费变种)...
线程模型
生产者Provider线程为一,主要进行深搜目录文件;、
消费者Consumer线程多个, 因为RPC服务调用时延较长, 启用多个线程请求服务。
持久化线程Persist 将已经消费的消息存放在writeQueue, 启用一个线程从writeQueue取数据进行持久化到log.pic,这样每次启动压缩的时候,可以避免重复消费。 进而避免同一目录进行多次压缩
api_key.properties 为申请的https://tinypng.com/ 的key ,每个key一个月可以压缩500张, 采用线程名的hashCode对key的个数取模运算,选择所要使用的key。
可扩展性
doCompress方法可以进行任意业务逻辑。只是我的实现是用来压缩图片了。
代码已push到github, 已经打好的jar也已上传。需要的可以clone一下。源码已打入jar包
如何使用
1、下载Compress.jar包
2、maven手动安装jar
mvn install:install-file -DgroupId=com.hearglobal -DartifactId=multi -Dversion=2.0 -Dpackaging=jar -Dfile=C:/Users/cbam/Desktop/Compress.jar
3、公共接口说明:
compress.setApi_key_location("/api_key.properties"); // 指定api_key文件位置 key需要申请 可选设置 默认为项目根路径 如E:/work/{project}/api_key.properties或/work/{project}/api_key.properties
compress.setPic_log_location("/log.pic"); //指定log.pic路径 可选配置 默认为项目根路径 如/work/{project}/log.pic 或 /work/{project}/log.pic
compress.compress("E:/upan/test", "E:/upan/test_bak", 3);//压缩调用 第一个参数为要压缩的目录 第二个参数为 压缩输出目录 第三个参数 启动的线程数
3、程序调用jar示例一
建议指定绝对路径
public static void main(String[] args) throws Exception {
Starter compress = new Starter();
compress.setApi_key_location("/api_key.properties");
compress.setPic_log_location("/log.pic");
compress.compress("E:/upan/test", "E:/upan/test_bak", 3);
}
4、程序调用jar示例二
打开win控制台或 在Linux shell下jar包当前目录 运行如下命令
java -jar Compress.jar E:/upan/test E:/upan/test_bak 3 //压缩调用 第一个参数为要压缩的目录 第二个参数为 压缩输出目录 第三个参数 启动的线程数
运行截图:
注意:此方式无法设置配置文件位置, 故请将文件api_key.properties默认放置在 jar包所在目录
生产者Provider代码:
package com.hearglobal.multi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* CopyRright (c)2014-2016 Haerbin Hearglobal Co.,Ltd
* Project: tinypngThread
* Comments:
* Author:cbam
* Create Date:2017/3/21
* Modified By:
* Modified Date:
* Modified Reason:
*/
public class Provider implements Callable{
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
//消息队列
private BlockingQueue messageQueue;
private static LinkedList logList ;
//生产东西来源
private static File src;
/**
* Instantiates a new Provider.
*
*@param messageQueue the message queue
*@param src the src
*/
public Provider(BlockingQueue messageQueue, File src){
this.messageQueue = messageQueue;
this.src = src;
}
public Object call() throws Exception {
try {
load(src);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} finally {
return true;
}
}
private void load(File src) throws InterruptedException {
LOGGER.info("Provider load src:{}", src.getAbsolutePath());
// 当找到目录时,创建目录
if (src.isDirectory()) {
if(!logList.contains(src.getAbsolutePath())) {
if(!this.messageQueue.offer(src, 2, TimeUnit.SECONDS)) {
System.out.println("目录提交队列失败....");
};
}
File[] files = src.listFiles();
for(File file : files) {
load(file);
}
//当找到文件时
} else if (src.isFile()) {
if(validatePic(src) && !logList.contains(src.getAbsolutePath())) {
if(!this.messageQueue.offer(src, 2, TimeUnit.SECONDS)) {
System.out.println("文件提交队列失败....");
}
}
}
}
private boolean validatePic(File file) {
int loc = file.getAbsolutePath().lastIndexOf(".");
String suffix = file.getAbsolutePath().substring(++loc);
return suffix.equals("jpg") || suffix.equals("png");
}
/**
* Sets log list.
*
*@param logList the log list
*/
public static void setLogList(LinkedList logList) {
Provider.logList = logList;
}
}
消费者Consumer代码:
package com.hearglobal.multi;
import com.tinify.Options;
import com.tinify.Source;
import com.tinify.Tinify;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* CopyRright (c)2014-2016 Haerbin Hearglobal Co.,Ltd
* Project: tinypngThread
* Comments:
* Author:cbam
* Create Date:2017/3/21
* Modified By:
* Modified Date:
* Modified Reason:
*/
public class Consumer implements Runnable{
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
//多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
private volatile boolean isRunning = true;
private static String srcBase;
private static String destBase;
//已消费队列
private BlockingQueue writeQueue;
//消息队列
private BlockingQueue messageQueue;
private static Lock lock = new ReentrantLock();
private static List API_key = new CopyOnWriteArrayList<>();
private static File dest;
/**
* Instantiates a new Consumer.
*
*@param messageQueue the message queue
*@param writeQueue the write queue
*@param dest the dest
*@param srcBase the src base
*@param destBase the dest base
*/
public Consumer(BlockingQueue messageQueue,BlockingQueue writeQueue, File dest, String srcBase, String destBase){
this.messageQueue = messageQueue;
this.writeQueue = writeQueue;
this.dest = dest;
this.srcBase = srcBase;
this.destBase = destBase;
}
public void run() {
while(isRunning){
try {
lock.lock();
//获取数据
File file = this.messageQueue.poll(2, TimeUnit.SECONDS);
if(file == null) {
stop();
LOGGER.info("Current Consumer - {} - consume faild, messageQueue empty, thread is stopping...", Thread.currentThread().getName());
lock.unlock();
continue;
}
dest = new File(destBase + file.getAbsolutePath().substring(srcBase.length()));
lock.unlock();
//进行数据处理
doCompress(file, dest);
LOGGER.info("Current Consumer - {} - consume success, messageId : {} ", Thread.currentThread().getName(), file.getAbsolutePath());
writeQueue.add(file.getAbsolutePath());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void doCompress(File src, File dest) {
LOGGER.info("Current Consumer - {} - Comsumer doCompress src:{}",Thread.currentThread().getName(), src.getAbsolutePath());
if(src.isDirectory()) {
dest.mkdirs();
} else {
Tinify.setKey(API_key.get(Thread.currentThread().getName().hashCode() % API_key.size()));
try {
Source source = Tinify.fromFile(src.getAbsolutePath());
BufferedImage bufferedImage = ImageIO.read(src);
if(bufferedImage.getWidth() > 800) {
Options options = new Options()
.with("method", "scale")
.with("width", 800);
Source resized = source.resize(options);
resized.toFile(dest.getAbsolutePath());
} else {
source.toFile(dest.getAbsolutePath());
}
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("Current Consumer - {} - Consumer doCompress exception error:{}, src.path:{}", Thread.currentThread().getName(), e.getMessage(), src.getAbsolutePath());
copyFile(src, dest);
}
}
}
private void copyFile(File src, File dest) {
try {
LOGGER.info("Current Consumer - {} - src.path:{}, dest.path:{}", Thread.currentThread().getName(), src.getAbsolutePath(), dest.getAbsolutePath());
while(!dest.getParentFile().exists()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// lock.lock();
Files.copy(src.toPath(), dest.toPath());
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("Consumer copyFile cause error:{}, currentThread.getName:{}", e.getMessage(), Thread.currentThread().getName());
} finally {
// lock.unlock();
}
}
/**
* Stop.
*/
public void stop() {
isRunning = false;
}
/**
* Gets src base.
*
*@return the src base
*/
public static String getSrcBase() {
return srcBase;
}
/**
* Sets src base.
*
*@param srcBase the src base
*/
public static void setSrcBase(String srcBase) {
Consumer.srcBase = srcBase;
}
/**
* Gets dest base.
*
*@return the dest base
*/
public static String getDestBase() {
return destBase;
}
/**
* Sets dest base.
*
*@param destBase the dest base
*/
public static void setDestBase(String destBase) {
Consumer.destBase = destBase;
}
/**
* Gets api key.
*
*@return the api key
*/
public static List getAPI_key() {
return API_key;
}
/**
* Sets api key.
*
*@param API_key the api key
*/
public static void setAPI_key(List API_key) {
Consumer.API_key = API_key;
}
}
持久化线程Persist代码:
package com.hearglobal.multi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* CopyRright (c)2014-2016 Haerbin Hearglobal Co.,Ltd
* Project: tinypngThread
* Comments:
* Author:cbam
* Create Date:2017/3/21
* Modified By:
* Modified Date:
* Modified Reason:
*/
public class Persist extends Thread {
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
private BlockingQueue writeQueue;
private boolean isRunning = true;
private static String pic_log_location = "log.pic";
private long start;
/**
* Instantiates a new Persist.
*
*@param writeQueue the write queue
*@param start the start
*/
public Persist(BlockingQueue writeQueue, long start) {
this.writeQueue = writeQueue;
this.start = start;
}
@Override
public void run() {
try {
while(!shouldStop()) {
Thread.sleep(10 * 1000);
List list = new LinkedList();
while(!writeQueue.isEmpty()) {
list.add(writeQueue.poll(5, TimeUnit.SECONDS));
}
doPersist(list);
}
LOGGER.info("Persist thread work finished");
LOGGER.info("耗时: {} ms",System.currentTimeMillis() - start);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
private void doPersist(List messageIds) throws IOException {
FileWriter writer = new FileWriter(pic_log_location, true);
for(String str : messageIds) {
writer.write(str + "\n");
}
writer.close();
}
/**
* Is running boolean.
*
*@return the boolean
*/
public boolean isRunning() {
return isRunning;
}
/**
* Sets running.
*
*@param running the running
*/
public void setRunning(boolean running) {
isRunning = running;
}
//persist线程终止的条件是 所有消费线程已停止 && 当前 已消费消息队列为空
private boolean shouldStop() {
return isRunning == false && writeQueue.size() == 0;
}
public static void setPic_log_location(String pic_log_location) {
Persist.pic_log_location = pic_log_location;
}
}
启动类 Starter代码:
package com.hearglobal.multi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
/**
* CopyRright (c)2014-2016 Haerbin Hearglobal Co.,Ltd
* Project: tinypngThread
* Comments:
* Author:cbam
* Create Date:2017/3/21
* Modified By:
* Modified Date:
* Modified Reason:
*/
public class Starter {
private static CountDownLatch cdl = new CountDownLatch(1);
private static final Logger LOGGER = LoggerFactory.getLogger(Starter.class);
private static int coreNumber = Runtime.getRuntime().availableProcessors();
private static List API_key = new CopyOnWriteArrayList<>();
private static LinkedList logList = new LinkedList<>() ;
private static String api_key_location = "api_key.properties";
private static String pic_log_location = "log.pic";
private static String src;
private static String dest;
private static int threshold;
private static long start;
/**
* Main.
*
*@param args the args
*/
public static void main(String[] args){
String src = args[0];
String dest = args[1];
threshold = args.length < 3 || args[2] == null || args[2].equals("") ? coreNumber : Integer.parseInt(args[2]);
try {
new Starter().compress(src, dest, threshold);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Compress.
*
*@param src the src
*@param dest the dest
*@param threshold the threshold
*@throws Exception the exception
*/
public void compress(String src, String dest, int threshold) throws Exception {
start = System.currentTimeMillis();
if(!this.checkParam(src, dest)) {
throw new IllegalArgumentException("参数不正确!");
}
this.src = src;
this.dest = dest;
this.threshold = threshold <= 0 ? coreNumber : threshold;
LOGGER.info("运行参数:{}, {}, {}", src, dest, threshold);
try {
//消息队列
BlockingQueue messageQueue = new LinkedBlockingQueue<>();
//已消费队列 用于已消费消息的持久化 使得单线程对文件读写
final BlockingQueue writeQueue = new LinkedBlockingQueue<>();
ExecutorService cachePool = Executors.newCachedThreadPool();
//生产者提交 最多启动一个
Provider p = new Provider(messageQueue, new File(src));
initProvider();
p.setLogList(logList);
Future future = cachePool.submit(p);
//阻塞到provide 执行完成
while(!future.get()) {
future = cachePool.submit(p);
}
initConsumer();
//消费者提交
for(int i = 0; i < threshold; i++) {
Consumer c = new Consumer(messageQueue, writeQueue, new File(dest), src, dest);
c.setAPI_key(API_key);
cachePool.execute(c);
}
System.out.println(start + "start");
//启动持久化消息线程工作 消费writeQueue
Persist perssit = new Persist(writeQueue, start);
perssit.setPic_log_location(pic_log_location);
perssit.start();
cachePool.shutdown();
while(true) {
if(cachePool.isTerminated()) {
perssit.setRunning(false);
LOGGER.info("compress has finished !");
break;
}
}
} catch (Exception e) {
e.printStackTrace();
throw new Exception("压缩失败");
}
}
/**
* Check param boolean.
*
*@param src the src
*@param dest the dest
*@return the boolean
*/
private boolean checkParam(String src, String dest) {
if(src == null || dest == null || src.equals("") || dest.equals("")) {
LOGGER.error("参数为空!");
return false;
}
int loc_src = src.lastIndexOf(".");
String suffix_src = src.substring(++loc_src);
int loc_dest = dest.lastIndexOf(".");
String suffix_dest = src.substring(++loc_dest);
if(suffix_dest.equals("png") && suffix_src.equals("jpg")) {
LOGGER.error("错误图片类型转换!");
return false;
} else if(suffix_dest.equals("jpg") && suffix_src.equals("png")) {
LOGGER.error("错误图片类型转换!");
return false;
} else if(! new File(src).exists()) {
LOGGER.error("src 路径未找到!");
return false;
} else {
return true;
}
}
private void initConsumer() {
String line = "";
try {
//BufferedReader buff = new BufferedReader(new FileReader("E:\\workplace\\tinypngThread\\target\\api_key.properties"));
BufferedReader buff = new BufferedReader(new FileReader(api_key_location));
while ((line=buff.readLine()) != null) {
LOGGER.info("API_key => {} ",line);
API_key.add(line);
}
buff.close();
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("Consumer initConsumer error:{}", e.getMessage());
}
}
/**
* Init provider.
*
*@throws IOException the io exception
*/
private static void initProvider() throws IOException {
LOGGER.info("Provider initProvider invoked");
File file = new File(pic_log_location);
if(!file.exists()) {
file.createNewFile();
}
FileReader reader = new FileReader(pic_log_location);
BufferedReader br = new BufferedReader(reader);
String str;
while((str = br.readLine()) != null) {
logList.add(str);
}
}
/**
* Sets api key location.
*
*@param api_key_location the api key location
*/
public static void setApi_key_location(String api_key_location) {
Starter.api_key_location = api_key_location;
}
/**
* Sets pic log location.
*
*@param pic_log_location the pic log location
*/
public static void setPic_log_location(String pic_log_location) {
Starter.pic_log_location = pic_log_location;
}
}
java 多线程 压缩_Java 多线程拷贝文件夹并调用tinyPng算法接口压缩图片实现(生产消费变种)...相关推荐
- java文件名大小_java 比较指定文件夹内.txt文件名的大小
我写了一个程序:每过一定时间向指定文件夹内写一个按时间命名的.txt文件现在想实现最新的txt文件和最原先的txt文件内容比对但是不知道如何获得这两个文家的内容.packagetest;import. ...
- java创建文件目录_java创建目录或文件夹的方法?
展开全部 1.File类的createNewFile根据抽象路径e5a48de588b662616964757a686964616f31333337393532创建一个新的空文件,当抽象路径制定的文件 ...
- java 打开目录_java 如何打开文件夹(包括文件夹内的文件夹)!
展开全部 文件太多,过滤了下后缀为.mp3的所有文件: package cn.zhidao.file; import java.io.File; /** * * @author Administrat ...
- java 级联删除_java 级联删除文件夹下的所有文件
public void deletefile(String delpath) throws Exception { try { File file = new File(delpath); // 当且 ...
- 使用多线程拷贝文件夹
import java.io.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Execut ...
- java 包含文件_java 文件夹拷贝(文件夹里包含文件和文件夹) 代码
java代码实现文件夹拷贝,文件夹可能包含文件夹和文件import java.io.BufferedReader; import java.io.File; import java.io.FileIn ...
- java中拷贝文件的代码_拷贝文件夹中的所有文件到另外一个文件夹
[java]代码库/** * * 拷贝文件夹中的所有文件到另外一个文件夹 * * @param srcDirector * 源文件夹 * * @param desDirector * 目标文件夹 * ...
- Java io流---拷贝文件夹下的所有文件和目录
Java io流-拷贝文件夹下的所有文件和目录 代码: package demo01;import java.io.*; import java.util.TreeMap;public class C ...
- java 拷贝文件夹的实现
public class CopyFiles {public static void main(String[] args) throws Exception {String src = " ...
最新文章
- 取得cpu核心序号_cpu的性能指标有哪些?
- 【前端】:jQuery上
- python debug【】
- 哥伦比亚大学计算机工程面试题
- MySQL:8种SQL典型错误用法,值得收藏!
- eclipse导出doc文档
- 基于arduino的光控窗帘_分别基于STM32和Arduino的智能窗帘硬件分析与程序设计
- 高级函数技巧-函数柯里化
- 计算机用户在使用计算机文件时6,201606-计算机基础选择题(含答案)(6页)-原创力文档...
- java许愿墙_wishingWall 一个好看的许愿墙板块,功能强大,页面美观 Java Develop 238万源代码下载- www.pudn.com...
- html动态线条背景鼠标,动态背景线条鼠标移动线条汇聚---背景特效
- Ra-08系列开发板入门教程,标准LoRaWAN对接私有服务器。
- Linux系统下挂载Windows分区的方法和技巧
- 软件构造(三) 软件构造过程与配置管理
- 沈海高速汕尾往深圳服务器维护报价,沈海高速收费
- 【Python数据科学】多表关联 merge、join、concat
- 浏览器-点击预览视频文件(自动播放、循环播放)
- 多线程并发下集合不安全类-ArrayList
- 第三章 模糊查询与分组查询 ② 代码
- google APP 说明
热门文章
- 通用接口开放平台设计与实现——(2)功能架构
- 苹果应用商店ASO优化技巧
- 老婆:“给我讲讲你们程序员好笑的事情呗?”我扔给她这篇文章,她狂笑不止!
- 安卓开发——android8.0应用崩溃,报错: Only fullscreen opaque activities can request orientation
- (RN)Region Normalization for Image Inpainting
- android studio 如何上传文件到模拟器的根目录
- HTML5期末大作业:电影在线网站设计——漫威电影(2页) 免费大学生网页设计制作作业作品下载dreamweaver制作静态html网页设计作业作
- FP、FN、TP、TN、精确率(Precision)、召回率(Recall)、准确率(Accuracy)是什么意思
- The Lost House POJ - 2057(树形dp+贪心 (双线最优子结构问题))
- android 支付宝微信原生以及HTML调用原生SDK