问题陈述

我有一个JMS监听器作为一个线程来监听一个主题.一旦消息进入,我就会生成一个新的Thread来处理有界消息.因此,对于每个传入的消息,我产生一个新的线程.

我有一个场景,当重复消息按顺序注入时,也会处理重复消息.我需要防止这个被处理.我尝试使用ConcurrentHashMap来保存进程时间,一旦Thread生成就将其添加到条目中,并在Thread完成执行后立即将其从地图中删除.但是当我尝试使用同时以同时方式传递相同的场景时,它没有帮助.

在您深入了解实际代码库之前,我的问题的概要

onMessage(){

processIncomingMessage(){

ExecutorService executorService = Executors.newFixedThreadPool(1000);

//Map is used to make an entry before i spawn a new thread to process incoming message

//Map contains "Key as the incoming message" and "value as boolean"

//check map for duplicate check

//The below check is failing and allowing duplicate messages to be processed in parallel

if(entryisPresentInMap){

//return doing nothing

}else{

//spawn a new thread for each incoming message

//also ensure a duplicate message being processed when it in process by an active thread

executorService.execute(new Runnable() {

@Override

public void run() {

try {

//actuall business logic

}finally{

//remove entry from the map so after processing is done with the message

}

}

}

}

用于模拟场景的独立示例

public class DuplicateCheck {

private static Map duplicateCheckMap =

new ConcurrentHashMap(1000);

private static String name=null;

private static String[] nameArray = new String[20];

public static void processMessage(String message){

System.out.println("Processed message =" +message);

}

public static void main(String args[]){

nameArray[0] = "Peter";

nameArray[1] = "Peter";

nameArray[2] = "Adam";

for(int i=0;i<=nameArray.length;i++){

name=nameArray[i];

if(duplicateCheckMap.get(name)!=null && duplicateCheckMap.get(name)){

System.out.println("Thread detected for processing your name ="+name);

return;

}

addNameIntoMap(name);

new Thread(new Runnable() {

@Override

public void run() {

try {

processMessage(name);

} catch (Exception e) {

System.out.println(e.getMessage());

} finally {

freeNameFromMap(name);

}

}

}).start();

}

}

private static synchronized void addNameIntoMap(String name) {

if (name != null) {

duplicateCheckMap.put(name, true);

System.out.println("Thread processing the "+name+" is added to the status map");

}

}

private static synchronized void freeNameFromMap(String name) {

if (name != null) {

duplicateCheckMap.remove(name);

System.out.println("Thread processing the "+name+" is released from the status map");

}

}

代码片段如下

public void processControlMessage(final Message message) {

RDPWorkflowControlMessage rdpWorkflowControlMessage= unmarshallControlMessage(message);

final String workflowName = rdpWorkflowControlMessage.getWorkflowName();

final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();

if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){

log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");

return;

}else {

log.info("doing nothing");

}

Semaphore controlMessageLock = new Semaphore(1);

try{

controlMessageLock.acquire();

synchronized(this){

new Thread(new Runnable(){

@Override

public void run() {

try {

lock.lock();

log.info("Processing Workflow Control Message for the workflow :"+workflowName);

if (message instanceof TextMessage) {

if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {

clearControlMessageBuffer();

enableControlMessageStatus(workflowName);

List matchingValues=new ArrayList();

matchingValues.add(workflowName);

ConcreteSetDAO tasksSetDAO=taskEventListener.getConcreteSetDAO();

ConcreteSetDAO workflowSetDAO=workflowEventListener.getConcreteSetDAO();

tasksSetDAO.deleteMatchingRecords(matchingValues);

workflowSetDAO.deleteMatchingRecords(matchingValues);

fetchNewWorkflowItems();

addShutdownHook(workflowName);

}

}

} catch (Exception e) {

log.error("Error extracting item of type RDPWorkflowControlMessage from message "

+ message);

} finally {

disableControlMessageStatus(workflowName);

lock.unlock();

}

}

}).start();

}

} catch (InterruptedException ie) {

log.info("Interrupted Exception during control message lock acquisition"+ie);

}finally{

controlMessageLock.release();

}

}

private void addShutdownHook(final String workflowName) {

Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() {

disableControlMessageStatus(workflowName);

}

});

log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);

}

