本系列文章是 2024 春季学期北航计算机学院本科生课程《软件工程》(嵌入式方向)的实验部分报告,不包含团队大作业项目内容与相关细节

任务2-启智机器人工程的导入与运动控制

实验目的

  • 建立启智机器人仿真环境
  • 掌握仿真环境下机器人的运动控制

实验任务

  • 完成启智机器人及仿真环境的环境配置和项目构建
  • 学习 Twist 消息类型并完成机器人的运动控制

实验内容

建立启智机器人仿真环境

首先在目录中新建一个工作空间

mkdir task2
mkdir task2/src
cd task2/src
catkin_init_workspace

然后通过 Github 将三个源码仓库同时拷贝进 src 目录中,再安装对应包的依赖项,注意安装依赖项与 ROS 的版本相关(kinetic-16.04,Melodic-18.04,Noetic-20.04)

git clone https://github.com/6-robot/wpr_simulation.git
git clone https://github.com/6-robot/wpb_home.git
git clone https://github.com/6-robot/waterplus_map_tools.git

./wpr_simulation/scripts/install_for_melodic.sh
./wpb_home/scripts/install_for_melodic.sh
./waterplus_map_tools/scripts/install_for_melodic.sh

依赖安装完毕后回到根目录编译项目,相当于此时项目就拥有了三个启智机器人内置的软件包,有了执行一定功能的能力

catkin_make

此时已经可以通过执行以下指令开启机器人在 Gazebo 中执行简单的仿真场景

roslaunch wpr_simulation wpb_simple.launch

掌握仿真环境下机器人的运动控制

参考指导书编写一个软件包实现对机器人的运动控制。

首先在 src 目录创建一个软件包,然后创建 scripts 目录并编写一个 python 代码,需要注意对脚本文件添加可执行属性

catkin_create_pkg vel_pkg rospy geometry_msgs
mkdir vel_pkg/scripts
cd vel_pkg/scripts
touch vel_ctrl_node.py
chmod +x vel_ctrl_node.py

这里创建包的含义是:

  • catkin_create_pkg:创建 ROS 源码包
  • vel_pkg:源码包名
  • rospy:python 依赖项(内部文件使用 python 文件编写)
  • geometry_msgs:包含机器人移动速度消息包格式文件的包名

指导书中的 python 代码和重写功能后的代码内容如下:

#!/usr/bin/env python
# coding=utf-8

# 引入两个声明包时用到的库:`rospy` 包含 python 中使用的 ROS 函数;`geometry_msgs.msg` 包含 `Twist` 模块
import rospy
from geometry_msgs.msg import Twist

# 旧
if __name__ == "__main__":

# 初始化节点,命名为 vel_ctrl_node
rospy.init_node("vel_ctrl_node")
# 发布速度控制话题 cmd_vel(运动控制话题)
vel_pub = rospy.Publisher("cmd_vel",Twist,queue_size=10)
# 构建 Twist 速度消息包 msg,并赋值
msg = Twist()
msg.linear.x = 0.1 # 只发送 x 方向线性速度
# msg.angular.z =0.1
# 构建发送频率对象 10Hz
rate = rospy.Rate(10)
while not rospy.is_shutdown():
rospy.loginfo("发送一个速度消息包")
vel_pub.publish(msg) # 循环在话题中发布数据包
rate.sleep()


# 新
#!/usr/bin/env python
import rospy
from geometry_msgs.msg import Twist
import sys, select, termios, tty

msg = """
Control Your Robot!
---------------------------
Moving around:
w
a s d
x

w/x : increase/decrease linear velocity
a/d : increase/decrease angular velocity
space key, s : force stop

CTRL-C to quit
"""

def getKey():
tty.setraw(sys.stdin.fileno())
rlist, _, _ = select.select([sys.stdin], [], [], 0.1)
if rlist:
key = sys.stdin.read(1)
else:
key = ''

termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
return key

speed = 0.5
turn = 1

def vels(speed,turn):
return "currently:\tspeed %s\tturn %s " % (speed,turn)

if __name__=="__main__":
settings = termios.tcgetattr(sys.stdin)

rospy.init_node('robot_teleop')
pub = rospy.Publisher('/cmd_vel', Twist, queue_size=5)

x = 0
th = 0
status = 0

try:
print(msg)
print(vels(speed,turn))
while(1):
key = getKey()
if key == 'w':
x = speed
th = 0
elif key == 's':
x = 0
th = 0
elif key == 'a':
x = 0
th = turn
elif key == 'd':
x = 0
th = -turn
elif key == 'x':
x = -speed
th = 0
elif key == ' ':
x = 0
th = 0
else:
if (key == '\x03'):
break

twist = Twist()
twist.linear.x = x; twist.linear.y = 0; twist.linear.z = 0
twist.angular.x = 0; twist.angular.y = 0; twist.angular.z = th
pub.publish(twist)

except Exception as e:
print(e)

finally:
twist = Twist()
twist.linear.x = 0; twist.linear.y = 0; twist.linear.z = 0
twist.angular.x = 0; twist.angular.y = 0; twist.angular.z = 0
pub.publish(twist)

termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)

然后现在回到项目根目录,重新编译,然后再配置环境,最终运行控制节点:

cd ~/task2
catkin_make
source ./devel/setup.bash
roslaunch wpr_simulation wpb_simple.launch
rosrun vel_pkg vel_ctrl_node.py

然后就能通过 Gazebo 环境观察仿真机器人的运动,并通过 vel_ctrl_node 节点对其发出指令控制其运动

image-20240315223149660

实验之后

实验中使用到的一些相关知识的解读,以 melodic 版本的 python 方向为例

Twist速度控制

Twist 是包含在 geometry_msg.msg 包中的类,用于向机器人的核心节点发送运动控制数据。通常的使用方式为在文件中创建对象,修改速度数据后通过 Publisher 发布到专用的速度控制话题 \cmd_vel 中进行速度的控制。

这个速度被分解为线速度和角速度,分别为 Twist 对象的不同字段

# This Python file uses the following encoding: utf-8
"""autogenerated by genpy from geometry_msgs/Twist.msg. Do not edit."""
import codecs
import sys
python3 = True if sys.hexversion > 0x03000000 else False
import genpy
import struct

import geometry_msgs.msg

class Twist(genpy.Message):
_md5sum = "9f195f881246fdfa2798d1d3eebca84a"
_type = "geometry_msgs/Twist"
_has_header = False # flag to mark the presence of a Header object
_full_text = """# This expresses velocity in free space broken into its linear and angular parts.
Vector3 linear
Vector3 angular

================================================================================
MSG: geometry_msgs/Vector3
# This represents a vector in free space.
# It is only meant to represent a direction. Therefore, it does not
# make sense to apply a translation to it (e.g., when applying a
# generic rigid transformation to a Vector3, tf2 will only apply the
# rotation). If you want your data to be translatable too, use the
# geometry_msgs/Point message instead.

float64 x
float64 y
float64 z"""
__slots__ = ['linear','angular']
_slot_types = ['geometry_msgs/Vector3','geometry_msgs/Vector3']

def __init__(self, *args, **kwds):
"""
Constructor. Any message fields that are implicitly/explicitly
set to None will be assigned a default value. The recommend
use is keyword arguments as this is more robust to future message
changes. You cannot mix in-order arguments and keyword arguments.

The available fields are:
linear,angular

:param args: complete set of field values, in .msg order
:param kwds: use keyword arguments corresponding to message field names
to set specific fields.
"""
if args or kwds:
super(Twist, self).__init__(*args, **kwds)
# message fields cannot be None, assign default values for those that are
if self.linear is None:
self.linear = geometry_msgs.msg.Vector3()
if self.angular is None:
self.angular = geometry_msgs.msg.Vector3()
else:
self.linear = geometry_msgs.msg.Vector3()
self.angular = geometry_msgs.msg.Vector3()

def _get_types(self):
"""
internal API method
"""
return self._slot_types

def serialize(self, buff):
"""
serialize message into buffer
:param buff: buffer, ``StringIO``
"""
try:
_x = self
buff.write(_get_struct_6d().pack(_x.linear.x, _x.linear.y, _x.linear.z, _x.angular.x, _x.angular.y, _x.angular.z))
except struct.error as se: self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self)))))
except TypeError as te: self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))

def deserialize(self, str):
"""
unpack serialized message in str into this message instance
:param str: byte array of serialized message, ``str``
"""
if python3:
codecs.lookup_error("rosmsg").msg_type = self._type
try:
if self.linear is None:
self.linear = geometry_msgs.msg.Vector3()
if self.angular is None:
self.angular = geometry_msgs.msg.Vector3()
end = 0
_x = self
start = end
end += 48
(_x.linear.x, _x.linear.y, _x.linear.z, _x.angular.x, _x.angular.y, _x.angular.z,) = _get_struct_6d().unpack(str[start:end])
return self
except struct.error as e:
raise genpy.DeserializationError(e) # most likely buffer underfill


def serialize_numpy(self, buff, numpy):
"""
serialize message with numpy array types into buffer
:param buff: buffer, ``StringIO``
:param numpy: numpy python module
"""
try:
_x = self
buff.write(_get_struct_6d().pack(_x.linear.x, _x.linear.y, _x.linear.z, _x.angular.x, _x.angular.y, _x.angular.z))
except struct.error as se: self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self)))))
except TypeError as te: self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))

def deserialize_numpy(self, str, numpy):
"""
unpack serialized message in str into this message instance using numpy for array types
:param str: byte array of serialized message, ``str``
:param numpy: numpy python module
"""
if python3:
codecs.lookup_error("rosmsg").msg_type = self._type
try:
if self.linear is None:
self.linear = geometry_msgs.msg.Vector3()
if self.angular is None:
self.angular = geometry_msgs.msg.Vector3()
end = 0
_x = self
start = end
end += 48
(_x.linear.x, _x.linear.y, _x.linear.z, _x.angular.x, _x.angular.y, _x.angular.z,) = _get_struct_6d().unpack(str[start:end])
return self
except struct.error as e:
raise genpy.DeserializationError(e) # most likely buffer underfill

