java与python之间的混合开发

1. 项目需求

使用java进行进行数据库的访问,并对查询出的数据进行的数据清理,使用python进行无监督分类

应用场景:

. 整个项目是BS架构,基于SpringBoot2.0

. 业务端主要是通过Java来实现后台与数据库的连接与交互

. 算法端主要通过python实现

实现的目标:

  1. Java通过多线程任务调度定时完成任务调度,完成对数据库中的某一数据进行聚类并生成对应的聚类模型,聚类和模型生成由Python实现;
  2. 通过前端调用Resful接口来调取后端的接口,再通过java来与python交互,实现模型的调用来完成新数据的推理;
  3. 整个过程python不涉及查数据库,Java查数据库之后实现数据的清洗和预处理,按照约定的格式传递给python来进行数据的算法实现

2.方案

  1. 可以通过udp或者tcp协议来传递数据

    缺点:a. 不能异步通信;b.安全性不高;c.调试麻烦;d.数据容易丢失

  2. 通过网络接口(Restful)的方式,调用python服务(通过http的POST请求中的Body将数据传入,没有测试过数据量)

  3. 通过消息中间键,来作为数据传递和消息传递的媒介,通过队列的方式来传递数据,实现异步通信,并且通过中间键作为中介有管理系统,安全性高,有对应的管理界面调试比较简单,能够查看推送的数据是什么;使用队列来作为日志信息的输入和输出;

  • 最终确定的方案:选用方案二
    使用消息中间键的好处:

    1. 数据不容易丢

    2. 能够实现异步通信

      异步方式指消息发送方在发送消息时不必知道接收方的状态,更无需等待接收方的回复,而接收方在收到消息时也不必知道发送方的目前状态,更无需进行同步的消息处理,它们之间的连接完全是松耦合的,通信是非阻塞的,这种异步通信方式是由消息中间件中的消息队列及其服务机制保障的

    3. 能够消峰(通过队列的形式,防止一直访问服务)

      用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去

3.技术选型

Java端使用springboot2.0框架来搭建服务,使用MyBattisPlus操作数据库

消息中间键:本文针对的数据量不算特别大,也不需要很高的吞吐量,所以选用稳定且功能完备,spring集成的ActiveMQ

python端使用stomp消息传输框架来对ActiveMq的消息队列进行监督

4. 代码实现

SpringBoot2.0集成ActiveMq

添加pom文件

 <!--ActiveMq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><version>1.5.0.RELEASE</version>
</dependency>
<!--消息队列连接池-->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.0</version>
</dependency>

构建一个生产者服务

java和ActiveMq相关的配置文件

  • 通过yml文件配置
server:port: 777 #生产者,微服务端口号
#ActiveMq
spring:activemq:broker-url: tcp://192.168.1.151:61616  #mq的位置(61616是后台的位置)user: adminpassword: adminjms:pub-sub-domain: false  #false为Queue  true为topic,默认为false,目前正在探索怎么队列和topic一起使用
processQueueName: activemq-queue-data-process
reciveQueueName: activemq-queue-result
trainQueueName: activemq-queue-data-train
reciveFigQueueName: activemq-queue-result_fig
reciveLogTopic: /topic/Log
  • config配置,注意**@Bean**注解的是对象的初始化函数,后面使用Autoware注入的时候,会自动的调用这个初始化函数,生成对应的对象