private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {

RDPWorkflowControlMessage rdpWorkflowControlMessage = null;

try {

TextMessage textMessage = (TextMessage) message;

rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);

} catch (Exception e) {

log.error("Error extracting item of type RDPWorkflowTask from message "

+ message);

}

return rdpWorkflowControlMessage;

}

private void fetchNewWorkflowItems() {

initSSL();

List allTasks=initAllTasks();

taskEventListener.addRDPWorkflowTasks(allTasks);

workflowEventListener.updateWorkflowStatus(allTasks);

}

private void clearControlMessageBuffer() {

taskEventListener.getRecordsForUpdate().clear();

workflowEventListener.getRecordsForUpdate().clear();

}

private synchronized void enableControlMessageStatus(String workflowName) {

if (workflowName != null) {

controlMessageStateMap.put(workflowName, true);

log.info("Thread processing the "+workflowName+" is added to the status map");

}

}

private synchronized void disableControlMessageStatus(String workflowName) {

if (workflowName != null) {

controlMessageStateMap.remove(workflowName);

log.info("Thread processing the "+workflowName+" is released from the status map");

}

}

我修改了我的代码以包含下面提供的建议,但仍然无效

public void processControlMessage(final Message message) {

ExecutorService executorService = Executors.newFixedThreadPool(1000);

try{

lock.lock();

RDPWorkflowControlMessage rdpWorkflowControlMessage= unmarshallControlMessage(message);

final String workflowName = rdpWorkflowControlMessage.getWorkflowName();

final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();

if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){

log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");

return;

}else {

log.info("doing nothing");

}

enableControlMessageStatus(workflowName);

executorService.execute(new Runnable() {

@Override

public void run() {

try {

//actual code

fetchNewWorkflowItems();

addShutdownHook(workflowName);

}

}

} catch (Exception e) {

log.error("Error extracting item of type RDPWorkflowControlMessage from message "

+ message);

} finally {

disableControlMessageStatus(workflowName);

}

}

});

} finally {

executorService.shutdown();

lock.unlock();

}

}

private void addShutdownHook(final String workflowName) {

Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() {

disableControlMessageStatus(workflowName);

}

});

log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);

}

private synchronized void enableControlMessageStatus(String workflowName) {

if (workflowName != null) {

controlMessageStateMap.put(workflowName, true);

log.info("Thread processing the "+workflowName+" is added to the status map");

}

}

private synchronized void disableControlMessageStatus(String workflowName) {

if (workflowName != null) {

controlMessageStateMap.remove(workflowName);

log.info("Thread processing the "+workflowName+" is released from the status map");

}

}

解决方法:

这是您应该如何向地图添加值.这种双重检查确保在任何特定时刻只有一个线程向地图添加值,然后您可以控制访问.之后删除所有锁定逻辑.它是如此简单

public void processControlMessage(final String workflowName) {

if(!tryAddingMessageInProcessingMap(workflowName)){

Thread.sleep(1000); // sleep 1 sec and try again

processControlMessage(workflowName);

return ;

}

System.out.println(workflowName);

try{

// your code goes here

} finally{

controlMessageStateMap.remove(workflowName);

}

}

private boolean tryAddingMessageInProcessingMap(final String workflowName) {

if(controlMessageStateMap .get(workflowName)==null){

synchronized (this) {

if(controlMessageStateMap .get(workflowName)==null){

controlMessageStateMap.put(workflowName, true);

return true;

}

}

}

return false;

}

标签:java,multithreading,jms-topic

来源: https://codeday.me/bug/20190706/1397427.html