_struct_I = genpy.struct_I
def _get_struct_I():
global _struct_I
return _struct_I
_struct_6d = None
def _get_struct_6d():
global _struct_6d
if _struct_6d is None:
_struct_6d = struct.Struct("<6d")
return _struct_6d

总的来看 Twist 类包括以下内容:

  • 类型数据定义:一些内部识别用的字段,一般用不上
  • 序列化/反序列化方法:serializedeserializeserialize_numpydeserialize_numpy,这里的序列化方法在发布消息时会将 Twist 对象序列化后在话题中发布
  • 功能数据定义:使用 __slots__用于声明类实例的属性列表,创建了 linearangular 两个分量均为 float64 的三维向量(x, y, z),分别存储对应坐标轴的线速度和角速度
    • 具体到运动实际场景中,linearx、y 分量对应平面移动(m/s);angular z 分量对应旋转的角速度(rad/s),旋转的正方向遵守右手定则

话题信息发布

向指定话题发布信息时需要使用 Publisher 对象。它用于注册并作为一个特定话题的发布者,类的对象初始化时需要先指定其使用的话题,并通过执行 publish() 方法将所携带的消息对象发布到初始化时指定的话题中。

Publisher 类体现了ROS中的发布-订阅模式,使得不同的节点能够以松耦合的方式通信(实际上 vue 中父子组件使用 $emit 也类似于 Publisher 和 Subscriber)

class Publisher(Topic):
"""
Class for registering as a publisher of a ROS topic.
"""

def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None):
"""
Constructor
@param name: resource name of topic, e.g. 'laser'.
@type name: str
@param data_class: message class for serialization
@type data_class: L{Message} class
@param subscriber_listener: listener for
subscription events. May be None.
@type subscriber_listener: L{SubscribeListener}
@param tcp_nodelay: If True, sets TCP_NODELAY on
publisher's socket (disables Nagle algorithm). This results
in lower latency publishing at the cost of efficiency.
@type tcp_nodelay: bool
@param latch: If True, the last message published is
'latched', meaning that any future subscribers will be sent
that message immediately upon connection.
@type latch: bool
@param headers: If not None, a dictionary with additional header
key-values being used for future connections.
@type headers: dict
@param queue_size: The queue size used for asynchronously
publishing messages from different threads. A size of zero
means an infinite queue, which can be dangerous. When the
keyword is not being used or when None is passed all
publishing will happen synchronously and a warning message
will be printed.
@type queue_size: int
@raise ROSException: if parameters are invalid
"""
super(Publisher, self).__init__(name, data_class, Registration.PUB)

if subscriber_listener:
self.impl.add_subscriber_listener(subscriber_listener)
if tcp_nodelay:
get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay)
if latch:
self.impl.enable_latch()
if headers:
self.impl.add_headers(headers)
if queue_size is not None:
self.impl.set_queue_size(queue_size)
else:
import warnings
warnings.warn("The publisher should be created with an explicit keyword argument 'queue_size'. "
"Please see http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers for more information.", SyntaxWarning, stacklevel=2)

def publish(self, *args, **kwds):
"""
Publish message data object to this topic.
Publish can either be called with the message instance to
publish or with the constructor args for a new Message
instance, i.e.::
pub.publish(message_instance)
pub.publish(message_field_1, message_field_2...)
pub.publish(message_field_1='foo', message_field_2='bar')

@param args : L{Message} instance, message arguments, or no args if keyword arguments are used
@param kwds : Message keyword arguments. If kwds are used, args must be unset
@raise ROSException: If rospy node has not been initialized
@raise ROSSerializationException: If unable to serialize
message. This is usually a type error with one of the fields.
"""
if self.impl is None:
raise ROSException("publish() to an unregistered() handle")
if not is_initialized():
raise ROSException("ROS node has not been initialized yet. Please call init_node() first")
data = args_kwds_to_message(self.data_class, args, kwds)
try:
self.impl.acquire()
self.impl.publish(data)
except genpy.SerializationError as e:
# can't go to rospy.logerr(), b/c this could potentially recurse
_logger.error(traceback.format_exc())
raise ROSSerializationException(str(e))
finally:
self.impl.release()

类定义的主要字段有

  • name:定义的话题名称
  • data_class:初始化的数据类
  • subscriber_listener:订阅事件的监听器,用于监听同话题中 Subscribers 的变化
  • latch:消息锁存,开启锁存时未来的订阅者在加入话题时会立刻接收到信息
  • queue_size:用于从不同线程异步发布消息的队列大小。0表示无限队列,可能会危险。当未使用此关键字或传递 None 时,所有发布将同步进行,并会打印一个警告信息

类当中只含一个 publish 方法,用于向话题发布消息,它可以传递某种类型的消息实例,或者通过一些参数在 Publisher 内部构造消息并自行发送。

publish 方法执行前会先检查自身是否实现(impl)、ROS_node 是否初始化,然后利用锁实现控制消息发送的并发量的操作,最后发布消息。