文章目录

  • Linux系统下海康工业相机MVS二次开发-Python

Linux系统下海康工业相机MVS二次开发-Python

环境:树莓派 Ubuntu系统
编程环境:Python3.7 Node(忘了版本了,都可以,最好稳定版本)
需要安装的模块:Python端:cv2 websockets fastapi等;Node端:主要是ws(用来传输视频流)
安装可以看网上的文章,很多有写,不过树莓派这里避坑:树莓派进入是pi用户,需要sudo su才能切换为root用户,安装Python模块时如果在pi用户下安装,用root用户运行是是没有的,会报no module name... 错误。另外,安装cv2模块时似乎不能用pip安装,需要用它自带的apt-get安装,不过不知道是不是只有我这里才是这样的。
以前上课做过树莓派的实验,很简单,但是真正实践时还是遇到了许多问题和困难,做了接近一周,因为树莓派只有一个网口得和相机换来换去,记录一下工程吧。
先整体介绍一下功能,主要是前端写了一个页面调用python后端相机传送的视频帧,并实现点击按钮拍照功能。
首先,需要在系统安装MVS,这里引用一下其他博主写的文章,可参考:https://blog.csdn.net/cugyzy/article/details/120974745
以下是我写的工程目录:

写代码时参考了许多博主以及MVS官方安装完成后在/opt/MVS/Samples/armhf/Python/GarbImage下的代码以及官方的对于相机的调用方法文档。
先将各个方法封装在类HKCamera中,如下:

