写作时间:2019-12-01
Spring Boot: 2.1 ,JDK: 1.8, IDE: IntelliJ IDEA

1. 说明

响应式编程操作RedisLettuce能够支持Reactive方式。
Spring Data Redis 中主要的支持:

  1. ReactiveRedisConnection
  2. ReactiveRedisConnectionFactory
  3. ReactiveRedisTemplate
    opsForXxx()

响应式编程的概念请参考:
第三十篇:SpringBoot Reactor响应式编程介绍
第三十一篇:SpringBoot Reactor响应式编程实战一。

2. Docker 启动Redis

查看历史上启动过的Redis进程

% docker ps -a
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS                      PORTS                      NAMES
8eb8d32453bb        redis               "docker-entrypoint.s…"   2 months ago        Up 19 minutes               0.0.0.0:6379->6379/tcp     redis

启动Redis进程

% docker start redis
redis

启动Redis客户端, 查看所有的keys,目前是空的。

% docker exec -it redis redis-cli
127.0.0.1:6379> keys *
(empty list or set)

详细解释可以参考:
第廿三篇:SpringBoot之Docker入门
第三十二篇:Redis Docker入门

3. 工程建立

参照教程【SpringBoot 2.1 | 第一篇:构建第一个SpringBoot工程】新建一个Spring Boot项目,名字叫RPCClient, 在目录src/main/java/resources 下找到配置文件application.properties,重命名为application.yml

Spring Boot 版本选择2.1.10,依赖勾选Developer Tools > Lombok.
SQL > Spring Data JDBC, H2 Database
NoSQL > Spring Data Reactive Redis

4. application.yml配置文件

spring:redis:host: "localhost"

5. 数据库建表和初始化数据

application.yml的同级目录,创建文件schema.sql

drop table t_coffee if exists;create table t_coffee (id bigint auto_increment,create_time timestamp,update_time timestamp,name varchar(255),price bigint,primary key (id)
);

application.yml的同级目录,创建文件data.sql

insert into t_coffee (name, price, create_time, update_time) values ('espresso', 2000, now(), now());
insert into t_coffee (name, price, create_time, update_time) values ('latte', 2500, now(), now());
insert into t_coffee (name, price, create_time, update_time) values ('capuccino', 2500, now(), now());
insert into t_coffee (name, price, create_time, update_time) values ('mocha', 3000, now(), now());
insert into t_coffee (name, price, create_time, update_time) values ('macchiato', 3000, now(), now());

6. 创建Model

com.zgpeace.reactive.Coffee

package com.zgpeace.reactive;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Coffee {private Long id;private String name;private Long price;
}

7. Controller

package com.zgpeace.reactive;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveHashOperations;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;@SpringBootApplication
@Slf4j
public class ReactiveApplication  implements ApplicationRunner {private static final String KEY = "COFFEE_MENU";@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate ReactiveStringRedisTemplate redisTemplate;public static void main(String[] args) {SpringApplication.run(ReactiveApplication.class, args);}@BeanReactiveStringRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {return new ReactiveStringRedisTemplate(factory);}@Overridepublic void run(ApplicationArguments args) throws Exception {ReactiveHashOperations<String, String, String> hashOps = redisTemplate.opsForHash();CountDownLatch cdl = new CountDownLatch(1);List<Coffee> list = jdbcTemplate.query("select * from t_coffee", (rs, i) ->Coffee.builder().id(rs.getLong("id")).name(rs.getString("name")).price(rs.getLong("price")).build());Flux.fromIterable(list).publishOn(Schedulers.single()).doOnComplete(() -> log.info("list ok")).flatMap(c -> {log.info("try to put {},{}", c.getName(), c.getPrice());return hashOps.put(KEY, c.getName(), c.getPrice().toString());}).doOnComplete(() -> log.info("set ok")).concatWith(redisTemplate.expire(KEY, Duration.ofMinutes(1))).doOnComplete(() -> log.info("expire ok")).onErrorResume(e -> {log.error("exception {}", e.getMessage());return Mono.just(false);}).subscribe(b -> log.info("Boolean: {}", b),e -> log.error("Exception {}", e.getMessage()),() -> cdl.countDown());log.info("Waiting");cdl.await();}
}

解析:

  1. jdbcTemplate.query 查询数据库中数据,并组装为List
  2. Flux.fromIterable(list)遍历列表
  3. publishOn(Schedulers.single())在单线程中Publisher执行
  4. flatMap 把数组存入redis
  5. concatWith(redisTemplate.expire(KEY, Duration.ofMinutes(1))) 设置Redis数据过期时间为1分钟。
  6. CountDownLatch 同步线程锁,一次只允许一个线程任务处理。

结果输出如下

Bootstrapping Spring Data repositories in DEFAULT mode.
[           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 1ms. Found 0 repository interfaces.
[           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
[           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
[           main] c.zgpeace.reactive.ReactiveApplication   : Started ReactiveApplication in 1.279 seconds (JVM running for 1.779)
[           main] c.zgpeace.reactive.ReactiveApplication   : Waiting
[       single-1] c.zgpeace.reactive.ReactiveApplication   : try to put espresso,2000
[       single-1] io.lettuce.core.EpollProvider            : Starting without optional epoll library
[       single-1] io.lettuce.core.KqueueProvider           : Starting without optional kqueue library
[       single-1] c.zgpeace.reactive.ReactiveApplication   : try to put latte,2500
[       single-1] c.zgpeace.reactive.ReactiveApplication   : try to put capuccino,2500
[       single-1] c.zgpeace.reactive.ReactiveApplication   : try to put mocha,3000
[       single-1] c.zgpeace.reactive.ReactiveApplication   : try to put macchiato,3000
[       single-1] c.zgpeace.reactive.ReactiveApplication   : list ok
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication   : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication   : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication   : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication   : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication   : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication   : set ok
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication   : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication   : expire ok
[       Thread-5] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
[       Thread-5] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

8. 查看Redis数据

在Terminal中查看Redis数据

127.0.0.1:6379> keys *
1) "COFFEE_MENU"
127.0.0.1:6379> hgetall COFFEE_MENU1) "espresso"2) "2000"3) "latte"4) "2500"5) "capuccino"6) "2500"7) "mocha"8) "3000"9) "macchiato"
10) "3000"

