Author:Eric

Version:9.0.0

文章目录

  • 一、引言
  • 二、RabbitMQ介绍
  • 三、RabbitMQ安装
  • 四、RabbitMQ架构【`重点`】
    • 4.1 官方的简单架构图
    • 4.2 RabbitMQ的完整架构图
    • 4.3 查看图形化界面并创建一个Virtual Host
  • 五、RabbitMQ的使用【`重点`】
    • 5.1 RabbitMQ的通讯方式
    • 5.2 Java连接RabbitMQ
      • 5.2.1 创建maven项目
      • 5.2.2 导入依赖
      • 5.2.3 创建工具类连接RabbitMQ
    • 5.3 Hello-World,点对点,简单模式
    • 5.4 Work Queue 工作队列模式
    • 5.5 Publish/Subscribe,发布订阅的方式
    • 5.6 Routing,路由的方式
    • 5.7 Topic ,RoutingKey通配符方式
  • 六、RabbitMQ整合SpringBoot【`重点`】
    • 6.1 SpringBoot整合RabbitMQ
      • 6.1.1 创建SpringBoot工程
      • 6.1.2 导入依赖
      • 6.1.3 编写配置文件
      • 6.1.4 声明exchange、queue
      • 6.1.5 发布消息到RabbitMQ
      • 6.1.6 创建消费者监听消息
    • 6.2 手动Ack
      • 6.2.1 添加配置文件
      • 6.2.2 手动ack
  • 七、RabbitMQ的其他操作
    • 7.1 消息的可靠性
      • 7.1.1 普通Confirm方式
      • 7.1.2 批量Confirm方式
      • 7.1.3 异步Confirm方式。
      • 7.1.4 Return机制
    • 7.2 SpringBoot实现
      • 7.2.1 编写配置文件
      • 7.2.2 开启Confirm和Return
    • 7.3 避免消息重复消费
    • 7.4 SpringBoot如何实现
      • 7.4.1 导入依赖
      • 7.4.2 编写配置文件
      • 7.4.3 修改生产者
      • 7.4.4 修改消费者
  • 八、RabbitMQ应用
    • 8.1 客户模块
      • 8.1.1 导入依赖
      • 8.1.2 编写配置文件
      • 8.1.3 编写配置类
      • 8.1.4 修改Service的添加操作
      • 8.1.5 修改Service的删除操作
      • 8.1.6 修改Service的删除操作
    • 8.2 搜索模块
      • 8.2.1 导入依赖
      • 8.2.2 编写配置文件
      • 8.2.3 编写配置类
      • 8.2.4 编写消费者-监听状态
      • 8.2.5 删除
    • 8.3 整个项目代码(客户和搜索)
      • 8.3.1客户模块
        • 前端页面html
        • 依赖
        • yml配置
        • 配置类config
        • 实体类
        • mapper的xml
        • service
        • service的实现类
        • controller
        • 工具类
      • 8.3.2 搜索模块
        • 依赖
        • yml配置
        • 配置类
        • 实体类
        • service
        • service的实现类
        • controller
        • ES的创建索引和添加数据

一、引言

模块之间的耦合度过高,导致一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。

RabbitMQ引言

二、RabbitMQ介绍


市面上比较火爆的几款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的(RabbitMQ基于Erlang编写的,面向并发编程)。
  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。

RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。

三、RabbitMQ安装


version: "3.1"
services:rabbitmq:#management图形化界面 15672图形画面端口image: daocloud.io/library/rabbitmq:managementrestart: alwayscontainer_name: rabbitmqports:- 5672:5672- 15672:15672volumes:- ./data:/var/lib/rabbitmq

四、RabbitMQ架构【重点


4.1 官方的简单架构图

  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange

  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息

  • Exchange - 交换机:和生产者建立连接并接收生产者的消息

  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互

  • Routes - 路由:交换机以什么样的策略将消息发布到Queue

简单架构图

4.2 RabbitMQ的完整架构图

完整架构图

完整架构图

4.3 查看图形化界面并创建一个Virtual Host

创建一个全新的用户和全新的Virtual Host,并且将test用户设置上可以操作/test的权限

监控界面

添加用户

设置使用主机

删除

返回首页界面

五、RabbitMQ的使用【重点


5.1 RabbitMQ的通讯方式

通讯方式

5.2 Java连接RabbitMQ

5.2.1 创建maven项目

…………

5.2.2 导入依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
</dependencies>
5.2.3 创建工具类连接RabbitMQ
public static Connection getConnection(){// 创建Connection工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.206.142");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123");factory.setVirtualHost("/test");// 创建ConnectionConnection conn = null;try {conn = factory.newConnection();} catch (Exception e) {e.printStackTrace();}// 返回return conn;
}
效果图

5.3 Hello-World,点对点,简单模式

一个生产者,一个默认的交换机,一个队列,一个消费者

结构图

创建生产者,创建一个channel,发布消息到exchange,指定路由规则。

@Test
public void publish() throws Exception {//1. 获取rabbit连接Connection connection = RabbitMQClient.getConnection();//2. 创建ChannelChannel channel = connection.createChannel();//3. 发布消息到exchange,同时指定路由的规则String msg = "Hello-World!";//发送消息// 参数1:指定exchange,使用""。// 参数2:指定路由的规则,使用具体的队列名称。// 参数3:指定传递的消息所携带的properties,使用null。// 参数4:指定发布的具体消息,byte[]类型channel.basicPublish("","HelloWorld",null,msg.getBytes());// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。System.out.println("生产者发布消息成功!");//4. 释放资源channel.close();connection.close();
}

创建消费者,创建一个channel,创建一个队列,并且去消费当前队列

@Test
public void consume() throws Exception {//1. 获取连接对象Connection connection = RabbitMQClient.getConnection();//2. 创建channelChannel channel = connection.createChannel();//3. 声明队列-HelloWorld//参数1:queue - 指定队列的名称//参数2:durable - 当前队列是否需要持久化(true)//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除//参数5:arguments - 指定当前队列的其他信息channel.queueDeclare("HelloWorld",true,false,false,null);//4. 开启监听QueueDefaultConsumer consume = new DefaultConsumer(channel){@Override//body消费者传过来的内容public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body,"UTF-8"));}};//参数1:queue - 指定消费哪个队列//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)//参数3:consumer - 指定消费回调channel.basicConsume("HelloWorld",true,consume);System.out.println("消费者开始监听队列!");// System.in.read();System.in.read();//5. 释放资源channel.close();connection.close();
}