package com.atguigu.activemq.bootMqProduce.bootMqProduce.config;import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;import javax.jms.Queue;//只要是boot,就把Component标注在上面
//类似于Spring的applicationContext.xml
@Component  //让SpringBoot管理起来
@EnableJms  //开启Jms配置
public class ConfigBean {//从yml中读取对应键值的配置注入@Value("${myqueue}")private String myQueue;//配置了一个ActiveMQ的队列@Bean //配置Bean  有点儿类似于之前Spring的配置<bean id=" " class=" ">public Queue queue(){return new ActiveMQQueue(myQueue);}}
  • 生产消息
package com.atguigu.activemq.bootMqProduce.bootMqProduce.produce;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.jms.Queue;
import java.util.UUID;//这个就相当于impl@Component //只要是boot都要标注
public class QueueProduce {@Autowired  //这是一个通用的jms(java的消息标准)的类,里面定义了java消息的生产消费流程标准private JmsMessagingTemplate jmsMessagingTemplate;@Autowired   //之前在配置里面定义了一个Bean,那么就可以通过@Autowired自动注入进来,是需要配置的private Queue queue;public void produceMsg(){jmsMessagingTemplate.convertAndSend(queue, "*****"+     UUID.randomUUID().toString().substring(0,6));//自动转换后写入}@Scheduled(fixedDelay = 3000) //定时发送,每隔3000ms发送一次public void produceMsgScheduled(){jmsMessagingTemplate.convertAndSend(queue, "*****"+ UUID.randomUUID().toString().substring(0,6));//自动转换后写入System.out.println("***Msg发送成功***");}
}
  • 主函数,启动服务
package com.atguigu.activemq.bootMqProduce.bootMqProduce;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling //定时发送需要打开这个开关
public class MainApp_produce {public static void main(String[] args) {SpringApplication.run(MainApp_produce.class,args);}
}

构建一个消费者服务

  • yml配置和生产者一样

  • config配置(无,只需要指定需要监听得队列即可)

  • 注册消费者监听消息队列

    package com.atguigu.activemq.bootMqConsumer.consumer;import org.springframework.beans.factory.annotation.Value;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;import javax.jms.JMSException;
    import javax.jms.TextMessage;@Component //只要是boot,要么是service,要么是Component
    public class Queue_Consumer {//监听注册,指定需要监听得队列@JmsListener(destination = "${myqueue}")public void recieve(TextMessage textMessage) throws JMSException{System.out.println("***消费者收到消息***"+textMessage);}
    }
    

    注意在这个过程中,python通过stomp协议在进行队列消息得传输过程中使用的二进制流数进行传输的,所以,在java端接收python返回的数据的时候,需要将二进制流先转换为字符串,所以此处不能使用textmessage来定义接收的消息,而是使用BytesMessage来定义接收到的消息

    package com.atguigu.activemq.bootMqConsumer.consumer;import org.springframework.beans.factory.annotation.Value;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;import javax.jms.JMSException;
    import javax.jms.TextMessage;@Component //只要是boot,要么是service,要么是Component
    public class Queue_Consumer {//监听注册,指定需要监听得队列@JmsListener(destination = "${myqueue}")public void recieve(TextMessage textMessage) throws JMSException{System.out.println("***消费者收到消息***"+textMessage);}
    }
    

python端往队列中发送消息和监听队列中的消息

使用stomp消息传输协议

在进行消息消费的时候,采用两种模式

  • 训练模式,只消费对应的队列中最新的消息