9. 代码下载

https://github.com/zgpeace/Spring-Boot2.1/tree/master/reactor/ReactiveRedisDemo

10. 参考

https://github.com/geektime-geekbang/geektime-spring-family

易筋SpringBoot 2.1 | 第三十三篇:SpringBoot Reactor响应式编程实战二 Redis Lettuce相关推荐

  1. springboot异步注解_Spring Boot 2 :Spring Boot 中的响应式编程和 WebFlux 入门

    [小宅按]Spring 5.0 中发布了重量级组件 Webflux,拉起了响应式编程的规模使用序幕. WebFlux 使用的场景是异步非阻塞的,使用 Webflux 作为系统解决方案,在大多数场景下可 ...

  2. java9 反应编程_Java9第四篇-Reactive Stream API响应式编程

    file 我计划在后续的一段时间内,写一系列关于java 9的文章,虽然java 9 不像Java 8或者Java 11那样的核心java版本,但是还是有很多的特性值得关注.期待您能关注我,我将把ja ...

  3. SpringBoot响应式编程(整理一)

    1.SprinBoot响应式编程简介 基础理解: 依赖于事件,事件驱动(Event-driven) 一系列事件称为"流" 异步 非阻塞 观察者模式 (1)Spring WebFlu ...

  4. RXJava2响应式编程框架设计三---Rxjava2背压、生命周期

    在上一次https://www.cnblogs.com/webor2006/p/12348890.html中已经完成了对RxJava2的整个线程切换原理的详细剖析了,这次继续来学习它其它比较重要的知识 ...

  5. Kotlin 协程 (5/7篇) - 响应式编程(冷流) Flow

    一.概念 场景:List 是同步一次性返回多个值,Sequence 是同步(阻塞)分开返回多个值,Suspend 是异步返回单个值,Flow可以异步返回多个值. 冷流:只能在创建的时候定义生产数据的代 ...

  6. c# 基于layui的通用后台管理系统_【SpringBoot】三十三、SpringBoot+LayUI后台管理系统开发脚手架...

    点击蓝色字免费订阅,每天收到这样的好信息 前言:最近有不少粉丝关注本公众号.并且我已经成功开通了流量主同时会赚一点点广告费,我打算每个月把这部分钱拿出来给大家买点书刊,算是给大家一点福利吧.大家想买什 ...

  7. Android响应式编程(一)RxJava前篇[入门基础]

    1.RxJava概述 ReactiveX与RxJava 在讲到RxJava之前我们首先要了解什么是ReactiveX,因为RxJava是ReactiveX的一种java实现. ReactiveX是Re ...

  8. Android UI开发第三十三篇——Navigation Drawer For Android API 7

    Creating a Navigation Drawer中使用的Navigation Drawer的android:minSdkVersion="14",现在Android API ...

  9. 第三十三篇 transition-group 列表过渡

    上一篇内容讲到的是transiotion,其中还记得有一个报错吗?如下: 先来回顾一下,<transition> 只能用于单个元素,如果在<transition>单中并列两个& ...

  10. 第三十三篇,网络编程TCP协议通讯过程实现和函数接口

    网络编程知识点概览 ========================================    1.核心知识点        传输层的两个重要协议             tcp协议:   ...

最新文章

  1. targetcli读取prefs.bin出错的问题的解决方法
  2. 华为成为全球领先的移动设备供应商
  3. 敏捷转型该怎么转?来看看这本书怎么说的吧
  4. Windows杀死结束进程图形化界面操作方法
  5. mysql备份与还原-mysqldump备份、mysql与source还原
  6. idea java8_太赞了,Intellij IDEA 竟然把 Java8 的数据流问题这么完美的解决掉了!...
  7. 通过反射突破private访问修饰符获取私有和公共的属性,设置和获取值
  8. 记住:永远不要在MySQL中使用UTF-8
  9. Java、JS、C++ 依然强势,Go、Kotlin 等新语言为何不能破局?
  10. GO语言学习之路26
  11. Linux操作系统中readelf命令常见用法
  12. CICD详解(二)——Jenkins持续集成原理
  13. leetcode 226 Invert Binary Tree 翻转二叉树
  14. android ion --system heap(个人理解,不确定完全对)
  15. scrum角色及其职责介绍
  16. 【灵感一剖】WPS简历助手使用反馈
  17. 大数据Hive(四):Hive查询语法
  18. REMOTE HOST IDENTIFICATION HAS CHANGED问题解决
  19. 小红书品牌营销白皮书(2021半年报)
  20. word骨灰级水平,赶紧留一份

热门文章

  1. 实验一matlab编程基础,实验一、MATLAB编程基础及典型实例.doc
  2. [转]余弦cos计算相似度
  3. asp.net2.0 发布网站选项说明
  4. 飘刃 0.1.1 发布,速度碾压 Vue-CLI 的轻量级 Vue 项目构建工具
  5. 整合SSH 遇到错误之一
  6. Know Difference between Oracle Reserved Words and Keywords
  7. 向云上迁移数据时如何避免停机和中断
  8. jQuery增加删除修改tab导航特效
  9. 以德服人——合格的产品经理
  10. .NET程序员修炼之道