5.4 Work Queue 工作队列模式

一个生产者,一个默认的交换机,一个队列,两个消费者

结构图

只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing

消费者指定Qoa和手动ack

//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));//2. 手动ackchannel.basicAck(envelope.getDeliveryTag(),false);}
};
//3. 指定手动ack
channel.basicConsume("Work",false,consumer);

5.5 Publish/Subscribe,发布订阅的方式

一个生产者,一个交换机,两个队列,两个消费者

结构图

声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。

让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。

//3. 创建exchange - 绑定某一个队列
//参数1: exchange的名称
//参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
//参数3:指定路由key
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
channel.queueBind("pubsub-queue1","pubsub-exchange","");//队列绑定到交换机
channel.queueBind("pubsub-queue2","pubsub-exchange","");

消费者还是正常的监听对应队列,consumer的消息队列名要一一对应上。

5.6 Routing,路由的方式

一个生产者,一个交换机,两个队列,两个消费者

结构图

生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。

//3. 创建exchange, routing-queue-error,routing-queue-info,队列绑定
//参数1: exchange的名称
//参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
//参数3:指定路由key
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
channel.queueBind("routing-queue-error","routing-exchange","ERROR");
channel.queueBind("routing-queue-info","routing-exchange","INFO");//4. 发布消息到exchange,同时指定路由的规则
// 参数1:交换机名称
// 参数2:路由key,与绑定队列的路由key一致
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());

消费者没有变化

5.7 Topic ,RoutingKey通配符方式

一个生产者,一个交换机,两个队列,两个消费者

结构图

生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * 一个xxx,而# 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。

//2. 创建exchange并指定绑定方式
//  fast.red.cat  绑定队列可以写成以下其中一种对应方式:*.red.*
//  fast.white.dog 绑定队列可以写成以下其中一种对应方式:fast.# 或fast.*.*
//  slow.yello.dog
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
channel.queueBind("topic-queue-2","topic-exchange","fast.#");
channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");//3. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴子".getBytes());
channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());

消费者只是监听队列,没变化。

六、RabbitMQ整合SpringBoot【重点


6.1 SpringBoot整合RabbitMQ

6.1.1 创建SpringBoot工程
6.1.2 导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.1.3 编写配置文件
spring:rabbitmq:host: 192.168.199.109port: 5672username: testpassword: testvirtual-host: /test
6.1.4 声明exchange、queue
@Configuration
public class RabbitMQConfig {//1. 创建交换机exchange - topic@Beanpublic TopicExchange getTopicExchange(){return new TopicExchange("boot-topic-exchange",true,false);}//2. 创建队列queue@Beanpublic Queue getQueue(){return new Queue("boot-queue",true,false,false,null);}//3. 将交换机和队列绑定在一起@Beanpublic Binding getBinding(TopicExchange topicExchange,Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");}
}
6.1.5 发布消息到RabbitMQ
@Autowired
private RabbitTemplate rabbitTemplate;@Test
void contextLoads() {//通过模板发送消息rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
}
6.1.6 创建消费者监听消息
@Component
public class Consumer {@RabbitListener(queues = "boot-queue")//监听队列public void getMessage(Object message){System.out.println("接收到消息:" + message);}}

6.2 手动Ack

6.2.1 添加配置文件
spring:rabbitmq:listener:simple:acknowledge-mode: manual #手动ack方式
6.2.2 手动ack
//手动回复
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到消息:" + msg);int i = 1 / 0;// 手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

七、RabbitMQ的其他操作


7.1 消息的可靠性

a.是否到达交换机

b.是否到达队列

使用确认机制

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍以上。

RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

7.1.1 普通Confirm方式
//在生产者类中添加//3.1 开启confirm
channel.confirmSelect();
//3.2 发送消息
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
//3.3 判断消息是否成功发送到交换机Exchange
if(channel.waitForConfirms()){System.out.println("消息发送到交换机成功");
}else{System.out.println("发送消息到交换机失败");
}
7.1.2 批量Confirm方式
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {String msg = "Hello-World!" + i;channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 确定批量操作是否成功
channel.waitForConfirmsOrDie();     // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException
7.1.3 异步Confirm方式。
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {String msg = "Hello-World!" + i;channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);}
});
消息传递可靠性
7.1.4 Return机制

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

消息传递可靠性

开启Return机制,并在发送消息时,指定mandatory为true

// 开启return机制
channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {// 当消息没有送达到queue时,才会执行。System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");}
});// 在发送消息时,指定mandatory参数为true,当队列没有接收到消息时,执行returnListener回调
channel.basicPublish("","HelloWorld",true,null,msg.getBytes());

7.2 SpringBoot实现

7.2.1 编写配置文件
spring:rabbitmq:publisher-confirm-type: simple  #新版本用publisher-confirms: true  #老版本用,确认机制开启,消息是否到达交换机publisher-returns: true  #return机制开启,消息是否到达队列
7.2.2 开启Confirm和Return
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//模板@PostConstruct  // init-method这个注解相当于是初始化方法public void initMethod(){rabbitTemplate.setConfirmCallback(this);//当前对象,设置回调方法rabbitTemplate.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到交换机Exchange");}else{System.out.println("消息没有送达到交换机Exchange");}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息没有送达到Queue");}
}

7.3 避免消息重复消费

重复消费消息,会对非幂等性操作造成问题,幂等性:查询;非幂等性:增删改,每个操作都会对数据有改动

重复消费消息的原因是,消费者没有给RabbitMQ一个ack

重复消费

为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,先将消息的id放到Redis中,

id-0(正在执行业务)id是消息id

id-1(执行业务成功)

如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

setnx:key value ;key存在就啥事都不做,不存在就添加

极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

生产者,发送消息时,指定messageId

//传递消息所携带的数据properties
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(1)     //指定消息是否需要持久化 1 - 需要持久化  2 - 不需要持久化.messageId(UUID.randomUUID().toString())//消息ID.build();
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());

