1. 需求

1)需求说明

a. 请求服务器发送到kafka中的请求,包含最少一个终端的信息,如ID
b. 增加一个数据库,保存评估结果。
c. 终端循环向请求服务器发送http请求
d. 请求处理服务器收到终端的请求,先访问数据库,得到评估结果并返回,如果没有结果,就将请求内容发送到kafka。

2)系统框架流程图

2. 实现思路

1)建立数据库,数据库表
2)虚拟发送请求信息—user_id, pic — kafka中的生产者
3)将图片名称自动更新并存入临时文件夹
4)得到传过来的图片编号和用户id
5)查询数据库,看对应图片编号是否存在 — kafka中的消费者接收

(1) 存在,看是否有score;
a. 存在,直接返回score;
b. 不存在,将图片编号和用户id存入数据库,将图片传入Kafka中,评估系统进行处理,得到一个score,存入对应图片编号的score,返回score。
(2) 不存在,将图片编号与用户id存入数据库

3.数据库逻辑部分

1)建表

SET FOREIGN_KEY_CHECKS=0;-- ----------------------------
-- Table structure for `assess`
-- ----------------------------
DROP TABLE IF EXISTS `assess`;
CREATE TABLE `assess` (`a_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',`user_id` int(11) NOT NULL COMMENT '用户id',`score` float DEFAULT NULL COMMENT '评估分数',`pic_name` char(255) DEFAULT NULL COMMENT '图片名称-不重复',PRIMARY KEY (`a_id`)
) ENGINE=MyISAM AUTO_INCREMENT=16 DEFAULT CHARSET=utf8;

2)Dao层功能

Dao层主要实现四个功能:

  1. 通过图片编号查找该图片是否存在
// 通过图片编号查找该图片是否存在public boolean isExist(String pic){Connection conn=null;Statement st=null;ResultSet rs=null;try {String sql = "select count(*) from assess where pic_name = '"+pic+"';";System.out.println(sql);conn = BaseDao.getConnection();st = conn.createStatement();rs = st.executeQuery(sql);//如果找到返回truewhile (rs.next()){if(rs.getBoolean(1))return true;elsereturn false;}} catch (SQLException e) {e.printStackTrace();}finally {BaseDao.closeAll(rs, st, conn);}return false;}
  1. 将用户id和图片编号加入数据库
//将用户id和图片编号加入数据库public void addData(int user_id, String pic){Connection con=null;ResultSet rs=null;Statement st=null;try {String sql = "INSERT INTO `assess` (`user_id`, `pic_name`) VALUES ('"+user_id+"', '"+pic+"');\n";System.out.println(sql);con= BaseDao.getConnection();st=con.createStatement();st.execute(sql);}catch(Exception e) {e.printStackTrace();}finally{BaseDao.closeAll(rs, st, con);}}
  1. 将评估系统评估得到的分数加入数据库
//将score加入数据库public void addScore(float score, String pic){Connection con=null;ResultSet rs=null;Statement st=null;try {String sql = "UPDATE `assess` set assess.score = '"+score+"' where assess.pic_name = '"+pic+"';";System.out.println(sql);con= BaseDao.getConnection();st=con.createStatement();st.execute(sql);}catch(Exception e) {e.printStackTrace();}finally{BaseDao.closeAll(rs, st, con);}}
  1. 根据图片编号查询评估分数
//根据图片编号查询评估分数public float findScore(String pic){Connection con=null;ResultSet rs=null;Statement st=null;Assess assess = new Assess();try {String sql="SELECT * FROM assess where pic_name = "+pic+";";System.out.println(sql);con= BaseDao.getConnection();st=con.createStatement();rs = st.executeQuery(sql);if (rs.next()){assess.setScore(rs.getFloat("score"));}}catch(Exception e) {e.printStackTrace();}finally{BaseDao.closeAll(rs, st, con);}return assess.getScore();}

4. 处理图片和传入用户id(虚拟API实现)

/*** 获取需要的数据 - user_id, pic* 将得到的图片保存到指定路径 - 临时文件夹,名称按时间戳自动更新*/public String getPic() throws IOException{URL url1 = new URL("http://pic38.nipic.com/20140225/2531170_214014788000_2.jpg");URLConnection uc = url1.openConnection();InputStream inputStream = uc.getInputStream();String path = "D:\\image\\"+System.currentTimeMillis()+".jpg";FileOutputStream out = new FileOutputStream(path);int j = 0;while ((j = inputStream.read()) != -1) {out.write(j);}inputStream.close();//得到picString pic = path.substring(9, path.length()-4);System.out.println(pic);return pic;}public static int getUid(){//得到user_idint user_id = 1;return user_id;}

5. Kafka生产者

public class KafkaProducer extends Thread
{private final Producer<String, String> producer;private final String topic;private final Properties props = new Properties();public KafkaProducer(String topic){props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("metadata.broker.list", "172.18.37.14:9092");props.put("partitioner.class", "kafka.kafka.SimplePartitioner"); //指定分区规则的类producer = new Producer<String, String>(new ProducerConfig(props));this.topic = topic;}@Overridepublic void run() {Partitioner Partitioner = new SimplePartitioner();// 这里虚拟发送请求 user_id , pic// 接收请求,处理图片等信息DownloadImage img = new DownloadImage();int user_id = img.getUid();String pic = null;try {pic = img.getPic();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}String k = "key" + System.currentTimeMillis(); //key+当前毫秒,形成唯一key,也可以使用时间戳String v = pic + " " + user_id; //消息内容int partition = Partitioner.partition(k, 1); //计算消息key所保存的分区号,2是分区数,实际生产这里可以不用,因为要方便观看所以输出分区System.out.println("发送消息---"+"topic:" + topic + ", partition:" + partition + ", key:" + k + ", value:" +v);producer.send(new KeyedMessage<String, String>(topic, k, v));producer.close();}}

6. Kafka消费者

public class KafkaConsumerProducerDemo extends Thread{public static void main(String[] args) throws InterruptedException {KafkaProducer producer = new KafkaProducer(KafkaProperties.topic);producer.start();KafkaConsumerProducerDemo consumer1 = new KafkaConsumerProducerDemo("consumer-1", KafkaProperties.topic);//消费者consumer-1consumer1.start();}private String consumerName = null;private final String topic;public KafkaConsumerProducerDemo ( String consumerName, String topic) {this.consumerName = consumerName;this.topic = topic;} @Overridepublic void run() {//System.out.println("消费者:" +consumerName + "开始处理消息");Properties props = new Properties();props.put("group.id", KafkaProperties.groupId);props.put("zookeeper.connect", KafkaProperties.zkConnect);//连接 zookeeperprops.put("zookeeper.session.timeout.ms", "400");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "smallest"); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);ConsumerIterator<byte[], byte[]> it = stream.iterator();while (it.hasNext()) {saveData save = new saveData(); //连接数据库MessageAndMetadata<byte[], byte[]> mam = it.next();String receive = new String(mam.message());String[] list = receive.split(" ");String pic = list[0];String id = list[1];int user_id = Integer.parseInt(id);// 查询数据库,看对应图片编号是否存在scoreif(save.isExist(pic)){if(save.findScore(pic) != 0.0){System.out.println("数据库中存在图片编号,并且分数不为0,取出分数");System.out.println("图片对应分数为:" + save.findScore(pic));}else{System.out.println("没有分数,进行下一步处理");// 1. 调用评估函数TestSo ts = new TestSo();float score = 0;// 2. 得到一个scorescore = (float) ts.add(80);System.out.println("评估系统进行评估(虚拟评估),得到分数:"+score);// 3. 将评估系统计算完后的分数存入数据库save.addScore(score, pic);System.out.println("图片对应分数为:" + save.findScore(pic));}}else{System.out.println("图片不存在,将用户id和图片编号存入数据库");save.addData(user_id, pic);// 1. 调用评估函数TestSo ts = new TestSo();float score = 0;// 2. 得到一个scorescore = (float) ts.add(80);System.out.println("评估系统进行评估(虚拟评估),得到分数:"+score);// 3. 将评估系统计算完后的分数存入数据库save.addScore(score, pic);System.out.println("图片对应分数为:" + save.findScore(pic));}//            MessageAndMetadata<byte[], byte[]> mam = it.next();System.out.println("消费者消费的消息:" + "consumer: " + consumerName + ", Partition: " + mam.partition() + ", Message: " + new String(mam.message()) + ", Offset: " + mam.offset() + "");try {sleep(3000);} catch (InterruptedException e) { e.printStackTrace();}}}
}

7. 评估系统

评估系统主要是调用其他项目组完成的C++编写的库函数来使用,这里只模拟了怎样调用.so函数,具体在我这篇文章中有讲:https://blog.csdn.net/qq_37842366/article/details/100929963

8. linux上测试

将代码打包成jar包,放到linux服务器上,运行jar包
生产者发送消息:

消费者接收并处理:

Kafka处理服务器发来的消息并与数据库交互——具体流程相关推荐

  1. JAVA对接公众号(二、处理微信服务器发来的消息)

    一.验证公众号配置的服务器信息. 须知:处理微信服务器发来的消息之前必须先通过公众号配置的服务器验证 获取AccessToken,里面的HttpClientUtil类可以从我csdn资源中找 /*** ...

  2. 微信小程序接受服务器发过来的消息,微信小程序API 接收消息和事件

    接收消息和事件 当用户在客服会话发送消息.或由某些特定的用户操作引发事件推送时,微信服务器会将消息或事件的数据包发送到开发者填写的 URL,如果使用的是云开发,则可以推送到指定的云函数(详情请参考消息 ...

  3. 虎翼主机 mysql_Discuz!发论坛短消息提示mysql数据库错误

    pop:之前在其他的主机转到虎翼门户通的空间,只记得修改主论坛数据库的配置,忘记了这个ucenter的配置了. 症状: 登陆.发帖等均正常的,但是不能发论坛短消息,提示如下错误信息 Can not c ...

  4. 微信小程序asp服务器架设,小程序与ASP做数据库交互

    bindtest: function () { wx.request({ url: 'http://175.102.0.95/erp/1c.asp',  //本地服务器地址 data: { S_Spe ...

  5. 使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯

    作者 | 陈屹       责编 | 欧阳姝黎 近来工作上接收到一项任务,实现c++后台服务器程序,要求它能承载千万级别的DAU读写请求.目前实现千万级高并发海量数据请求的服务器设计在"套路 ...

  6. send函数给FTP服务器发消息,send函数给FTP服务器发消息

    send函数给FTP服务器发消息 内容精选 换一换 Kafka系列2:深入理解Kafka消费者上篇聊了Kafka概况,包含了Kafka的基本概念.设计原理,以及设计核心.本篇单独聊聊Kafka的生产者 ...

  7. telnet给服务器发消息,Telnet按字符发送字符串

    我写的代码可以发送消息给服务器.问题是,如果我捕捉Wireshark的沟通,从我的应用程序发送我的字符串的消息是这样的:Telnet按字符发送字符串 hello - 1 packet 如果我检查的Te ...

  8. android 通过xmpp即时聊天客户端往服务器发消息,利用XMPP协议推送服务器告警信息到安卓平台及桌面...

    XMPP的前身是Jabber,一个开源形式组织产生的网络即时通信协议. XMPP目前被IETF国际标准组织完成了标准化工作.标准化的核心结果分为两部分: 核心的XML流传输协议 基于XML流传输的即时 ...

  9. telnet给服务器发消息,[摘抄]使用telnet命令直接发送

    需要注意的是,由于现在邮件服务器大多设置了身份验证,禁止非法连接发送邮件,主要是为了防止垃圾邮件的侵袭,所以以下方法不保障能完全成功,贴在这里供大家参考,了解两台邮件服务器之间的对接过程.如果你操作的 ...

最新文章

  1. linux压缩和解压缩类命令|--zip/unzip指令
  2. cs架构用什么语言开发_C、C++、Go 语言、Linux服务器开发高级架构师进阶之路
  3. spark之CF协同过滤
  4. 【MySQL】在Windows下更改datadir
  5. 可信知识实证在UGC时代情报应用中的思考与探索
  6. Python break/continue - Python零基础入门教程
  7. 卡诺模型案例分析_3个维度看竞品分析!
  8. 切换图片 ImageSwitcher
  9. 理解用户态切换到内核态——内核态下有一个特殊的进程
  10. 情人节福利,用JAVA做个QQ机器人,帮我提醒女神按时喝水和陪她聊天~(开源)
  11. Opera Unite 用户指南
  12. 近24小时以太坊上的DEX交易量超过150亿美元
  13. 2019-0403视觉SLAM的学习第二讲
  14. 蓝桥杯2014c++真题:扑克序列(next_permutation)
  15. 计算机电路图解,几种常见的放大电路原理图解
  16. 大数据毕设/课设 - 基于大数据的全国疫情实时监控大屏系统设计与实现
  17. linux安装python教程视频_新手开发者的极简Linux上手Python视频教程
  18. DNS-域名系统 【应用层】【计算机网络】
  19. 一款好用的基于vue的录屏插件recordrtc,拿走不谢
  20. H.264向H.265的转变及其相关技术

热门文章

  1. .php文件是病毒吗,php病毒
  2. OTA全称为Over-The-Air technology(空中下载技术)
  3. /storage/sdcard0, /sdcard, /mnt/sdcard ,/storage/emulated/legacy 的区别
  4. JaveSE 17 Java基础语法 → 注释奇怪的注释
  5. 3步释放工作和生活压力
  6. 把计算机器显示桌面,怎样将电脑显示器和桌面匹配
  7. 2022商业版游戏陪玩陪聊系统最新源码+视频教程+全套素材
  8. Squeeze-and-Excitation Networks(SENet) 学习笔记
  9. SDOI 2009 学校食堂(好难的状压QAQ
  10. Leetcode算法题-解法转载