import sys
from ctypes import *
import os
import numpy as np
import time
import cv2
import random
import copysys.path.append("/opt/MVS/Samples/armhf/Python/MvImport") #打开MVS中的MvImport文件,对于不同系统打开的文件路径跟随实际文件路径变化即可
from MvCameraControl_class import * #调用了MvCameraControl_class.py文件class HKCamera():def __init__(self, CameraIdx=0, log_path=None):# enumerate all the camera devicesdeviceList = self.enum_devices()# generate a camera instanceself.camera = self.open_camera(deviceList, CameraIdx, log_path)self.start_camera()def __del__(self):if self.camera is None:return# 停止取流ret = self.camera.MV_CC_StopGrabbing()if ret != 0:raise Exception("stop grabbing fail! ret[0x%x]" % ret)# 关闭设备ret = self.camera.MV_CC_CloseDevice()if ret != 0:raise Exception("close deivce fail! ret[0x%x]" % ret)# 销毁句柄ret = self.camera.MV_CC_DestroyHandle()if ret != 0:raise Exception("destroy handle fail! ret[0x%x]" % ret)@staticmethoddef enum_devices(device=0, device_way=False):"""device = 0  枚举网口、USB口、未知设备、cameralink 设备device = 1 枚举GenTL设备"""if device_way == False:if device == 0:cameraType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_UNKNOW_DEVICE | MV_1394_DEVICE | MV_CAMERALINK_DEVICEdeviceList = MV_CC_DEVICE_INFO_LIST()# 枚举设备ret = MvCamera.MV_CC_EnumDevices(cameraType, deviceList)if ret != 0:raise Exception("enum devices fail! ret[0x%x]" % ret)return deviceListelse:passelif device_way == True:passdef open_camera(self, deviceList, CameraIdx, log_path):# generate a camera instancecamera = MvCamera()# 选择设备并创建句柄stDeviceList = cast(deviceList.pDeviceInfo[CameraIdx], POINTER(MV_CC_DEVICE_INFO)).contentsif log_path is not None:ret = self.camera.MV_CC_SetSDKLogPath(log_path)if ret != 0:raise Exception("set Log path  fail! ret[0x%x]" % ret)# 创建句柄,生成日志ret = camera.MV_CC_CreateHandle(stDeviceList)if ret != 0:raise Exception("create handle fail! ret[0x%x]" % ret)else:# 创建句柄,不生成日志ret = camera.MV_CC_CreateHandleWithoutLog(stDeviceList)if ret != 0:raise Exception("create handle fail! ret[0x%x]" % ret)# 打开相机ret = camera.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)if ret != 0:raise Exception("open device fail! ret[0x%x]" % ret)return cameradef start_camera(self):stParam = MVCC_INTVALUE()memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))ret = self.camera.MV_CC_GetIntValue("PayloadSize", stParam)if ret != 0:raise Exception("get payload size fail! ret[0x%x]" % ret)self.nDataSize = stParam.nCurValueself.pData = (c_ubyte * self.nDataSize)()self.stFrameInfo = MV_FRAME_OUT_INFO_EX()memset(byref(self.stFrameInfo), 0, sizeof(self.stFrameInfo))self.camera.MV_CC_StartGrabbing()def get_Value(self, param_type, node_name):""":param cam:            相机实例:param_type:           获取节点值得类型:param node_name:      节点名 可选 int 、float 、enum 、bool 、string 型节点:return:               节点值"""if param_type == "int_value":stParam = MVCC_INTVALUE_EX()memset(byref(stParam), 0, sizeof(MVCC_INTVALUE_EX))ret = self.camera.MV_CC_GetIntValueEx(node_name, stParam)if ret != 0:raise Exception("获取 int 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))return stParam.nCurValueelif param_type == "float_value":stFloatValue = MVCC_FLOATVALUE()memset(byref(stFloatValue), 0, sizeof(MVCC_FLOATVALUE))ret = self.camera.MV_CC_GetFloatValue(node_name, stFloatValue)if ret != 0:raise Exception("获取 float 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))return stFloatValue.fCurValueelif param_type == "enum_value":stEnumValue = MVCC_ENUMVALUE()memset(byref(stEnumValue), 0, sizeof(MVCC_ENUMVALUE))ret = self.camera.MV_CC_GetEnumValue(node_name, stEnumValue)if ret != 0:raise Exception("获取 enum 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))return stEnumValue.nCurValueelif param_type == "bool_value":stBool = c_bool(False)ret = self.camera.MV_CC_GetBoolValue(node_name, stBool)if ret != 0:raise Exception("获取 bool 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))return stBool.valueelif param_type == "string_value":stStringValue = MVCC_STRINGVALUE()memset(byref(stStringValue), 0, sizeof(MVCC_STRINGVALUE))ret = self.camera.MV_CC_GetStringValue(node_name, stStringValue)if ret != 0:raise Exception("获取 string 型数据 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))return stStringValue.chCurValueelse:return Nonedef set_Value(self, param_type, node_name, node_value):""":param cam:               相机实例:param param_type:        需要设置的节点值得类型int:float:enum:     参考于客户端中该选项的 Enum Entry Value 值即可bool:     对应 0 为关,1 为开string:   输入值为数字或者英文字符,不能为汉字:param node_name:         需要设置的节点名:param node_value:        设置给节点的值:return:"""if param_type == "int_value":ret = self.camera.MV_CC_SetIntValueEx(node_name, int(node_value))if ret != 0:raise Exception("设置 int 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))elif param_type == "float_value":ret = self.camera.MV_CC_SetFloatValue(node_name, float(node_value))if ret != 0:raise Exception("设置 float 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))elif param_type == "enum_value":ret = self.camera.MV_CC_SetEnumValue(node_name, node_value)if ret != 0:raise Exception("设置 enum 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))elif param_type == "bool_value":ret = self.camera.MV_CC_SetBoolValue(node_name, node_value)if ret != 0:raise Exception("设置 bool 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))elif param_type == "string_value":ret = self.camera.MV_CC_SetStringValue(node_name, str(node_value))if ret != 0:raise Exception("设置 string 型数据节点 %s 失败 ! 报错码 ret[0x%x]" % (node_name, ret))def set_exposure_time(self, exp_time):self.set_Value(param_type="float_value", node_name="ExposureTime", node_value=exp_time)def get_exposure_time(self):return self.get_Value(param_type="float_value", node_name="ExposureTime")def get_image(self, width=None):""":param cam:     相机实例:active_way:主动取流方式的不同方法 分别是(getImagebuffer)(getoneframetimeout):return:"""ret = self.camera.MV_CC_GetOneFrameTimeout(self.pData, self.nDataSize, self.stFrameInfo, 1000)if ret == 0:#image = np.asarray(self.pData).reshape((self.stFrameInfo.nHeight, self.stFrameInfo.nWidth))image = np.asarray(self.pData)if width is not None:image = cv2.resize(image, (width, int(self.stFrameInfo.nHeight * width / self.stFrameInfo.nWidth)))num = random.randint(1,10)cv2.putText(image, str(num), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)passreturn imageelse:return None#实时展示def show_runtime_info(self, image):# exp_time = self.get_exposure_time()#cv2.putText(image, ("exposure time = %1.1fms" % (exp_time * 0.001)), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)num = random.randint(1,10)cv2.putText(image, str(num), (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255, 1)#拍照存储照片def take_picture(self,image):#print("take picture!")path = "/home/pi/Downloads"if not os.path.exists(path):os.mkdir(path)random_str = ""base_str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"length = len(base_str)-1for i in range(8):random_str += base_str[random.randint(0,length)]file_path = path+"/"+random_str+".jpg"cv2.imwrite(file_path,image)return file_path
"""
bmpsize = self.stFrameInfo.nWidth * self.stFrameInfo.nHeight * 3 + 54
bmp_buf = (c_ubyte * bmpsize)()
path = "/home/pi/Downloads"
if not os.path.exists(path):os.mkdir(path)
random_str = ""
base_str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
length = len(base_str)-1
for i in range(8):random_str += base_str[random.randint(0,length)]
file_path = path+"/"+random_str+".bmp"
print(file_path)
file_open = open(file_path.encode('ascii'), 'wb+')
try:img_buff = copy.deepcopy(bmp_buf)file_open.write(img_buff, )res = True
except:raise Exception("save file executed failed")
finally:file_open.close()
"""