消费者,在消费消息时,根据具体业务逻辑去操作redis

DefaultConsumer consume = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {Jedis jedis = new Jedis("192.168.199.109",6379);String messageId = properties.getMessageId();//1. setnx到Redis中,默认指定value-0, 添加messageId到redis中 key:message  value:0//如果结果是OK证明redis没有这个数据,则添加成功//如果有的话,则判断value值,若是0,不做任何事,若是1,则手动ackString result = jedis.set(messageId, "0", "NX", "EX", 10);//0正在执行业务,NX是否存在,EX生存时间if(result != null && result.equalsIgnoreCase("OK")) {System.out.println("接收到消息:" + new String(body, "UTF-8"));//2. 消费成功,set messageId 1jedis.set(messageId,"1");channel.basicAck(envelope.getDeliveryTag(),false);}else {//3. 如果setnx失败,获取key对应的value,如果是0,return,如果是1,则回复ackString s = jedis.get(messageId);if("1".equalsIgnoreCase(s)){channel.basicAck(envelope.getDeliveryTag(),false);}}}
};

7.4 SpringBoot如何实现

7.4.1 导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
7.4.2 编写配置文件
spring:redis:host: 192.168.199.109port: 6379
7.4.3 修改生产者
在SpringbootRabbitmqApplicationTests类修改@Test
void contextLoads() throws IOException {//携带数据CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);System.in.read();
}
7.4.4 修改消费者
//解决消息重复消费问题
@Autowired
private StringRedisTemplate redisTemplate;@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {//0. 获取MessageIdString messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//1. 设置key到Redis,redisTemplate模板对象,opsForValue相当于keyString,setIfAbsent相当于setnxif(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {//2. 消费消息System.out.println("接收到消息:" + msg);//3. 设置key的value为1redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);//4.  手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else {//5. 获取Redis中的value即可 如果是1,手动ackif("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
}

八、RabbitMQ应用


8.1 客户模块

