1.1需求

数据库300 万条用户数据 ,遍历获取所有用户, 各种组合关联, 获取到一个新的json ,存到redis 上。

1.2 难点

数据库比较多, 不可能单线程查询所有的数据到内存。

1.3解决办法

多线程读取, 生产者 每次获取200 条数据, 消费者去消费。(这里 主要是根据MySQL分页去获取下一个200 条数据)

1.4 代码

1.4.1 调用方法

/*** 线程启动*/public void update() {//redis操作类HashRedisUtil redisUtil= HashRedisUtil.getInstance();//生产者消费者ProducerConsumer pc = new ProducerConsumer();//数据仓库Storage s = pc.new Storage();ExecutorService service = Executors.newCachedThreadPool();//一个线程进行查询Producer p = pc.new Producer(s,userMapper);service.submit(p);System.err.println("生产线程正在生产中。。。。。。。。。");//是个线程进行修改for(int i=0;i<10;i++){System.err.println("消费线程"+i+"正在消费中。。。。。。。。。。");service.submit(pc.new Consumer( redisUtil,userMapper,s));}}
复制代码

1.4.2 主要核心类

package com.ypp.thread;import java.math.BigDecimal;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.LocalDateTime;import com.alibaba.fastjson.JSONObject;
import com.ypp.constants.Constants;
import com.ypp.mapper.UserMapper;
import com.ypp.model.User;
import com.ypp.model.UserAlis;
import com.ypp.model.UserBaseModel;
import com.ypp.model.UserVip;
import com.ypp.util.HashRedisUtil;
import com.ypp.util.JsonUtils;
import com.ypp.util.PHPSerializer;public class ProducerConsumer {private static Logger logger = Logger.getLogger(ProducerConsumer.class);//这个page 是核心, 全局变量, 当生产者生产一次 ,获取200 个用户, 会把这个page++, 下次获取就是后一个200 条用户了private static Integer page = 0;//消费者public class Consumer implements Runnable {private HashRedisUtil redisUtil;private UserMapper userMapper;private Storage s = null;public Consumer(HashRedisUtil redisUtil, UserMapper userMapper, Storage s) {super();this.redisUtil = redisUtil;this.userMapper = userMapper;this.s = s;}public void run() {try {while (true) {User users = s.pop();long bbb = System.currentTimeMillis();// 获取一个用户的粉丝列表 并存到redistry {fansUpdate(users.getToken(), users.getUserId(), redisUtil);} catch (Exception e1) {e1.printStackTrace();}// 获取一个用户的关注列表, 并存到redistry {followUpdate(users.getToken(), users.getUserId(), redisUtil);} catch (Exception e) {e.printStackTrace();}// 获取一个用户的黑名单, 并存到redistry {blackUpdate(users.getToken(), users.getUserId(), redisUtil);} catch (Exception e) {e.printStackTrace();}// 用户基本信息try {userbaseUpdate(users.getToken(), users.getUserId(), redisUtil);} catch (Exception e) {e.printStackTrace();}long ccc = System.currentTimeMillis();System.out.println("用户:" + users.getToken() + " 全部总共耗时:" + (ccc - bbb) + "毫秒");Thread.sleep(500);}} catch (InterruptedException e) {e.printStackTrace();}}public List<User> getUserInfo(Integer iThread) {return userMapper.findUserInfo((iThread - 1) * 200 + 1);}/*** 用户基本信息修改* * @param token* @param myuserId* @param redisUtil* @throws Exception*/private void userbaseUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception {}/*** 更新一个用户的黑名单(原来的token改成userID)* * @param token* @param string* @param redisUtil* @throws Exception*/private void blackUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception {}/*** 获取一个用户的关注* * @param token* @param string* @param redisUtil* @throws Exception*/private void followUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception {}/*** 获取一个用户的粉丝列表* * @param token* @param userId* @param redisUtil* @throws Exception*/private void fansUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception {}//生产者public class Producer implements Runnable {private Storage s = null;private UserMapper mapper ;public Producer( Storage s, UserMapper mapper) {this.s = s;this.mapper = mapper;}public void run() {try {while (true) {System.err.println("当前分页是:"+page+"****************************************");List<User> list= mapper.findUserInfo(page);s.push(list);page++;}} catch (InterruptedException e1) {e1.printStackTrace();}}}//数据仓库public class Storage {BlockingQueue<User> queues = new LinkedBlockingQueue<User>(200);/*** 生产* * @param p*            产品* @throws InterruptedException*/public void push(List<User> p) throws InterruptedException {for(User user:p){queues.put(user);}}/*** 消费* * @return 产品* @throws InterruptedException*/public User pop() throws InterruptedException {return queues.take();}}}

Java 线程池 +生产者消费者+MySQL读取300 万条数据相关推荐

  1. Java线程实现生产者—消费者模式

    在这里插入代码片# Java 线程实现生产者-消费者模式 ##思路:实现类似消费者生产者线程之间通讯的功能,每创建一个工人,就让这个工人干活,干一段时间,工人自动消失,然后又去创建一个工人干活: 代码 ...

  2. JAVA线程之生产者消费者问题

    复习下JAVA线程基础知识: 1.线程的状态: 创建状态:创建了线程对象,此时线程有了相应的内存空间和其他资源,但处于不可运行状态. 就绪状态:线程对象调用start()方法启动线程,进入就绪状态,此 ...

  3. 复杂业务下向Mysql导入30万条数据代码优化的踩坑记录

    从毕业到现在第一次接触到超过30万条数据导入MySQL的场景(有点low),就是在顺丰公司接入我司EMM产品时需要将AD中的员工数据导入MySQL中,因此楼主负责的模块connector就派上了用场. ...

  4. 插入2万调数据耗时_教你如何6秒钟往MySQL插入100万条数据!然后删库跑路!

    一.思路 往MySQL中插入1000000条数据只花了6秒钟! 关键点: 1.使用PreparedStatement对象 2.rewriteBatchedStatements=true 开启批量插入, ...

  5. mysql插10万条数据_MySQL数据库插入100w条数据要花多久?

    MySQL数据库插入100w条数据要花多久? 1.多线程插入(单表) 2.多线程插入(多表) 3.预处理SQL 4.多值插入SQL 5.事务(N条提交一次) # 多线程插入(单表) 问:为何对同一个表 ...

  6. 使用sql语句往MySQL插入1000万条数据

    在学习或者工作生产环境中,我们经常要对数据库进行压力测试,往数据库中批量插入大量数据,这里我往Mysql中批量插入大量数据,采用存储过程的方法实现. 数据库版本:Mysql5.7 一.建表 1.创建数 ...

  7. 锁, threading.local, 线程池, 生产者消费者模型

    一. 锁:Lock (1次放1个) 线程安全,多线程操作时,内部会让所有线程排队处理.如:list/dict/Queue     线程不安全 + 人 => 排队处理. 需求:         a ...

  8. Java线程实现生产者消费者模式

    1 什么是生产者消费者模式 想一个现实生活中的例子,啤酒商---超市---消费者也就是我们,啤酒商生产了啤酒,然后将啤酒销售给了超市,我们消费之又会到超市将啤酒买回来自己喝,那么啤酒商和消费者之间是什 ...

  9. java线程模拟生产者消费者问题

    所谓的生产者消费者问题,就是存在生产者和消费者两个线程,当仓库还没满的时候,生产者可以生产,当仓库没空的时候,消费者可以取走商品. 我们用实例说话: 下面,我们创建几个类: 1.消费者--主要负责消费 ...

最新文章

  1. 年度书单盘点 | “裁员潮”持续蔓延?职场没有铁饭碗,只有硬技能
  2. [二叉树建树] 后序遍历与中序遍历建立二叉树
  3. GNU C中x++是原子操作吗?
  4. iterm php,iTerm2笔记
  5. chromium浏览器开发系列第五篇:Debugging with WinDBG
  6. 苹果:我们一直在App Store上展示竞争对手的应用程序
  7. python字符串追加字符_Python字符串追加
  8. missingno库—缺失值可视化
  9. 常用的限流框架都在这里了!
  10. Server Tomcat v8.5 Server at localhost was unable to start within 45 seconds. If the server requires
  11. linux的boot可用fat格式吗,u-boot中的FAT命令
  12. python数据结构-单链表
  13. 仿微信朋友圈图片上传
  14. 高速串行计算机扩展总线标准,高速串行计算机扩展总线标准Bosch Sensortec开发出BMP384...
  15. js连接mqtt进行通信
  16. python中关于命名的例子_利用Python批量重命名文件(给非技术人员的Python实例参考)...
  17. STM32 I2C驱动0.96寸OLED屏
  18. Prim算法实现最小生成树(Java)
  19. 怎么用matlab画误差椭圆,matlab画误差椭圆
  20. 最全最佳的wordpress插件汇总推荐-php

热门文章

  1. body英语什么意思是什么_body是什么意思_body翻译_读音_用法_翻译
  2. html背景图片带边框,在线给图片加边框和背景
  3. 学习笔记--人人都是产品经理
  4. Sentieon软件UMI单分子标记处理模块发布,大幅提升准确度和速度
  5. FFmpeg AVPacket和av_packet_unref函数剖析
  6. 2023 年 3 月阿拉丁指数榜单更替率 18%
  7. 【转载】pygame.key 键值说明
  8. 一维条码与二维条码的码制区别
  9. ISP——黑电平矫正(Black Level correction, BLC)
  10. android开发中Settings结构简单分析