msg = """ Control Your Robot! --------------------------- Moving around: w a s d x w/x : increase/decrease linear velocity a/d : increase/decrease angular velocity space key, s : force stop CTRL-C to quit """
# This Python file uses the following encoding: utf-8 """autogenerated by genpy from geometry_msgs/Twist.msg. Do not edit.""" import codecs import sys python3 = Trueif sys.hexversion > 0x03000000elseFalse import genpy import struct
import geometry_msgs.msg
classTwist(genpy.Message): _md5sum = "9f195f881246fdfa2798d1d3eebca84a" _type = "geometry_msgs/Twist" _has_header = False# flag to mark the presence of a Header object _full_text = """# This expresses velocity in free space broken into its linear and angular parts. Vector3 linear Vector3 angular ================================================================================ MSG: geometry_msgs/Vector3 # This represents a vector in free space. # It is only meant to represent a direction. Therefore, it does not # make sense to apply a translation to it (e.g., when applying a # generic rigid transformation to a Vector3, tf2 will only apply the # rotation). If you want your data to be translatable too, use the # geometry_msgs/Point message instead. float64 x float64 y float64 z""" __slots__ = ['linear','angular'] _slot_types = ['geometry_msgs/Vector3','geometry_msgs/Vector3']
def__init__(self, *args, **kwds): """ Constructor. Any message fields that are implicitly/explicitly set to None will be assigned a default value. The recommend use is keyword arguments as this is more robust to future message changes. You cannot mix in-order arguments and keyword arguments. The available fields are: linear,angular :param args: complete set of field values, in .msg order :param kwds: use keyword arguments corresponding to message field names to set specific fields. """ if args or kwds: super(Twist, self).__init__(*args, **kwds) # message fields cannot be None, assign default values for those that are if self.linear isNone: self.linear = geometry_msgs.msg.Vector3() if self.angular isNone: self.angular = geometry_msgs.msg.Vector3() else: self.linear = geometry_msgs.msg.Vector3() self.angular = geometry_msgs.msg.Vector3()
def_get_types(self): """ internal API method """ return self._slot_types
defserialize(self, buff): """ serialize message into buffer :param buff: buffer, ``StringIO`` """ try: _x = self buff.write(_get_struct_6d().pack(_x.linear.x, _x.linear.y, _x.linear.z, _x.angular.x, _x.angular.y, _x.angular.z)) except struct.error as se: self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self))))) except TypeError as te: self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))
defdeserialize(self, str): """ unpack serialized message in str into this message instance :param str: byte array of serialized message, ``str`` """ if python3: codecs.lookup_error("rosmsg").msg_type = self._type try: if self.linear isNone: self.linear = geometry_msgs.msg.Vector3() if self.angular isNone: self.angular = geometry_msgs.msg.Vector3() end = 0 _x = self start = end end += 48 (_x.linear.x, _x.linear.y, _x.linear.z, _x.angular.x, _x.angular.y, _x.angular.z,) = _get_struct_6d().unpack(str[start:end]) return self except struct.error as e: raise genpy.DeserializationError(e) # most likely buffer underfill
defserialize_numpy(self, buff, numpy): """ serialize message with numpy array types into buffer :param buff: buffer, ``StringIO`` :param numpy: numpy python module """ try: _x = self buff.write(_get_struct_6d().pack(_x.linear.x, _x.linear.y, _x.linear.z, _x.angular.x, _x.angular.y, _x.angular.z)) except struct.error as se: self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self))))) except TypeError as te: self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))
defdeserialize_numpy(self, str, numpy): """ unpack serialized message in str into this message instance using numpy for array types :param str: byte array of serialized message, ``str`` :param numpy: numpy python module """ if python3: codecs.lookup_error("rosmsg").msg_type = self._type try: if self.linear isNone: self.linear = geometry_msgs.msg.Vector3() if self.angular isNone: self.angular = geometry_msgs.msg.Vector3() end = 0 _x = self start = end end += 48 (_x.linear.x, _x.linear.y, _x.linear.z, _x.angular.x, _x.angular.y, _x.angular.z,) = _get_struct_6d().unpack(str[start:end]) return self except struct.error as e: raise genpy.DeserializationError(e) # most likely buffer underfill
_struct_I = genpy.struct_I def_get_struct_I(): global _struct_I return _struct_I _struct_6d = None def_get_struct_6d(): global _struct_6d if _struct_6d isNone: _struct_6d = struct.Struct("<6d") return _struct_6d
classPublisher(Topic): """ Class for registering as a publisher of a ROS topic. """
def__init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None): """ Constructor @param name: resource name of topic, e.g. 'laser'. @type name: str @param data_class: message class for serialization @type data_class: L{Message} class @param subscriber_listener: listener for subscription events. May be None. @type subscriber_listener: L{SubscribeListener} @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's socket (disables Nagle algorithm). This results in lower latency publishing at the cost of efficiency. @type tcp_nodelay: bool @param latch: If True, the last message published is 'latched', meaning that any future subscribers will be sent that message immediately upon connection. @type latch: bool @param headers: If not None, a dictionary with additional header key-values being used for future connections. @type headers: dict @param queue_size: The queue size used for asynchronously publishing messages from different threads. A size of zero means an infinite queue, which can be dangerous. When the keyword is not being used or when None is passed all publishing will happen synchronously and a warning message will be printed. @type queue_size: int @raise ROSException: if parameters are invalid """ super(Publisher, self).__init__(name, data_class, Registration.PUB)
if subscriber_listener: self.impl.add_subscriber_listener(subscriber_listener) if tcp_nodelay: get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay) if latch: self.impl.enable_latch() if headers: self.impl.add_headers(headers) if queue_size isnotNone: self.impl.set_queue_size(queue_size) else: import warnings warnings.warn("The publisher should be created with an explicit keyword argument 'queue_size'. " "Please see http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers for more information.", SyntaxWarning, stacklevel=2)
defpublish(self, *args, **kwds): """ Publish message data object to this topic. Publish can either be called with the message instance to publish or with the constructor args for a new Message instance, i.e.:: pub.publish(message_instance) pub.publish(message_field_1, message_field_2...) pub.publish(message_field_1='foo', message_field_2='bar') @param args : L{Message} instance, message arguments, or no args if keyword arguments are used @param kwds : Message keyword arguments. If kwds are used, args must be unset @raise ROSException: If rospy node has not been initialized @raise ROSSerializationException: If unable to serialize message. This is usually a type error with one of the fields. """ if self.impl isNone: raise ROSException("publish() to an unregistered() handle") ifnot is_initialized(): raise ROSException("ROS node has not been initialized yet. Please call init_node() first") data = args_kwds_to_message(self.data_class, args, kwds) try: self.impl.acquire() self.impl.publish(data) except genpy.SerializationError as e: # can't go to rospy.logerr(), b/c this could potentially recurse _logger.error(traceback.format_exc()) raise ROSSerializationException(str(e)) finally: self.impl.release()