java中workFlowEvent_防止线程在java中重复处理相关推荐

  1. java构造单例线程池_java中常见的六种线程池详解

    之前我们介绍了线程池的四种拒绝策略,了解了线程池参数的含义,那么今天我们来聊聊Java 中常见的几种线程池,以及在jdk7 加入的 ForkJoin 新型线程池 首先我们列出Java 中的六种线程池如 ...

  2. 0040 Java学习笔记-多线程-线程run()方法中的异常

    run()与异常 不管是Threade还是Runnable的run()方法都没有定义抛出异常,也就是说一条线程内部发生的checked异常,必须也只能在内部用try-catch处理掉,不能往外抛,因为 ...

  3. 获取父线程 java_java子线程中获取父线程的threadLocal中的值

    我们都知道线程本地变量表也就是ThreadLocal在我们做线程级的数据隔离时非常好用,但是有时候我们会想如何让子线程获取到父线程的ThreadLocal,其实在线程中除了ThreadLocal外还有 ...

  4. 深入理解Java虚拟机(第三版)-13.Java内存模型与线程

    13.Java内存模型与线程 1.Java内存模型 Java 内存模型的主要目的是定义程序中各种变量的访问规则,即关注在虚拟机中把变量值存储到主内存和从内存中取出变量值的底层细节 该变量指的是 实例字 ...

  5. java多线程-线程创建-线程池-java内存模型

    文章目录 ==多线程基础== 进程 线程 浏览器的进程和线程(案例) 线程的异步和同步 多线程的优势 ==多线程的实现方式== 第一种:继承Thread类 第二种:实现Runnable接口 第三种:通 ...

  6. java threadpoolexecutor 实例_Java线程池(ThreadPoolExecutor)示例

    Java线程池管理工作线程池,它包含一个队列,用于保持任务等待执行.我们可以使用ThreadPoolExecutor在Java中创建线程池. Java线程池管理Runnable线程的集合.工作线程从队 ...

  7. 【Java练手项目七】Java项目实战之天天酷跑

    首先,写一个需求文档: 一.项目名称:<天天酷跑>(RunDay) 二.功能介绍: 闯关类游戏,玩家登录后,选择进入游戏,通过键盘控制玩家的上下左右移动,来躲避 障碍物和吃金币,玩家躲避的 ...

  8. java gc机制 优点_聊聊Java的GC机制

    作者 某人Valar 如需转载请保留原文链接 部分图片来自百度,如有侵权请联系删除 本文目录 什么是GC JVM内存结构简单介绍 可达性分析与GC Roots 常见的垃圾收集算法 1. 什么是GC G ...

  9. java中我爱你_Java线程学习(转)

    编写具有多线程能力的程序经常会用到的方法有: run(),start(),wait(),notify(),notifyAll(),sleep(),yield(),join() 还有一个重要的关键字:s ...

最新文章

  1. c++学习笔记之友元函数
  2. shell数组中“和@的妙用
  3. ubuntu 14.04 java_Ubuntu14.04下配置Java环境
  4. 状态机学习(二)解析INI文件
  5. Android 解析JSON
  6. 【kafka系列】centos7系统安装kafka
  7. Gym - 102163M
  8. ES6学习笔记八(数值的扩展)
  9. 影响你成功最重要的两种人
  10. C#开发斑马RFID打印机zt410
  11. node_modules 困境
  12. 构建TCP套接字(socket)的概念及具体步骤
  13. 开源地图MapBox自定义(二):基本概念
  14. 【批量行驶证识别】如何批量行驶证OCR识别行驶本行车本图片或复印件并导出至excel表格或文本格式,下面教你方法
  15. JavaScript(九)
  16. linux操作mysql数据库
  17. Emacs入门指南(1)
  18. DirectX 性能优化
  19. 自媒体火热如雨后春笋,迅雷,牙牙,东方也来凑热闹?
  20. JavaWeb学习笔记五:BS结构系统的结构和协议

热门文章

  1. 初探 vue 插件开发
  2. HTTP/1.1与HTTP/1.0的区别
  3. Xftp5如何设置默认的文件夹
  4. 菜鸟学Linux 第093篇笔记 keepalived
  5. 我的购机(手机)之路
  6. 批处理学习总结之常用符号
  7. flex builder method
  8. Asp调用函数是否会影响性能?
  9. mysql报错 Row size too large ( 8126)
  10. 二分法(三):采用二分法解决“最大化最小值问题”