8.1.1 导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
8.1.2 编写配置文件
spring:rabbitmq:host: 192.168.199.109port: 5672username: testpassword: testvirtual-host: /test
8.1.3 编写配置类
@Configuration
public class RabbitMQConfig {@Beanpublic TopicExchange topicExchange(){return new TopicExchange("openapi-customer-exchange",true,false);}@Beanpublic Queue queue(){return new Queue("openapi-customer-queue");}@Beanpublic Binding binding(Queue queue,TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");}}
8.1.4 修改Service的添加操作
//3. 发送消息
rabbitTemplate.convertAndSend("openapi-customer-exchange","openapi.customer.add",JSON.toJSON(customer));/*//3. 调用搜索模块,添加数据到ES//1. 准备请求参数和请求头信息String json = JSON.toJSON(customer);HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));HttpEntity<String> entity = new HttpEntity<>(json,headers);//2. 使用RestTemplate调用搜索模块restTemplate.postForObject("http://localhost:8080/search/customer/add", entity, String.class);*/
8.1.5 修改Service的删除操作
    /*修改*/@Overridepublic void updateCustomer(Customer customer) {int i = customerMapper.updateCustomer(customer);if (i != 1) {log.error("【修改客户信息到数据库失败!】customer={}", customer);throw new RuntimeException("【修改客户信息到数据库失败!】");}//发送消息rabbitTemplate.convertAndSend("openapi-customer-exchange","openapi.customer.update",JSON.toJson(customer));
//        String json = JSON.toJson(customer);
//        HttpHeaders httpHeaders = new HttpHeaders();
//        httpHeaders.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));
//        HttpEntity httpEntity = new HttpEntity(json, httpHeaders);
//        restTemplate.postForObject("http://localhost:8080/search/customer/update", httpEntity, String.class);}
8.1.6 修改Service的删除操作
 /*批量删除*/@Overridepublic void delCustomer(Integer[] ids) {int i = customerMapper.delCustomer(ids);System.out.println(i);if (i < 1) {for (int j = 0; j < ids.length; j++) {log.error("【删除客户信息到数据库失败!】ids={}", ids[j]);}
//            log.error("【删除客户信息到数据库失败!】ids={}", ids);
//            throw new RuntimeException("【删除客户信息到数据库失败!】");}//发送消息rabbitTemplate.convertAndSend("openapi-customer-exchange","openapi.customer.del",JSON.toJson(ids));
//        String json = JSON.toJson(ids);
//        HttpHeaders httpHeaders = new HttpHeaders();
//        httpHeaders.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));
//        HttpEntity httpEntity = new HttpEntity(json, httpHeaders);
//        restTemplate.postForObject("http://localhost:8080/search/customer/del", httpEntity, String.class);}

8.2 搜索模块

8.2.1 导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
8.2.2 编写配置文件
spring:rabbitmq:host: 192.168.199.109port: 5672username: testpassword: testvirtual-host: /testlistener:simple:acknowledge-mode: manual
8.2.3 编写配置类
@Configuration
public class RabbitMQConfig {@Beanpublic TopicExchange topicExchange(){return new TopicExchange("openapi-customer-exchange",true,false);}@Beanpublic Queue queue(){return new Queue("openapi-customer-queue");}@Beanpublic Binding binding(Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");}
}
8.2.4 编写消费者-监听状态
@Component
public class SearchListener {@Autowiredprivate SearchService searchService;@RabbitListener(queues = "openapi-customer-queue")public void consume(String json, Channel channel, Message message) throws IOException {//1. 获取RoutingKeyString receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();//2. 使用switchswitch (receivedRoutingKey){case "openapi.customer.add"://3. add操作调用Service完成添加searchService.addCustomer(JSON.parseJSON(json, Customer.class));//4. 手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
}

工具类

public class JSON {/*对象转成JSON数据*/public static String toJson(Object obj) {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(obj);} catch (JsonProcessingException e) {e.printStackTrace();return "";}}/*JSON数据转成对象*/public static <T> T parseJson(String data, Class<T> clazz) {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.readValue(data, clazz);} catch (Exception e) {e.printStackTrace();return null;}}/*** 将data转化为1维整型数组, data形如:"1,3,4,5,6"*/public static Integer[] toIntegerArray(String data) {//[21,22]data = data.substring(1, data.length() - 1);//将前端传过来的JSON数据的中括号切掉String[] datas = data.split(",");//将字符串转成一个字符串数组Integer[] tmp = new Integer[datas.length];//创建包装类整型数组//字符串数组转成包装类整型数组for (int i = 0; i < datas.length; i++) {tmp[i] = Integer.parseInt(datas[i]);}return tmp;}
}
8.2.5 删除

编写消费者-监听

@Component
public class SearchListener {@Autowiredprivate SearchService searchService;@RabbitListener(queues = "openapi-customer-queue")public void msg(String msg, Channel channel, Message message) throws IOException, TimeoutException {//得到routingKeyString routingKey = message.getMessageProperties().getReceivedRoutingKey();switch (routingKey){          case "openapi.customer.del"://删除客户操作 "1","2"searchService.delCustomer(JSON.toIntegerArray(msg.toString()));//手动ack回复channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);break;}channel.close();}
}

controller

    @RequestMapping(value ="/del", produces = "application/json;charset=utf-8")public ResultVO updateCustomer(Integer[] ids){try {customerService.delCustomer(ids);return new ResultVO(true,"删除客户信息成功");} catch (Exception e) {e.printStackTrace();return new ResultVO(false,"删除客户信息失败!");}}

8.3 整个项目代码(客户和搜索)

8.3.1客户模块
前端页面html
<!DOCTYPE html>
<html >
<head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"><title>开放平台管理系统-客户模块</title><link rel="stylesheet" href="layui/css/layui.css"><link rel="stylesheet" href="easyui/default/easyui.css"><script src="layui/jquery-1.10.2.min.js"type="text/javascript"></script><script src="easyui/jquery.easyui.min.js"type="text/javascript"></script>
</head>
<body class="layui-layout-body">
<div class="layui-layout layui-layout-admin"><div id="main" style="padding-top:20px;padding-left:50px;"><!-- 内容主体区域 --><form action="#" class="layui-form"><div class="layui-form-item"><div class="layui-inline"><label>名称:</label><div class="layui-inline"><input type="text" name="username" autocomplete="off" placeholder="请输入名称"class="layui-input"></div>状态:<div class="layui-inline"><select name="state"><option value="">请选择</option><option value="0">无效</option><option value="1">有效</option></select></div><button class="layui-btn" lay-submit lay-filter="customer-table">搜索</button></div></div></form><script type="text/html" id="customer-head-bar"><div class="layui-btn-container"><button class="layui-btn layui-btn-sm" lay-event="goEdit"><i class="layui-icon">&#xe654;</i>添加</button><button class="layui-btn layui-btn-sm layui-btn-danger" lay-event="delete"><i class="layui-icon">&#xe640;</i>删除</button></div></script><script type="text/html" id="customer-customer-bar"><a class="layui-btn layui-btn-xs" lay-event="edit-customer">编辑</a></script><table class="layui-table" lay-filter="customer-table" id="customer-table"></table></div><div class="layui-footer" style="left: 0;"><!-- 底部固定区域 -->© xxxjava.cn -程序员</div>
</div>
<script src="layui/layui.js"></script><script type="text/javascript" >layui.use(['table', 'layer', 'form'], function () {var table = layui.table;var layer = layui.layer;var form = layui.form;form.render();table.render({id: "customer-table",elem: '#customer-table', toolbar: '#customer-head-bar', cellMinWidth: 80, url: 'sys/customer/table' //数据接口, page: true //开启分页, cols: [[{checkbox: true},{field: 'id', title: 'ID', sort: true}, {field: 'username', title: '账号'}, {field: 'nickname', title: '公司名称'}, {field: 'money', title: '账户金额'}, {field: 'address', title: '公司地址'}, {field: 'state', title: '状态', templet: function (data) {return (data.state == 1) ? '<span class="layui-badge layui-bg-green">有效</span>' : '<span class="layui-badge layui-bg-red" >无效</span>'}}, {fixed: 'right', title: '操作', toolbar: '#customer-customer-bar', width: 180}]]});//头工具栏事件table.on('toolbar(customer-table)', function (obj) {switch (obj.event) {case 'goEdit':openEditWindow(null);break;case 'delete':var data = table.checkStatus('customer-table').data;if (data.length > 0) {layer.confirm('真的删除行么', function (index) {var param = ""$.each(data, function (i, obj) {param += "&ids=" + obj.id})$.ajax({url: 'sys/customer/del',data: param,method: 'post',success: function (result) {if (result.status) {table.reload('customer-table', {});} else {alert(result.message)}layer.close(index);}})});}break;};;});//监听行工具事件table.on('tool(customer-table)', function (obj) {var data = obj.data;switch (obj.event) {case 'edit-customer': {openEditWindow(data);break;}}});function openEditWindow(data) {layer.open({type: 1,content: data == null ? $('#customer-add-layer').html():$('#customer-edit-layer').html(),title: data == null ? '添加客户':'编辑客户', area: ['500px', '450px'],btn: ['提交', '取消'] //可以无限个按钮, yes: function (index, layero) {layer.load()$.ajax({url: "sys/customer/" + (data == null ? 'add' : 'update'),//  url: '/sys/customer/update',data: data == null ?$("#customer-add-form").serialize():$("#customer-edit-form").serialize(),method: 'post',success: function (result) {if (result.status) {table.reload('customer-table', {});layer.close(index);} else {alert(result.message)}layer.closeAll('loading');}})}, success: function (layero, index) {form.render()if (data != null) {form.val("customer-edit-form", data);form.val("customer-edit-form", {"state": data.state + ""});}}});}form.on('submit(customer-table)', function (data) {table.reload('customer-table', {page: {curr: 1 //重新从第 1 页开始},where: data.field});return false; //阻止表单跳转。如果需要表单跳转,去掉这段即可。});});</script><script type="text/html" id="customer-edit-layer"><form id="customer-edit-form" style="width: 80%" class="layui-form" lay-filter="customer-edit-form"><input type="hidden" name="id"><div class="layui-form-item"><label class="layui-form-label">用户名</label><div class="layui-input-block"><input type="text" name="username" required lay-verify="required" placeholder="请输入用户名"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">公司名称</label><div class="layui-input-block"><input type="text" name="nickname" required lay-verify="required" placeholder="请输入公司名称"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">密码</label><div class="layui-input-block"><input type="password" name="password" required lay-verify="required" placeholder="请输入密码"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">地址</label><div class="layui-input-block"><input type="text" name="address" required lay-verify="required" placeholder="请输入地址"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">账户金额(元)</label><div class="layui-input-block"><input type="number" name="money" required lay-verify="required" placeholder="请输入账号金额"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">状态</label><div class="layui-input-block"><input type="radio" name="state" title="有效" value="1" checked/><input type="radio" name="state" title="无效" value="0"/></div></div></form>
</script><script type="text/html" id="customer-add-layer"><form id="customer-add-form" style="width:80%;padding-top:10px;" class="layui-form" lay-filter="customer-edit-form"><input type="hidden" name="id"><div class="layui-form-item"><label class="layui-form-label">用户名</label><div class="layui-input-block"><input type="text" name="username" required lay-verify="required" placeholder="请输入用户名"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">名称</label><div class="layui-input-block"><input type="text" name="nickname" required lay-verify="required" placeholder="请输入公司名称"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">密码</label><div class="layui-input-block"><input type="password" name="password" required lay-verify="required" placeholder="请输入密码"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">地址</label><div class="layui-input-block"><input type="text" name="address" required lay-verify="required" placeholder="请输入地址"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">账户金额(元)</label><div class="layui-input-block"><input type="number" name="money" required lay-verify="required" placeholder="请输入账号金额"autocomplete="off"class="layui-input"></div></div><div class="layui-form-item"><label class="layui-form-label">状态</label><div class="layui-input-block"><input type="radio" name="state" title="有效" value="1" checked/><input type="radio" name="state" title="无效" value="0"/></div></div></form>
</script>
</body>
</html>
依赖
    <properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.10</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
yml配置
#tomcat配置信息
server:port: 80
#连接数据库的配置spring:#数据源datasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql:///openapi_2005?characterEncoding=utf8&serverTimezone=UTCusername: rootpassword: roottype: com.alibaba.druid.pool.DruidDataSource#rabbitmq配置rabbitmq:host: 192.168.246.129port: 5672username: mallowpassword: 123virtual-host: /mallow #主机#mybatis的配置
mybatis:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.mallow.springboot02.entityconfiguration:map-underscore-to-camel-case: true
配置类config

RestTemplate的配置

@Configuration
public class RestTemplateConfig {@Beanpublic RestTemplate getTemplate(){return new RestTemplate();}}

RabbitMQ的配置

@Configuration
public class RabbitMQConfig {/*交换机*/@Beanpublic TopicExchange topicExchange(){// return new TopicExchange(交换名称,是否持久化,是否自动删除);return new TopicExchange("openapi-customer-exchange",true,false);}/*队列*/@Beanpublic Queue queue(){// return new Queue(队列名称,是否持久化,是否排外,是否自动删除);return new Queue("openapi-customer-queue",true,false,false);}/*将队列绑定到交换机*/@Beanpublic Binding binding(TopicExchange topicExchange,Queue queue){// return BindingBuilder.bind(队列).to(交换机).with(路由key);return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");}}
实体类
@Data
public class Customer implements Serializable {private static final long serialVersionUID = 1586034423739L;// 主键private Integer id;// 公司名private String username;// 密码private String password;// 昵称private String nickname;// 金钱private Long money;// 地址private String address;// 状态  private Integer state;
}

mapper

public interface CustomerMapper {int addCustomer(Customer customer);int updateCustomer(Customer customer);int delCustomer( Integer[] ids);
}
mapper的xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.mallow.springboot02.mapper.CustomerMapper"><!--主键回填,keyProperty回填的属性,useGeneratedKeys回填的值是否要放到keyProperty中--><insert id="addCustomer" keyProperty="id" useGeneratedKeys="true">insert into t_customer values (null,#{username},#{password},#{nickname},#{address},#{money},#{state})</insert><update id="updateCustomer">update t_customer<set><if test="username!=null and username!='' ">username=#{username},</if><if test="password!=null and password!='' ">password=#{password},</if><if test="nickname!=null and nickname!='' ">nickname=#{nickname},</if><if test="address!=null and address!='' ">address=#{address},</if><if test="money!=null and money!='' ">money=#{money},</if><if test="state!=null ">state=#{state},</if></set>where id=#{id}</update><delete id="delCustomer">update t_customer set state=0 where id in<foreach collection="array" open="(" close=")" item="id" separator=",">#{id}</foreach></delete></mapper>
service
public interface CustomerService {String queryByCondition(Map<String,Object> map);void addCustomer(Customer customer);void updateCustomer(Customer customer);void delCustomer(Integer[] ids);
}
service的实现类
@Service
@Slf4j
public class CustomerServiceImpl2 implements CustomerService {@Autowiredprivate RestTemplate restTemplate;@Autowiredprivate CustomerMapper customerMapper;@Autowiredprivate RabbitTemplate rabbitTemplate;/*查询*/@Overridepublic String queryByCondition(Map<String, Object> map) {//把请求的参数转成jsonString json = JSON.toJson(map);HttpHeaders httpHeaders = new HttpHeaders();httpHeaders.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));//指定类型//设置请求的json的数据和数据类型HttpEntity httpEntity = new HttpEntity(json, httpHeaders);//通过restTemplate调用search模块,发送请求去请求searchController的接口,返回json数据String result = restTemplate.postForObject("http://localhost:8080/search/customer/query", httpEntity, String.class);//String.class是返回类型return result;}/*添加*/@Overridepublic void addCustomer(Customer customer) {//往数据库中添加数据int i = customerMapper.addCustomer(customer);if (i != 1) {log.error("【添加客户信息到数据库失败!】customer={}", customer);throw new RuntimeException("【添加客户信息到数据库失败!】");}//发送消息rabbitTemplate.convertAndSend("openapi-customer-exchange","openapi.customer.add",JSON.toJson(customer));/* //往es中添加数据//把请求的参数转成jsonString json = JSON.toJson(customer);HttpHeaders httpHeaders = new HttpHeaders();httpHeaders.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));//设置请求的json的数据和数据类型HttpEntity httpEntity = new HttpEntity(json, httpHeaders);//通过restTemplate调用search模块,发送请求去请求searchController的接口,返回json数据restTemplate.postForObject("http://localhost:8080/search/customer/add", httpEntity, String.class);*/}/*修改*/@Overridepublic void updateCustomer(Customer customer) {int i = customerMapper.updateCustomer(customer);if (i != 1) {log.error("【修改客户信息到数据库失败!】customer={}", customer);throw new RuntimeException("【修改客户信息到数据库失败!】");}//发送消息rabbitTemplate.convertAndSend("openapi-customer-exchange","openapi.customer.update",JSON.toJson(customer));
//        String json = JSON.toJson(customer);
//        HttpHeaders httpHeaders = new HttpHeaders();
//        httpHeaders.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));
//        HttpEntity httpEntity = new HttpEntity(json, httpHeaders);
//        restTemplate.postForObject("http://localhost:8080/search/customer/update", httpEntity, String.class);}/*批量删除*/@Overridepublic void delCustomer(Integer[] ids) {int i = customerMapper.delCustomer(ids);System.out.println(i);if (i < 1) {for (int j = 0; j < ids.length; j++) {log.error("【删除客户信息到数据库失败!】ids={}", ids[j]);}
//            log.error("【删除客户信息到数据库失败!】ids={}", ids);
//            throw new RuntimeException("【删除客户信息到数据库失败!】");}//发送消息rabbitTemplate.convertAndSend("openapi-customer-exchange","openapi.customer.del",JSON.toJson(ids));
//        String json = JSON.toJson(ids);
//        HttpHeaders httpHeaders = new HttpHeaders();
//        httpHeaders.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));
//        HttpEntity httpEntity = new HttpEntity(json, httpHeaders);
//        restTemplate.postForObject("http://localhost:8080/search/customer/del", httpEntity, String.class);}
}
controller
@RestController
@RequestMapping("/sys/customer")
public class CustomerController {@Autowiredprivate CustomerService customerService;@GetMapping(value = "/table", produces = "application/json;charset=utf-8")public String table(@RequestParam(defaultValue = "1") Integer page, @RequestParam(defaultValue = "5") Integer limit) {Map<String, Object> map = new HashMap<>();map.put("page", page);map.put("limit", limit);String json = customerService.queryByCondition(map);return json;}@RequestMapping(value ="/add", produces = "application/json;charset=utf-8")public ResultVO addCustomer(Customer customer){try {customerService.addCustomer(customer);return new ResultVO(true,"添加客户信息成功");} catch (Exception e) {e.printStackTrace();return new ResultVO(false,"添加客户信息失败!");}}@PostMapping(value ="/update", produces = "application/json;charset=utf-8")public ResultVO updateCustomer( Customer customer){try {customerService.updateCustomer(customer);return new ResultVO(true,"修改客户信息成功");} catch (Exception e) {e.printStackTrace();return new ResultVO(false,"修改客户信息失败!");}}@RequestMapping(value ="/del", produces = "application/json;charset=utf-8")public ResultVO updateCustomer(Integer[] ids){try {customerService.delCustomer(ids);return new ResultVO(true,"删除客户信息成功");} catch (Exception e) {e.printStackTrace();return new ResultVO(false,"删除客户信息失败!");}}
}
工具类

ResultVO

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ResultVO {private boolean status;private String message;
}

JSON

public class JSON {public static String toJson(Object obj) {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(obj);} catch (JsonProcessingException e) {e.printStackTrace();return "";}}}
8.3.2 搜索模块
依赖
    <properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>6.5.4</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>6.5.4</version></dependency><dependency><groupId>commons-beanutils</groupId><artifactId>commons-beanutils</artifactId><version>1.9.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
yml配置
elasticsearch:host: 192.168.246.129port: 9200
#rabbitmq配置
spring:rabbitmq:host: 192.168.246.129port: 5672username: mallowpassword: 123virtual-host: /mallow #主机
配置类

ElasticSearch

@Configuration
public class ElasticSearch {@Value("${elasticsearch.host}")private String host;@Value("${elasticsearch.port}")private Integer port;@Beanpublic RestHighLevelClient client() {//创建HttpHost httpHost = new HttpHost(host, port);//创建RestClientBuilderRestClientBuilder clientBuilder = RestClient.builder(httpHost);//创建RestHighLevelClientRestHighLevelClient client = new RestHighLevelClient(clientBuilder);return client;}
}

RabbitMQConfig

@Configuration
public class RabbitMQConfig {/*交换机*/@Beanpublic TopicExchange topicExchange(){// return new TopicExchange(交换名称,是否持久化,是否自动删除);return new TopicExchange("openapi-customer-exchange",true,false);}/*队列*/@Beanpublic Queue queue(){// return new Queue(队列名称,是否持久化,是否排外,是否自动删除);return new Queue("openapi-customer-queue",true,false,false);}/*将队列绑定到交换机*/@Beanpublic Binding binding(TopicExchange topicExchange, Queue queue){// return BindingBuilder.bind(队列).to(交换机).with(路由key);return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");}}
实体类
@Data
public class Customer implements Serializable {private static final long serialVersionUID = 1586034423739L;// 主键private Integer id;// 公司名private String username;// 密码private String password;// 昵称private String nickname;// 金钱private Long money;// 地址private String address;// 状态  private Integer state;
}
service
public interface SearchService {//返回的是json数据,可以使用StringString searchByCondition(Map<String, Object> map) throws IOException, InvocationTargetException, IllegalAccessException;void addCustomer(Customer customer) throws IOException;void updateCustomer(Customer customer) throws IOException;void delCustomer(Integer[] ids) throws IOException;
}
service的实现类
@Service
@Slf4j
public class SearchServiceImpl implements SearchService {@Autowiredprivate RestHighLevelClient client;String index = "openapi_customer";String type = "customer";/*查询*/@Overridepublic String searchByCondition(Map<String, Object> map) throws IOException, InvocationTargetException, IllegalAccessException {//创建request对象SearchRequest request = new SearchRequest(index);request.types(type);SearchSourceBuilder builder = new SearchSourceBuilder();builder.sort("id", SortOrder.ASC);//升序//取值,前端传过来的数据Object name = map.get("name");Object state = map.get("state");if (!StringUtils.isEmpty(name)) {builder.query(QueryBuilders.termsQuery("username", name));//es对应的属性名username}if (state != null) {builder.query(QueryBuilders.termsQuery("state", state));}//页数Integer page = Integer.parseInt(map.get("page").toString());Integer limit = Integer.parseInt(map.get("limit").toString());//查询分页的数据builder.from((page - 1) * limit);//起始页下标builder.size(limit);//每页显示的条数request.source(builder);//操作esSearchResponse response = client.search(request, RequestOptions.DEFAULT);TableDataVo<Customer> dataVo = new TableDataVo<>();//设置总条数dataVo.setCount(response.getHits().getTotalHits());//存储数据List<Customer> customerList = new ArrayList<>();for (SearchHit hit : response.getHits().getHits()) {Map<String, Object> sourceAsMap = hit.getSourceAsMap();//每遍历一条数据,则创建一个对象Customer customer = new Customer();//会把map中key对应的value值,赋值给对象的属性,前提条件是key的名称必须和对象的属性一致BeanUtils.populate(customer, sourceAsMap);//循环的添加客户对象customerList.add(customer);}//设置存储数据dataVo.setData(customerList);//把对象转成对象,返回return JSON.toJson(dataVo);}/*添加*/@Overridepublic void addCustomer(Customer customer) throws IOException {//创建request对象IndexRequest request = new IndexRequest(index, type, customer.getId() + "");//设置数据,把对象转成json数据String customerJson = JSON.toJson(customer);request.source(customerJson, XContentType.JSON);//client执行添加操作IndexResponse response = client.index(request, RequestOptions.DEFAULT);//返回结果if (!"CREATED".equalsIgnoreCase(response.getResult().toString())) {log.error("【添加文档失败!!】index={},type={},customerId={}" + index, type, customer.getId());}}/*修改*/@Overridepublic void updateCustomer(Customer customer) throws IOException {UpdateRequest request = new UpdateRequest(index, type, customer.getId() + "");String customerJson = JSON.toJson(customer);request.doc(customerJson, XContentType.JSON);UpdateResponse response = client.update(request, RequestOptions.DEFAULT);if (!"UPDATED".equalsIgnoreCase(response.getResult().toString())) {log.error("【修改文档失败!!】index={},type={},customerId={}" , index, type, customer.getId());}}/*删除*/@Overridepublic void delCustomer(Integer[] ids) throws IOException {//批量删除BulkRequest request=new BulkRequest();for (int i = 0; i < ids.length; i++) {request.add(new DeleteRequest(index,type,ids[i]+""));}String idsJson = JSON.toJson(ids);BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);System.out.println(response.toString());
//        if(!"DELETED".equalsIgnoreCase(response)){//
//        }}}
controller
@RestController
@RequestMapping("/search/customer")
public class SearchController {@Autowiredprivate SearchService searchService;@PostMapping(value = "/query", produces = "application/json;charset=utf-8")public String query(@RequestBody Map<String, Object> map) throws IllegalAccessException, IOException, InvocationTargetException {return searchService.searchByCondition(map);}@RequestMapping(value = "/add", produces = "application/json;charset=utf-8")public void addCustomer(@RequestBody Customer customer) throws IOException {//@RequestBody获取前端参数searchService.addCustomer(customer);}@RequestMapping(value = "/update", produces = "application/json;charset=utf-8")public void updateCustomer(@RequestBody Customer customer) throws IOException {searchService.updateCustomer(customer);}@RequestMapping(value = "/del", produces = "application/json;charset=utf-8")public void delCustomer(@RequestBody Integer[] ids) throws IOException {searchService.delCustomer(ids);}@PostMapping(value = "/search", produces = "application/json;charset=utf-8")public String search(@RequestBody Map<String, Object> map) throws IllegalAccessException, IOException, InvocationTargetException {return searchService.searchByCondition(map);}
}

工具类

JSON

public class JSON {/*对象转成JSON数据*/public static String toJson(Object obj) {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(obj);} catch (JsonProcessingException e) {e.printStackTrace();return "";}}/*JSON数据转成对象*/public static <T> T parseJson(String data, Class<T> clazz) {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.readValue(data, clazz);} catch (Exception e) {e.printStackTrace();return null;}}/*** 将data转化为1维整型数组, data形如:"1,3,4,5,6"*/public static Integer[] toIntegerArray(String data) {//[21,22]data = data.substring(1, data.length() - 1);//将前端传过来的JSON数据的中括号切掉String[] datas = data.split(",");//将字符串转成一个字符串数组Integer[] tmp = new Integer[datas.length];//创建包装类整型数组//字符串数组转成包装类整型数组for (int i = 0; i < datas.length; i++) {tmp[i] = Integer.parseInt(datas[i]);}return tmp;}}

TableDataVO

@Data
public class TableDataVo<T> {private Integer code=0;private String msg="";private Long count;private List<T> data;}
ES的创建索引和添加数据
@RunWith(SpringRunner.class)
@SpringBootTest
public class ElasticInitTests {@Autowiredprivate RestHighLevelClient client;String index = "openapi_customer";String type = "customer";@Testpublic void createIndex() throws IOException {//1. 准备关于索引的settingsSettings.Builder settings = Settings.builder().put("number_of_shards", 5).put("number_of_replicas", 1);//2. 准备关于索引的结构mappingsXContentBuilder mappings = JsonXContent.contentBuilder().startObject().startObject("properties").startObject("id").field("type","integer").endObject().startObject("username").field("type","keyword").endObject().startObject("password").field("type","keyword").endObject().startObject("nickname").field("type","text").endObject().startObject("money").field("type","long").endObject().startObject("address").field("type","text").endObject().startObject("state").field("type","integer").endObject().endObject().endObject();//3. 将settings和mappings封装到一个Request对象CreateIndexRequest request = new CreateIndexRequest(index).settings(settings).mapping(type,mappings);//4. 通过client对象去连接ES并执行创建索引CreateIndexResponse resp = client.indices().create(request, RequestOptions.DEFAULT);//5. 输出System.out.println("resp:" + resp.toString());}@Testpublic void bulkCreateDoc() throws IOException {//1. 准备多个json数据Customer c1 = new Customer();c1.setId(1);c1.setUsername("haier");c1.setPassword("111111");c1.setNickname("海尔集团");c1.setMoney(2000000L);c1.setAddress("青岛");c1.setState(1);Customer c2 = new Customer();c2.setId(2);c2.setUsername("lianxiang");c2.setPassword("111111");c2.setNickname("联想");c2.setMoney(1000000L);c2.setAddress("联想");c2.setState(1);Customer c3 = new Customer();c3.setId(3);c3.setUsername("google");c3.setPassword("111111");c3.setNickname("谷歌");c3.setMoney(1092L);c3.setAddress("没过");c3.setState(1);ObjectMapper mapper = new ObjectMapper();String json1 = mapper.writeValueAsString(c1);String json2 = mapper.writeValueAsString(c2);String json3 = mapper.writeValueAsString(c3);//2. 创建Request,将准备好的数据封装进去BulkRequest request = new BulkRequest();request.add(new IndexRequest(index,type,c1.getId().toString()).source(json1, XContentType.JSON));request.add(new IndexRequest(index,type,c2.getId().toString()).source(json2,XContentType.JSON));request.add(new IndexRequest(index,type,c3.getId().toString()).source(json3,XContentType.JSON));//3. 用client执行BulkResponse resp = client.bulk(request, RequestOptions.DEFAULT);//4. 输出结果System.out.println(resp.toString());}}

个人笔记,思路,仅供参考

RabbitMQ学习之旅相关推荐

  1. 如何系统地自学python100天_Github上发布了一个Python学习秘笈,从萌新到王者的100天Python学习之旅...

    北京千锋互联科技有限公司成都分公司骆昊(jackfrued)在Github上发布了一个Python学习秘笈,从萌新到王者的100天Python学习之旅. 简单的说,Python是一个"优雅& ...

  2. hadoop学习之旅1

    大数据介绍 大数据本质也是数据,但是又有了新的特征,包括数据来源广.数据格式多样化(结构化数据.非结构化数据.Excel文件.文本文件等).数据量大(最少也是TB级别的.甚至可能是PB级别).数据增长 ...

  3. 基于设计模式的学习之旅-----访问者模式(附源码)

    基于设计模式的学习之旅-----访问者模式 1.初始访问者模式 2.什么是访问者模式 表示一个作用于某对象结构中的各元素的操作.它使你可以在不改变各元素的类的前提下定义作用于这些元素的新操作. 3.模 ...

  4. WCF学习之旅—WCF服务的WAS寄宿(十二)

    上接    WCF学习之旅-WCF服务部署到IIS7.5(九) WCF学习之旅-WCF服务部署到应用程序(十) WCF学习之旅-WCF服务的Windows 服务程序寄宿(十一) 八.WAS宿主 IIS ...

  5. RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列

    上一篇已经讲了Rabbitmq如何在Windows平台安装,不懂请移步:RabbitMQ学习系列一:windows下安装RabbitMQ服务 一.理论: .net环境下,C#代码调用RabbitMQ消 ...

  6. 基于 Android NDK 的学习之旅-----资源释放

    基于 Android NDK 的学习之旅-----资源释放 做上一个项目的时候因为与C引擎交互频繁,有时候会突然莫名其妙的的整个应用程序直接挂掉.因为我是学Java 开始的,所以对主动释放内存没多大概 ...

  7. java message bus_【Microsoft Azure学习之旅】消息服务Service Bus的学习笔记及Demo示例...

    今年项目组做的是Cloud产品,有幸接触到了云计算的知识,也了解并使用了当今流行的云计算平台Amazon AWS与Microsoft Azure.我们的产品最初只部署在AWS平台上,现在产品决定同时支 ...

  8. 基于 Android NDK 的学习之旅----- C调用Java

    2019独角兽企业重金招聘Python工程师标准>>> 基于 Android NDK 的学习之旅----- C调用Java 许多成熟的C引擎要移植到Android 平台上使用 , 一 ...

  9. 我的angularjs源码学习之旅2——依赖注入

    依赖注入起源于实现控制反转的典型框架Spring框架,用来削减计算机程序的耦合问题.简单来说,在定义方法的时候,方法所依赖的对象就被隐性的注入到该方法中,在方法中可以直接使用,而不需要在执行该函数的时 ...

最新文章

  1. squid中的X-Cache和X-Cache-Lookup的意义
  2. Scrum Mastery:有效利用组织的5个步骤
  3. [BZOJ1187]神奇游乐园(插头DP)
  4. linux sleep alarm,Linux环境编程之信号处理(三、利用alarm()和pause()函数实现sleep()函数)...
  5. django入门三(视图)
  6. input框传值是怎么才能是整形_做了这些项目,到底多久才能化妆?
  7. 消息队列 ActiveMQ 、RocketMQ 、RabbitMQ 和 Kafka 如何选择?
  8. 【前端必备】七、页面性能优化
  9. 推荐几本对于学习WebGL有帮助的书籍
  10. python2.7图像局部增强_Python OpenCV图像增强
  11. gridview為什麼分頁後,GridView1_RowDataBound就運行不了
  12. python读取dat文件并保存为Excel格式
  13. 梯度消失、梯度爆炸产生的原因
  14. 华南x79主板u盘装系统教程_[Hackintosh] X79黑苹果
  15. python中readlines是什么意思_python中read、readline、readlines之间的区别
  16. ArcGIS 坡度分析及坡度集中在80-90间的原因
  17. 发送邮件常见出错代码
  18. 快速入门 | 篇十七:运动控制器多轴插补运动指令的使用
  19. git Filename too long解决方案
  20. rundeck上创建project和job

热门文章

  1. 光猫+路由器双工配置
  2. 关于mplayer dvd seek定位函数dvd_seek_to_time
  3. 前锋java教学大纲,【人教版初中英语教学大纲模板资讯】人教版初中英语教学大纲模板足球知识与常识 - 足球百科 - 599比分...
  4. root后开启指纹支付,root之后不能用指纹支付
  5. 【解救ROS】clion2022.2.2的安装永久使用教程
  6. 用python定时自动发微博_Python脚本实现自动发带图的微博
  7. 太原理工大学移动应用软件开发技术实验报告
  8. 如何快速体验腾讯云区块链长安链
  9. “独立站+私域”的DTC直客模式电商,是告别互联网内卷唯一有效方式
  10. C语言——数组定义及用法