  • 推断模式,将队列中的数据来一条处理一条

# -*- coding: utf-8 -*-
"""
Created on Thu Jul 19 09:54:08 2018@author: lihc
"""# -*-coding:utf-8-*-
import stomp
import datetime
import json
from FINCH import *
import joblib
import os
import tracebackqueue_receive_train = 'activemq-queue-data-train'
queue_receive_process = 'activemq-queue-data-process'
topic_log = '/topic/Log'
listener_name = 'SampleListener'
queue_send_data = 'activemq-queue-result'
queue_process_result = 'activemq-process-result'
queue_send_fig = 'activemq-queue-result_fig_bak'
post = 61613
url = '192.168.1.151'#消息回调函数,有消息传入的时候调用on_message函数
class SampleListener(object):def on_message(self, headers, message):data_new = json.loads(message)# # print('headers: %s' % headers)print(">>>>>>>>>>>>>>>>>>>>>>>>>>>listener1")print('message: %s' % data_new)class HaddleListener(object):def __init__(self):self.flag = Falseself.data = {}self.header = ''#将不同signal_type的最新数据存入data字典中def on_message(self, headers, message):# return messageself.flag = Truedata = json.loads(message)signal_type = ''.join(list(data.keys()))self.data[signal_type] = dataself.header = headersprint(data)class ActiveMqManager(object):def __init__(self, url, post):self.url = urlself.port = post#建立连接self.con = stomp.Connection10([(self.url, self.port)])self.con.connect(username='admin', passcode='admin', wait=True)# 推送到队列queuedef send_to_queue(self, msg, queue_name):con = stomp.Connection10([(self.url, self.port)])if not isinstance(msg, bytes):msg_json = json.dumps(msg)# 二进制流直接传输?else:msg_json = msgcon.connect(username='admin', passcode='admin', wait=True)con.send(destination=queue_name, body=msg_json, content_type='text/plain')con.disconnect()# 推送到主题def send_to_topic(self, msg, topic_name):con = stomp.Connection10([(self.url, self.port)])con.connect()con.send(topic_name, msg)con.disconnect()##从队列接收消息def receive_from_queue(self, queue_receive, listener_name, listener):con = stomp.Connection10([(self.url, self.port)])con.set_listener(listener_name, listener)con.connect(username='admin', passcode='admin', wait=True)con.subscribe(queue_receive, headers={"listener_name": listener_name})# con.subscribe(queue_receive, headers={"listener_name": listener_name}, ack='client')##从主题接收消息def receive_from_topic(self, topic_name):con = stomp.Connection10([(self.url, self.port)])con.set_listener(listener_name, SampleListener())con.connect()con.subscribe(topic_name)while 1:self.send_to_topic('topic')time.sleep(3)  # secscon.disconnect()if __name__ == '__main__':activeMqManager = ActiveMqManager(url, post)train_data_listener = HaddleListener()process_data_listener = HaddleListener()# 只要主线程不结束,这个监听就一直都在,也不能 disconnecion# 注册两个监听器,分别监听queue_receive_train和queue_receive_process队列activeMqManager.receive_from_queue(queue_receive_train, listener_name="TrainListener", listener=train_data_listener)activeMqManager.receive_from_queue(queue_receive_process, listener_name="ProcessListener", listener=process_data_listener)print("主线程阻塞")while True:##对于训练数据的队列,我们需要使用的是队列中最新的一条数据,所以通过这种机制训练最新的一条数据""">>>>>>>>>>>>>>>>>>通过这种控制机制,让每次程序启动的时候只训练最新的数据<<<<<<<<<<<<<<<<<<<<<<"""try:if train_data_listener.flag == True:time.sleep(8)  # secs# 训练 (只处理最新的数据)if(train_data_listener.flag == True and train_data_listener.data != {}):train_data_listener.flag = False# data = train_data_listener.data# print('message: %s' % train_data_listener.data)print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ">>>>>>>>>>>>>>>>>>>>>>处理")for key, value in train_data_listener.data.items():# 将不同的signal_type的最新的数据保存下来signal_type = ''.join(list(value.keys()))# 处理消息dataProcessing = DataProcessing(type=signal_type, train_data_dic=value)# dataProcessing = DataProcessing(type=signal_type, train_data_dic=data, model="FINCH")result_map = dataProcessing.structSignalData()#python数据处理的结果回传给java的业务端activeMqManager.send_to_queue(result_map, queue_send_data)print("发送成功")# 通过话题来传输日志activeMqManager.send_to_topic("发送成功", topic_log) # 消费完了之后,将data的数据给清空train_data_listener.data = {}# 推断:来一条处理一条# if ( process_data_listener.data is not None):if (process_data_listener.flag == True and process_data_listener.data != {}):process_data_listener.flag = False# data = process_data_listener.dataprint(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ">>>>>>>>>>>>>>>>>>>>>>测试")for key, value in process_data_listener.data.items():# 通过订阅话题来实时传输日志activeMqManager.send_to_topic(">>>>>>>>>>>>>>>>>>>>>>测试", topic_log)signal_type = ''.join(list(value.keys()))if signal_type:result_map = dataProcessing.process(value)activeMqManager.send_to_topic("识别完成", topic_log)activeMqManager.send_to_queue(result_map, queue_process_result)print("识别结果发送成功")activeMqManager.send_to_topic("识别结果发送成功", topic_log)# 消费完了之后清空process_data_listener.data = {}  except (Exception) as error:print(traceback.print_exc())print("失败代码:"+ str(error))

4.传输的消息的格式

  • java生产者生产消息的格式如下:
{"LD": {"LD":["PL":300,"JD":30,"WD":30]},"SX": {"SX":["PL":345,"JD":80,"WD":65]}
}

数据以JSON字符串的形式进行传输

