原生的不存在velocity赋值的功能,现在补充上,补充之后就可以看到joint_states的velocity话题赋值了

#!/usr/bin/env python
#
# Software License Agreement (BSD License)
#
# Copyright (c) 2012-2014, Southwest Research Institute
# Copyright (c) 2014-2015, TU Delft Robotics Institute
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#  * Neither the name of the Southwest Research Institute, nor the names
#    of its contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#import rospy
import copy
import threadingimport sys
if sys.version_info.major == 2:import Queue
else:import queue as Queue# Publish
from sensor_msgs.msg import JointState
from control_msgs.msg import FollowJointTrajectoryFeedback
from industrial_msgs.msg import RobotStatus# Subscribe
from trajectory_msgs.msg import JointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint# Services
from industrial_msgs.srv import GetRobotInfo, GetRobotInfoResponse# Reference
from industrial_msgs.msg import TriState, RobotMode, ServiceReturnCode, DeviceInfo"""
MotionControllerSimulatorThis class simulates the motion controller for an industrial robot.This class IS threadsafe"""
class MotionControllerSimulator():"""Constructor of motion controller simulator"""def __init__(self, num_joints, initial_joint_state, update_rate = 100, buffer_size = 0):# Class lockself.lock = threading.Lock()# Motion loop update rate (higher update rates result in smoother simulated motion)self.update_rate = update_raterospy.loginfo("debug :Setting motion update rate (hz): %f", self.update_rate)rospy.logerr("debug :Setting motion update rate (hz): %f", self.update_rate)# Initialize joint positionself.joint_positions = initial_joint_stateself.joint_velocitys=   [0]*num_jointsrospy.loginfo("debug:Setting initial joint state: %s", str(initial_joint_state))rospy.logerr("debug:Setting initial joint state: %s", str(initial_joint_state))rospy.logerr("debug:Setting initial joint vel: %s", str(self.joint_velocitys))# Initialize motion buffer (contains joint position lists)self.motion_buffer = Queue.Queue(buffer_size)rospy.loginfo("debug:Setting motion buffer size: %i", buffer_size)rospy.logerr("debug:Setting motion buffer size: %i", buffer_size)# Shutdown signalself.sig_shutdown = False# Stop signalself.sig_stop = False# Motion threadself.motion_thread = threading.Thread(target=self._motion_worker)self.motion_thread.daemon = Trueself.motion_thread.start()""""""def add_motion_waypoint(self, point):self.motion_buffer.put(point)""""""def get_joint_positions(self):with self.lock:return self.joint_positions[:]""""""def get_joint_velocitys(self):with self.lock:return self.joint_velocitys[:]""""""def is_in_motion(self):return not self.motion_buffer.empty()""""""def shutdown(self):self.sig_shutdown = Truerospy.logdebug('Motion_Controller shutdown signaled')""""""def stop(self):rospy.logdebug('Motion_Controller stop signaled')with self.lock:self._clear_buffer()self.sig_stop = True""""""def interpolate(self, last_pt, current_pt, alpha):#rospy.logerr("debug : enter interpolate() in industrial_robot_simulator")intermediate_pt = JointTrajectoryPoint()for last_joint, current_joint in zip(last_pt.positions, current_pt.positions):intermediate_pt.positions.append(last_joint + alpha*(current_joint-last_joint))intermediate_pt.time_from_start = last_pt.time_from_start + rospy.Duration(alpha*(current_pt.time_from_start.to_sec() - last_pt.time_from_start.to_sec()))#rospy.logerr("debug : will exit interpolate() in industrial_robot_simulator")return intermediate_pt""""""def accelerate(self, last_pt, current_pt, current_time, delta_time):intermediate_pt = JointTrajectoryPoint()#rospy.logerr("debug : enter accelerate() in industrial_robot_simulator")for last_joint, current_joint, last_vel, current_vel in zip(last_pt.positions, current_pt.positions, last_pt.velocities, current_pt.velocities):delta_x = current_joint-last_jointdv = current_vel+last_vela1 = 6*delta_x/pow(delta_time,2) - 2*(dv+last_vel)/delta_timea2 = -12*delta_x/pow(delta_time,3) + 6*dv/pow(delta_time,2)current_pos = last_joint + last_vel*current_time + a1*pow(current_time,2)/2 +a2*pow(current_time,3)/6current_interp_vel = last_vel + a1*current_time  + a2*pow(current_time,2)/2 #add by ygxintermediate_pt.positions.append(current_pos)intermediate_pt.velocities.append(current_interp_vel)intermediate_pt.time_from_start = last_pt.time_from_start + rospy.Duration(current_time)#rospy.logerr("debug : will exit accelerate() in industrial_robot_simulator")return intermediate_pt""""""def _clear_buffer(self):with self.motion_buffer.mutex:self.motion_buffer.queue.clear()""""""def _move_to(self, point, dur):try:#rospy.logerr("debug :  in _move_to dur is: %f",dur)rospy.sleep(dur)with self.lock:if not self.sig_stop:self.joint_positions = point.positions[:]self.joint_velocitys = point.velocities[:]#rospy.loginfo('Moved to position: %s in %s', str(self.joint_positions), str(dur))else:rospy.logdebug('Stopping motion immediately, clearing stop signal')self.sig_stop = Falseexcept Exception as e:rospy.logerr('Unexpected exception: %s', e)""""""def _motion_worker(self):#rospy.logdebug('Starting motion worker in motion controller simulator')rospy.logerr('debug :Starting motion worker in motion controller simulator')move_duration = rospy.Duration()if self.update_rate != 0.:update_duration = rospy.Duration(1./self.update_rate)last_goal_point = JointTrajectoryPoint()with self.lock:last_goal_point.positions = self.joint_positions[:]while not self.sig_shutdown:try:current_goal_point = self.motion_buffer.get()# If the current time from start is less than the last, then it's a new trajectoryif current_goal_point.time_from_start < last_goal_point.time_from_start:move_duration = current_goal_point.time_from_start# Else it's an existing trajectory and subtract the twoelse:# If current move duration is greater than update_duration, move arm to interpolated joint position# Provide an exception to this rule: if update rate is <=0, do not add interpolated pointsmove_duration = current_goal_point.time_from_start - last_goal_point.time_from_startif self.update_rate > 0:starting_goal_point = copy.deepcopy(last_goal_point)full_duration = move_duration.to_sec()rospy.logerr("debug : full_duration is: %f",full_duration)while update_duration < move_duration:if not starting_goal_point.velocities or not current_goal_point.velocities:intermediate_goal_point = self.interpolate(last_goal_point, current_goal_point, update_duration.to_sec()/move_duration.to_sec())for last_joint, current_joint in zip(last_goal_point.positions, current_goal_point.positions):intermediate_goal_point.velocities.append((current_joint-last_joint)/move_duration)#add by ygxelse:intermediate_goal_point = self.accelerate(starting_goal_point, current_goal_point, full_duration-move_duration.to_sec()+update_duration.to_sec(), full_duration)self._move_to(intermediate_goal_point, update_duration.to_sec()) #TODO should this use min(update_duration, 0.5*move_duration) to smooth timing?last_goal_point = copy.deepcopy(intermediate_goal_point)move_duration = current_goal_point.time_from_start - intermediate_goal_point.time_from_startself._move_to(current_goal_point, move_duration)last_goal_point = copy.deepcopy(current_goal_point)except Exception as e:rospy.logerr('Unexpected exception: %s', e)rospy.logdebug("Shutting down motion controller")rospy.logerr("debug :Shutting down motion controller")"""
IndustrialRobotSimulatorThis class simulates an industrial robot controller.  The simulator
adheres to the ROS-Industrial robot driver specification:http://www.ros.org/wiki/Industrial/Industrial_Robot_Driver_SpecTODO: Currently the simulator only supports the bare minimum motion
interface.TODO: Interfaces to add:
Joint streaming
All services"""
class IndustrialRobotSimulatorNode():"""Constructor of industrial robot simulator"""def __init__(self):rospy.init_node('industrial_robot_simulator')# Class lockself.lock = threading.Lock()# Publish rate (hz)self.pub_rate = rospy.get_param('pub_rate', 10.0)rospy.logdebug("Setting publish rate (hz) based on parameter: %f", self.pub_rate)# Joint namesdef_joint_names = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]self.joint_names = rospy.get_param('controller_joint_names', def_joint_names)if len(self.joint_names) == 0:rospy.logwarn("Joint list is empty, did you set the 'controller_joint_names' parameter?")rospy.loginfo("Simulating manipulator with %d joints: %s", len(self.joint_names), ", ".join(self.joint_names))# Setup initial joint positionsnum_joints = len(self.joint_names)initial_joint_state = rospy.get_param('initial_joint_state', [0]*num_joints)same_len = len(initial_joint_state) == num_jointsall_num  = all(type(x) is int or type(x) is float for x in initial_joint_state)if not same_len or not all_num:initial_joint_state = [0]*num_jointsrospy.logwarn("Invalid initial_joint_state parameter, defaulting to all-zeros ""(len: %s, types: %s).", same_len, all_num)rospy.loginfo("Using initial joint state: %s", str(initial_joint_state))# retrieve update ratemotion_update_rate = rospy.get_param('motion_update_rate', 100.);  #set param to 0 to ignore interpolated motionself.motion_ctrl = MotionControllerSimulator(num_joints, initial_joint_state, update_rate=motion_update_rate)# Published to joint statesrospy.logdebug("Creating joint state publisher")self.joint_state_pub = rospy.Publisher('joint_states', JointState, queue_size=1)# Published to joint feedbackrospy.logdebug("Creating joint feedback publisher")self.joint_feedback_pub = rospy.Publisher('feedback_states', FollowJointTrajectoryFeedback, queue_size=1)rospy.logdebug("Creating robot status publisher")self.robot_status_pub = rospy.Publisher('robot_status', RobotStatus, queue_size=1)# Subscribe to a joint trajectoryrospy.logdebug("Creating joint trajectory subscriber")self.joint_path_sub = rospy.Subscriber('joint_path_command', JointTrajectory, self.trajectory_callback)# JointStates timed task (started automatically)period = rospy.Duration(1.0/self.pub_rate)rospy.logdebug('Setting up publish worker with period (sec): %s', str(period.to_sec()))rospy.Timer(period, self.publish_worker)# GetRobotInfo service server and pre-cooked svc responseself.get_robot_info_response = self._init_robot_info_response()self.svc_get_robot_info = rospy.Service('get_robot_info', GetRobotInfo, self.cb_svc_get_robot_info)# Clean up initrospy.on_shutdown(self.motion_ctrl.shutdown)"""Service callback for GetRobotInfo() service. Returns fake information."""def cb_svc_get_robot_info(self, req):# return cached response instancereturn self.get_robot_info_response"""The publish worker is executed at a fixed rate.  This publishes the variousstate and status information for the robot."""def publish_worker(self, event):self.joint_state_publisher()self.robot_status_publisher()"""The joint state publisher publishes the current joint state and the currentfeedback state (as these are closely related)"""def joint_state_publisher(self):try:joint_state_msg = JointState()joint_fb_msg = FollowJointTrajectoryFeedback()time = rospy.Time.now()with self.lock:#Joint statesjoint_state_msg.header.stamp = timejoint_state_msg.name = self.joint_namesjoint_state_msg.position = self.motion_ctrl.get_joint_positions()joint_state_msg.velocity = self.motion_ctrl.get_joint_velocitys() #add by ygxself.joint_state_pub.publish(joint_state_msg)#Joint feedbackjoint_fb_msg.header.stamp = timejoint_fb_msg.joint_names = self.joint_namesjoint_fb_msg.actual.positions = self.motion_ctrl.get_joint_positions()self.joint_feedback_pub.publish(joint_fb_msg)except Exception as e:rospy.logerr('Unexpected exception in joint state publisher: %s', e)"""The robot status publisher publishes the current simulated robot status.The following values are hard coded:- robot always in AUTO mode- drives always powered- motion always possible- robot never E-stopped- no error code- robot never in errorThe value of 'in_motion' is derived from the state of theMotionControllerSimulator."""def robot_status_publisher(self):try:msg = RobotStatus()msg.mode.val            = RobotMode.AUTOmsg.e_stopped.val       = TriState.FALSEmsg.drives_powered.val  = TriState.TRUEmsg.motion_possible.val = TriState.TRUEmsg.in_motion.val       = self.motion_ctrl.is_in_motion()msg.in_error.val        = TriState.FALSEmsg.error_code          = 0self.robot_status_pub.publish(msg)except Exception as e:rospy.logerr('Unexpected exception: %s', e)"""Trajectory subscription callback (gets called whenever a joint trajectoryis received).@param msg_in: joint trajectory message@type  msg_in: JointTrajectory"""def trajectory_callback(self, msg_in):try:#rospy.logdebug('Received trajectory with %s points, executing callback', str(len(msg_in.points)))#rospy.loginfo('Received trajectory with %s points, executing callback', str(len(msg_in.points)))rospy.logerr('debug : in industrial_robot_simulator, Received trajectory with %s points, executing callback', str(len(msg_in.points)))if self.motion_ctrl.is_in_motion():if len(msg_in.points) > 0:rospy.logerr('Received trajectory while still in motion, trajectory splicing not supported')else:rospy.logdebug('Received empty trajectory while still in motion, stopping current trajectory')self.motion_ctrl.stop()else:for point in msg_in.points:point = self._to_controller_order(msg_in.joint_names, point)self.motion_ctrl.add_motion_waypoint(point)except Exception as e:rospy.logerr('Unexpected exception: %s', e)rospy.logdebug('Exiting trajectory callback')rospy.logerr('debug :Exiting trajectory callback')"""Remaps point to controller joint order@param keys:   keys defining joint value order@type  keys:   list@param point:  joint trajectory point@type  point:  JointTrajectoryPoint@return point: reorder point@type point: JointTrajectoryPoint"""def _to_controller_order(self, keys, point):#rospy.loginfo('to controller order, keys: %s, point: %s', str(keys), str(point))pt_rtn = copy.deepcopy(point)pt_rtn.positions = self._remap_order(self.joint_names, keys, point.positions)if point.velocities:pt_rtn.velocities = self._remap_order(self.joint_names, keys, point.velocities)return pt_rtndef _remap_order(self, ordered_keys, value_keys, values):#rospy.loginfo('remap order, ordered_keys: %s, value_keys: %s, values: %s', str(ordered_keys), str(value_keys), str(values))ordered_values = []ordered_values = [0]*len(ordered_keys)mapping = dict(zip(value_keys, values))#rospy.loginfo('maping: %s', str(mapping))for i in range(len(ordered_keys)):ordered_values[i] = mapping[ordered_keys[i]]passreturn ordered_values"""Constructs a GetRobotInfoResponse instance with either default data,or data provided by the user."""def _init_robot_info_response(self):if not rospy.has_param('~robot_info'):# if user did not provide data, we generate someimport rospkgrp = rospkg.RosPack()irs_version = rp.get_manifest('industrial_robot_simulator').versionrobot_info = dict(controller=dict(model='Industrial Robot Simulator Controller',serial_number='0123456789',sw_version=irs_version),robots=[dict(model='Industrial Robot Simulator Manipulator',serial_number='9876543210',sw_version=irs_version)])else:# otherwise use only the data user has provided (and nothing more)robot_info = rospy.get_param('~robot_info')resp = GetRobotInfoResponse()resp.controller = DeviceInfo(**robot_info['controller'])# add info on controlled robot / motion groupif len(robot_info['robots']) > 0:robot = robot_info['robots'][0]resp.robots.append(DeviceInfo(**robot))if len(robot_info['robots']) > 1:# simulator simulates a single robot / motion grouprospy.logwarn("Multiple robots / motion groups defined in ""'robot_info' parameter, ignoring all but first element")# always successfullresp.code.val = ServiceReturnCode.SUCCESSreturn respif __name__ == '__main__':try:rospy.loginfo('Starting joint_controller_simulator')controller = IndustrialRobotSimulatorNode()rospy.spin()except rospy.ROSInterruptException:pass

改造industrial_robot_simulator给joint_states的velocity赋值,便于测试轨迹规划的速度平滑性相关推荐

  1. memcpy()内存拷贝和赋值操作效率测试

    比较memcpy()内存拷贝和"="赋值操作效率,测试代码如下 #include <stdio.h> #include <malloc.h> #includ ...

  2. std::list 修改某个值_forOMG非英雄篇——如何修改数据以便于测试

    本文旨在告诉大家如何修改英雄的数值以方便进行测试. 本系列依次为 莫无煜:forOMG英雄篇--如何修改数据以便于测试​zhuanlan.zhihu.com <非英雄篇>.<技能篇& ...

  3. 快速生成apk 自动发布到网站 便于测试

    遇到的问题: 开发者生成的apk 需要不断给 测试安装让他们测试.有没有脚本自动将最新apk上传到服务器,让测试自己安装测试呢?mac电脑 怎么自己搭建文件服务器  启动Tomcat功能在这里不在赘述 ...

  4. 记录常用的xss攻击代码,便于测试系统安全漏洞

    XSS(跨站脚本攻击) - 常用代码大全: 1'"()&%<acx><ScRiPt >prompt(915149)</ScRiPt><svg ...

  5. dota修改服务器,forOMG非英雄篇 如何修改数据以便于测试

    本系列依次为<英雄篇>.<非英雄篇>.<技能篇>.<物品篇>. 可以被修改的数据可以被分为两类:很容易修改的(英雄.非英雄.技能)和比较复杂的(物品). ...

  6. 盘点大厂的那些开源项目 - 华为

    HarmonyOS 鸿蒙系统(HarmonyOS),是第一款基于微内核的全场景分布式OS,是华为自主研发的操作系统.华为会率先部署在智慧屏.车载终端.穿戴等智能终端上,未来会有越来越多的智能设备使用开 ...

  7. Dijkstra,A*,DWA,TEB

    Dijkstra Dijkstra算法是典型的单元最短路径算法,用于计算赋权有向图或无向图的单源最短路径问题 Dijkstra算法的思想是,将带权有向图或无向图中的顶点分为两类,一类是已经确定最短路径 ...

  8. self_drive car_学习笔记--第10课:路径规划

    前言:这节课围绕无人车的路径规划开讲,包含规划时采用的一些算法思路.一些规划方式,最后以Apollo项目规划部分为例子,介绍一下其中所使用的一些规划算法以及方式.很菜现在,有些理解错误地方,还望大大们 ...

  9. 盘点大厂的那些开源项目 ——华为

    HarmonyOS 鸿蒙系统(HarmonyOS),是第一款基于微内核的全场景分布式OS,是华为自主研发的操作系统.华为会率先部署在智慧屏.车载终端.穿戴等智能终端上,未来会有越来越多的智能设备使用开 ...

最新文章

  1. 剑指Offer 替换空格
  2. 打造DMPO通道完成sdwan优化
  3. css3 -webkit-filter
  4. mysql root 提权_mysql以ROOT权限提权方法
  5. 自我分析colly的robots源码
  6. python中的以简单例子解释函数参数、函数定义、函数返回值、函数调用
  7. IDEA快捷键eclipse版(有自定义部分)
  8. ELK-elasticsearch-kibana-logstash 报错问题集锦
  9. 详解Oracle临时表的几种用法及意义
  10. 拓端tecdat|R语言阈值模型代码示例
  11. 胶东机场t1离哪个停车场近,青岛胶东国际机场停车场攻略
  12. 非线性系统离散线性化方法(一)
  13. Java中获取实时时间
  14. matlab计算并联电阻怎么输入,如何用计算器快速计算并联电阻,并联电阻的计算方法...
  15. excel生成随机手机号
  16. CVPR2004/风格分解:Separating Style and Content on a Nonlinear Manifold在非线性流形上分离样式和内容
  17. 如何通过链脉智能名片提高品牌知名度
  18. 图像分割论文 “RANet : Region Attention Network for Semantic Segmentation”
  19. js数组倒序排列+字符串数组转为数字数组
  20. 真正理解nodeJS,nodejs是什么,深入理解node

热门文章

  1. 【已解决】IDEA创建Maven多模块项目子模块引用不到父模块的pom
  2. springboot2.3手册:5分钟用Netty搭建高性能异步WebSocket服务
  3. ImageFeatures
  4. 笔记:宾大《Algebra, Topology, Differential Calculus, and Optimization Theory For CS and ML》——第三章第九节
  5. 跟sky学数字IC前端设计:数字IP_FPGA实战
  6. 计算机辅助翻译翻译技术的译前应用,计算机辅助翻译技术在网络字幕组中的应用...
  7. 今天,公众号留言功能开通啦!| 文末乔迁福利
  8. Android系统apps之Setting的修改和设置
  9. 必不可少!STL源码目录结构分析,附加源码下载链接
  10. 工程伦理--6.4 风险沟通