再写一个Server做为后端服务,并实时传送视频帧:

# websocket
import sys
import cv2
import numpy as np
import asyncio
from threading import Thread
import websockets
import base64sys.path.append("/home/pi/Desktop/HKtest/HKProject/modules") #打开MVS中的MvImport文件,对于不同系统打开的文件路径跟随实际文件路径变化即可
from HKCamera import *from fastapi import FastAPI
import uvicorn
from fastapi.middleware.cors import CORSMiddlewareapp = FastAPI()
#设置允许访问的域名
origins = ["*"]  #也可以设置为"*",即为所有。
#设置跨域传参
app.add_middleware(CORSMiddleware,allow_origins=origins,  #设置允许的origins来源00allow_credentials=True,allow_methods=["*"],  # 设置允许跨域的http方法,比如 get、post、put等。allow_headers=["*"])  #允许跨域的headers,可以用来鉴别来源等作用。#isTakePicture = False
#isDownTake = Falseencode_param=[int(cv2.IMWRITE_JPEG_QUALITY),95]
camera = HKCamera()# 向服务器端实时发送视频帧
async def send_msg(websocket):#global isTakePicture,isDownTaketry:while True:image = camera.get_image(width=800)if image is not None:# camera.show_runtime_info(image)# cv2.imshow("", image)# 图像编码result, imgencode = cv2.imencode('.jpg', image, encode_param)data = np.array(imgencode)img = data.tostring()# base64编码传输img = base64.b64encode(img).decode()await websocket.send("data:image/jpeg;base64," + img)except Exception as e:print(e)
# 客户端主逻辑
async def main_logic():async with websockets.connect('ws://127.0.0.1:3000') as websocket:await send_msg(websocket)#loop = asyncio.get_event_loop()
#result = loop.run_until_complete(main_logic())def start_thread_loop(loop):asyncio.set_event_loop(loop)loop.run_until_complete(main_logic())#=================next is web server========================
@app.get("/server")
def takePicture():image = camera.get_image(width=800)if image is not None:file_path = camera.take_picture(image)msg = ""if file_path != None:msg = "Picture has been saved!"else:msg = "saved fail!"return {"msg":msg,"file_path":file_path}    if __name__ == '__main__':new_loop = asyncio.new_event_loop()t = Thread(target=start_thread_loop,args=(new_loop,))t.start()uvicorn.run(app, host="127.0.0.1", port=8000)

写这部分代码时还是遇到了许多问题,因为要实时传输视频帧图片,所以又要同时监听前端发送的请求是不能同时进行的,因此需要用到多线程的知识,将视频帧图片传送作为子线程,后端服务作为主线程,因此可以同时进行且互不干扰。这里学到了一个新东西–协程,Python有一个包是asyncio,里面可以进行多协程的开发,这里也是遇到问题后通过这个思想受到启发后才解决了问题,后面可以再多研究研究。

前端部分:
首先是node服务,需要开启websocket服务并在收到视频帧后将内容发送到各个客户端:

let WebSocketServer = require('ws').Server,wss = new WebSocketServer({ port: 3000 });wss.on('connection', function (ws) {console.log('客户端已连接');
//  console.log(ws);
//     ws.on('message', function (message) {//         wss.clients.forEach(function each(client) {//      let data = [{msgtest:"sfsdfuisdj=====",message:message}];
//      let data = [{msgtest:"sfsdfuisdj====="}];
//             client.send(data);
//      console.log(data);
//         });
//         console.log(message.length);
//      console.log(message);
//    });ws.onmessage = function(evt){wss.clients.forEach(function each(client){client.send(evt.data);     });console.log(evt.data.length);}
});

之前看了一些博客是用的上面注释的写法,但我在运行时却显示不出图片,因为页面接收到的是二进制文件,读不出来,改为下面这种写法就好了,直接传送的是base64。

页面代码:

<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8"><title>Title</title>
</head><body><div><img id="resImg" src="" /></div><div id="test"><button id="take">点击拍照</button></div><script src="jquery.min.js"></script><script>let ws = new WebSocket("ws://127.0.0.1:3000/");var image = document.getElementById("resImg");//var divtest = document.getElementById("test");ws.onopen = function (evt) {console.log("Connection open ...");ws.send("Hello WebSockets!");};ws.onmessage = function (evt) {//console.log(evt)//console.log("==="+evt.data);//console.log("==="+JSON.stringify(evt.data));//divtest.innerHTML=evt;image.setAttribute("src", evt.data);//$("#resImg").attr("src", evt.data);// console.log("Received Message: " + JSON.stringify(evt.data));// ws.close();};ws.onclose = function (evt) {console.log("Connection closed.");};let btn = document.getElementById('take');btn.onclick = function () {//创建对象const xhr = new XMLHttpRequest()//设置请求方法和urlxhr.open('GET', 'http://localhost:8000/server')//发送xhr.send()//事件绑定 处理服务端返回的结果xhr.onreadystatechange = function () {if (xhr.readyState === 4) { //服务端返回了所有结果if (xhr.status >= 200 && xhr.readyState <= 300) {  //2开头的都表示成功//处理结果console.log(xhr.status) //状态码console.log(xhr.statusText) //状态字符串console.log(xhr.getAllResponseHeaders) //所有响应头console.log(xhr.response) //响应体alert(xhr.response)}}}}</script>
</body></html>

启动node服务命令:node websocket.js
运行python文件命令:python HKCameraServer.py

【参考】https://blog.csdn.net/weixin_42613125/article/details/121089120
【参考】https://blog.csdn.net/qq_39570716/article/details/114066097?spm=1001.2014.3001.5501
【参考】https://www.jianshu.com/p/e3933a56285f
【参考】https://blog.csdn.net/qq_23107577/article/details/113984935?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_baidulandingword~default-4-113984935-blog-123371414.pc_relevant_antiscanv3&spm=1001.2101.3001.4242.3&utm_relevant_index=7
【参考】https://blog.csdn.net/zhuzheqing/article/details/109819702
【参考】https://blog.csdn.net/qq_36917144/article/details/117292871?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-2-117292871-blog-109819702.pc_relevant_downloadblacklistv1&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-2-117292871-blog-109819702.pc_relevant_downloadblacklistv1&utm_relevant_index=4

Linux系统下海康工业相机MVS二次开发-Python相关推荐

  1. Linux系统下海康机器人MVS安装

    一.安装 在海康机器人官网,服务支持-软件下载中间中,下载相应的客户端安装包. 安装包里有各个系统及arm开发板使用的安装包,选择相应的安装包进行安装,要是不知道自己需要安装哪个安装包,就敲uname ...

  2. 海康工业相机LabVIEW二次开发——修改参数、存图

    最近使用LabVIEW对海康机器人的工业相机进行二次开发,没有专门介绍LabVIEW的开发手册,本文就简单的写一写单相机取图显示以及存图的开发步骤. 如果各位是直接使用IMAQdx开发的,可以直接下载 ...

  3. 海康威视工业相机SDK二次开发

    海康威视工业相机SDK二次开发 好气,第一次写文章,结果没不小心保存关掉,什么都没了. 本人是一名在读研究生,被导师分配了做项目中海康工业相机的二次开发.实现的需求是:实现八个相机同时打开视频,并且分 ...

  4. win下海康工业相机使用python读取视频并转换成cv格式

    硬件设备:海康威视工业相机CA013-A0UC USB3 环境:win10,python3.7,海康MVS 海康工业相机环境配置(MVS) 配置好环境后可以运行一下MVS和MVS\Developmen ...

  5. 海康威视工业相机SDK二次开发(VS+Opencv+QT+海康SDK+C++)(一)

    最近在做一个项目,涉及到工业相机,需要对其进行二次开发.相机方面选择了海康威视,网上关于海康威视工业相机SDK的开发资料很少,官方文档里面虽然写的是支持C++开发的,但其实是C.自己也摸索了一段时间, ...

  6. Linux系统安全与应用(二)——安全机制、安全控制、弱口令检测JR、网络扫描NMAP和控制台命令Netstat

    Linux系统安全与应用(二)--安全机制.安全控制.弱口令检测JR.网络扫描NMAP和控制台命令Netstat 一.使用sudo机制提升权限 1.su命令的缺点 2.sudo的用途和用法 3.配置s ...

  7. 海康摄像头的二次开发(java)

    海康摄像头的二次开发(java) 我第一次接触海康摄像头的二次开发的项目,一开始的时候摸不清套路,走了不少弯路,现在准备把我的一些经验留下来,让大家参考一下. 1.首先到海康的官网下载设备网络SDK: ...

  8. 超强在线考试系统源码(私有部署二次开发)

    随着信息化技术的发展,考试系统也在进行着深入的变革.从传统的纸质考试人工评分到现在的在线考试自动评分. 在线考试系统的应用场景也在逐渐扩宽,例如:学校的学生考试.员工培训考试.招聘考试.职称考试等等. ...

  9. 呼叫中心系统的基本构成和二次开发思路

    呼叫中心系统的基本构成和二次开发思路 目前呼叫中心大多基于IP软件换系统开发,常见的底层有 freeswitch,asterisk,freepbx等,一提到底层开源.有些用户觉得只要是开源就能掌控,就 ...

最新文章

  1. python使用matplotlib可视化、使用英文单次或者缩写指定使用的颜色、使用16进制的RGB字符串指定颜色、使用RGB或者RGBA数字元组指定颜色
  2. 敏捷开发:如何通过回顾保持学习状态
  3. linux(CentOS)磁盘挂载数据盘
  4. 大厂疯传!Python+商业数据分析+数据可视化教程(附项目案例)
  5. 【IT资讯】TIOBE - 2020年6月编程语言排行
  6. 堆栈溢出从入门到提高
  7. ELK学习笔记之Elasticsearch启动常见错误
  8. 博文视点Open Party ——漏洞分析
  9. c#服务器上的文件怎么打印机,如何通过使用C#窗口服务通过打印机打印数据打印文本文件...
  10. pycharm 安装JPype
  11. 批量给多个 Excel 工作簿文件添加文字水印或图片水印
  12. 中国第一代技术网红,在阿里被当神一样崇拜:我不跟人拼智商,我就跟他们拼狠!...
  13. 如何关闭445端口 两种方式教你关闭445端口
  14. movs 数据传送指令_数据传送指令之:MOV指令
  15. python 自动化发送邮件_Python自动化必备发送邮件报告脚本详解
  16. 国外开源电子商务平台
  17. QCC3040---Message Broker module
  18. G.7xx 音频压缩标准
  19. 计算机一级考试ps知识点,计算机一级考试PS备考训练题及答案
  20. android 换行符 编码_android中的常见的占位符及转义字符

热门文章

  1. 深入浅出多线程编程实战(五)ThreadLocal详解(介绍、使用、原理、应用场景)
  2. 基于Bootstrap的后台管理界面
  3. 各厂内推整理 | 第二期
  4. AE基础教程第一阶段——15质量图标和效果开关
  5. 关于代码动态修改xib内控件尺寸
  6. 二项分布最大值,泊松分布的推导,几何分布的推导 (概统2.证明)
  7. 微信新出置顶公众号功能,优质自媒体的春天!
  8. 那些酷炫的AI前沿应用 |〖医疗〗听声音辨新冠感染;〖农业〗除草机器人;〖救援〗无人机海上搜救;〖运动〗拳击比赛裁判;〖环保〗从废品中分拣可回收材料;〖招聘〗TaTiO招聘平台…
  9. 用计算机模拟股票大盘,股票模拟盘操作与实盘不同之处有哪些
  10. 有哪些改图方便的软件?6款P图软件来看看