  • Java端接收数据并解析数据的方式

    消费者的代码

    接收的数据格式为BytesMessage

        @JmsListener(destination = "activemq-queue-result")@Transactionalpublic void recieve(BytesMessage bytesMessage) throws JMSException {byte[] bys = null;//解析二进制数据为字符串数据bys = new byte[(int) bytesMessage.getBodyLength()];bytesMessage.readBytes(bys);Gson gson = new Gson();//将数据转换为指定格式的类,Map.class或者某个结构体Map map = gson.fromJson(new String(bys),Map.class);System.out.println("***消费者收到消息***"+map.toString());System.out.println("***消费者处理消息结果***------>>" + result);}
    

    生产者业务代码

    业务实现,通过yaml的配置,来找到需要加载的表名和字段,根据约定的数据格式,转换为json发送给MQ消息队列

    工具:Yaml2Gson,用来将yaml文件转换为gson格式()、Gson

    public void send2Mq(String queryTable) {Map featureMap = new HashMap();Map configMap = (Map) Yaml2Gson.yamlToMap(zcYml).get(queryTable);
    //        System.out.println(configMap.keySet().toString());featureMap.put("config",configMap );StringBuilder conditionBuilder = new StringBuilder();String features = configMap.keySet().toString();StringBuilder columnsBuilder = new StringBuilder();//拼凑出各个列的查询//查询的特征项为空的,依据其是否为连续值为其设定初始值None or 0for(String feature: features.substring(1,features.length()-1).split(", ")){Map featsMap = (Map)configMap.get(feature);if((Integer)featsMap.get("isContinuous") == 0) columnsBuilder.append("nvl(to_char("+feature+"),'None') "+feature);else columnsBuilder.append("nvl("+feature+",0) "+feature);conditionBuilder.append(feature+" is not null or ");columnsBuilder.append(", ");}String columns = columnsBuilder.substring(0,columnsBuilder.length()-2);String condition = conditionBuilder.substring(0,conditionBuilder.length()-3);List mess = combineQueryMapper.getFeatures(columns,queryTable,condition);//featureMap.put("data",mess);Map resMap = new HashMap();resMap.put(queryTable,featureMap);//数据格式如下produceQueue.produceTrainMsg(new Gson().toJson(resMap));}
    

5.python解析数据的方式

如果是二进制的数据流就直接传输,如果是其他形式的数据,通过json.dumps将数据转为json字符串进行传输,并在java中进行解析

if not isinstance(msg, bytes):msg_json = json.dumps(msg)# 二进制流直接传输?
else:msg_json = msg
con.connect(username='admin', passcode='admin', wait=True)
##通过这个函数发送数据的时候,是自动会转换为二进制数据流的(转换为二进制数据流的json格式的字符串)
con.send(destination=queue_name, body=msg_json, content_type='text/plain')

6.总结

在不同架构的代码之间传输传递不同结构的结构体的数据,使用JSON字符串能够最大程度的保留数据的原本形式。配合使用消息中间键通过数据流就能够很好的将两种不同的语言进行组合使用,发挥他们各自的优势,并且规定好特定的数据格式之后,还能够很容易的将两块的功能进行松耦合并且达到不同模块复用的目的。同时在使用的过程中一定要注意处理好数据的类型和数据为空的情况(根据实际的数据处理业务来将为空的数据填写入合适的值),无论是python还是java,因为null类型一般都是使用object这种大的类来向下兼容的,传递过来的数字都被转换为object类型的数据。最后第一次完全原创的写一篇自己的开发总结,希望大家能够留言指正以及私信讨论!

java与python之间的混合开发相关推荐

  1. c语言 python java_C语言、C++、Java,Python之间的区别,哪个更有前景,哪个更难 ?...

    从这四种语言的难度.受欢迎度还有作用以及优点缺点给楼主做一个全面的分析,我们可以从中了解其区别,以及难易程度.至于今后可以用到的或者是前景问题,根据自己的职业发展大家可以自己做个分析.最后会总结一下他 ...

  2. go语言和python结合_Go+Python双语言混合开发-第1章 【阶段一:Go语言基础】

    1-1 Go+Python双语言混合开发-课程导学 (11:35) 1-2 课程资源获取方式 - 非常重要!!! (06:07) 1-3 课程中需要用到的开发环境介绍 (03:47) 1-4 wind ...

  3. python跟java 效率_对比平台--Java与Python之间的性能差异

    ava Performance和Python之间的主要区别 以下是描述Java Performance和Python之间的区别的要点列表: 以下是Java性能与Python之间的主要区别,我们在决定应 ...

  4. thrift java长连接_利用thrift在c++、java和python之间相互调用

    转自:http://blog.csdn.net/andy_yf/article/details/7487384 thrift做为跨语言调用的方案有高效,支持语言较多,成熟等优点:代码侵入较强是其弱点. ...

  5. java python混合开发_Go+Python双语言混合开发

    装饰器的实质是什么?或者说为什么装饰器要写2层嵌套函数,里层函数完全就已经实现了装饰的功能为什么不直接用里层函数名作为装饰器名称? 答:装饰器是要把原来的函数装饰成新的函数,并且返回这个函数本身的高阶 ...

  6. python语言开发平台_Go+Python双语言混合开发

    装饰器的实质是什么?或者说为什么装饰器要写2层嵌套函数,里层函数完全就已经实现了装饰的功能为什么不直接用里层函数名作为装饰器名称? 答:装饰器是要把原来的函数装饰成新的函数,并且返回这个函数本身的高阶 ...

  7. go和python组合开发_Go+Python双语言混合开发

    作者:V-lsttwo https://www.bilibili.com/read/cv8189097 出处: bilibili 一.我们为什么选择Go语言 选择Go语言的原因可能会有很多,关于Go语 ...

  8. pythongo混合_Go+Python双语言混合开发

    装饰器的实质是什么?或者说为什么装饰器要写2层嵌套函数,里层函数完全就已经实现了装饰的功能为什么不直接用里层函数名作为装饰器名称? 答:装饰器是要把原来的函数装饰成新的函数,并且返回这个函数本身的高阶 ...

  9. Java和Python,哪个更适合开发AI人工智能?

    当今的机器世界需要能够改变我们生活.工作和娱乐方式的技术.考虑到智能系统和具有行为算法.智能搜索和能够自行学习的智能系统的需求,约翰麦卡锡将人工智能引入了我们的世界,并被亲切地称为人工智能. 这些 A ...

最新文章

  1. 博弈入门学习的博客[资源汇总]
  2. 新手求助:大神们帮帮我,关于在ViewPage中添加GridView的问题
  3. 深入SecureFile—新一代LOB揭秘000
  4. 我的编码规范(慢慢补充)
  5. 010_数字内建函数
  6. Android5.1修改以太网MAC地址(SElinux)
  7. Java springcloud B2B2C o2o多用户商城 springcloud架构 (十七)上传文件
  8. 在IIS6上部署WebService
  9. 网页游戏 服务器 性能测试工具,简单的压力测试工具
  10. HD 1003 Max Sum(贪心)
  11. OSI七层模型非专业简介
  12. Linux(树莓派)安装 python-opencv
  13. 使用网络存储SAN和NAS
  14. 面向在线教育业务的流媒体分发演进
  15. 面试题之--实现取余
  16. 有长度要求的区间最大值
  17. HenCoder Android 开发进阶: 自定义 View 1-2 Paint 详解
  18. Ubuntu 搭建SVN服务器(SVN Server)
  19. 寻找百度图片搜索接口历程--one
  20. 斯坦福大学区块链期末考试题

热门文章

  1. arduino串口显示屏
  2. Excel如何把身份证号码升级到18位
  3. 在树莓派上搭建PHP家庭相册管理程序
  4. php 只允许用户单线登陆,电信宽带IPTV只用路由简单设置单线复用和R6300v2使用经验分享...
  5. 2021HW — 360天擎漏洞
  6. 17.unity粒子特效--Renderer模块、灯光模块、粒子系统组合、二级发射器、粒子间碰撞、粒子拖尾
  7. 清华大学计算机学院张院士,清华大学张钹院士访问计算机学院
  8. 【移动开发】ido手机阅读器
  9. 备战马拉松旺季,六大注意事项,干货满满!
  10. 事务的回滚和不提交的区别