flash网站作品欣赏,无锡网站制作启航好,郑州网站建设方案报价,素材网站pinterest目录 一、初始化和关闭节点二、发布者三、订阅者四、服务端五、客户端六、参数管理七、日志记录八、生命周期管理 ROS2 在 Python 编程中引入了一些新的概念和 API#xff0c;这些变化使得代码更加模块化和易于维护。特别是 rclpy 库提供了更丰富的功能和更好的错误处理机制这些变化使得代码更加模块化和易于维护。特别是 rclpy 库提供了更丰富的功能和更好的错误处理机制同时支持异步编程模型。如果你已经熟悉 ROS1 的 Python 编程这些变化应该不会太难适应。
一、初始化和关闭节点
ROS1:
import rospydef main():rospy.init_node(my_node, anonymousTrue)# 节点逻辑rospy.spin()if __name__ __main__:main()ROS2:
import rclpy
from rclpy.node import Nodeclass MyNode(Node):def __init__(self):super().__init__(my_node)# 节点逻辑def main(argsNone):rclpy.init(argsargs)node MyNode()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ __main__:main()二、发布者
ROS1:
import rospy
from std_msgs.msg import Stringdef talker():pub rospy.Publisher(chatter, String, queue_size10)rospy.init_node(talker, anonymousTrue)rate rospy.Rate(10) # 10 Hzwhile not rospy.is_shutdown():hello_str hello world %s % rospy.get_time()rospy.loginfo(hello_str)pub.publish(hello_str)rate.sleep()if __name__ __main__:try:talker()except rospy.ROSInterruptException:passROS2:
import rclpy
from rclpy.node import Node
from std_msgs.msg import Stringclass Talker(Node):def __init__(self):super().__init__(talker)self.publisher_ self.create_publisher(String, chatter, 10)timer_period 1 # secondsself.timer self.create_timer(timer_period, self.timer_callback)def timer_callback(self):msg String()msg.data fHello World {self.get_clock().now().nanoseconds // 1000000}self.get_logger().info(fPublishing: {msg.data})self.publisher_.publish(msg)def main(argsNone):rclpy.init(argsargs)node Talker()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ __main__:main()三、订阅者
ROS1:
import rospy
from std_msgs.msg import Stringdef callback(data):rospy.loginfo(rospy.get_caller_id() I heard %s, data.data)def listener():rospy.init_node(listener, anonymousTrue)rospy.Subscriber(chatter, String, callback)rospy.spin()if __name__ __main__:listener()ROS2:
import rclpy
from rclpy.node import Node
from std_msgs.msg import Stringclass Listener(Node):def __init__(self):super().__init__(listener)self.subscription self.create_subscription(String, chatter, self.listener_callback, 10)self.subscription # prevent unused variable warningdef listener_callback(self, msg):self.get_logger().info(fI heard: {msg.data})def main(argsNone):rclpy.init(argsargs)node Listener()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ __main__:main()四、服务端
ROS1:
import rospy
from std_srvs.srv import AddTwoIntsdef handle_add_two_ints(req):rospy.loginfo(fReturning [{req.a} {req.b} {req.a req.b}])return AddTwoIntsResponse(req.a req.b)def add_two_ints_server():rospy.init_node(add_two_ints_server)s rospy.Service(add_two_ints, AddTwoInts, handle_add_two_ints)rospy.spin()if __name__ __main__:add_two_ints_server()ROS2:
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoIntsclass AddTwoIntsService(Node):def __init__(self):super().__init__(add_two_ints_server)self.srv self.create_service(AddTwoInts, add_two_ints, self.add_two_ints_callback)def add_two_ints_callback(self, request, response):response.sum request.a request.bself.get_logger().info(fReturning [{request.a} {request.b} {response.sum}])return responsedef main(argsNone):rclpy.init(argsargs)node AddTwoIntsService()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ __main__:main()五、客户端
ROS1:
import rospy
from std_srvs.srv import AddTwoIntsdef add_two_ints_client(x, y):rospy.wait_for_service(add_two_ints)try:add_two_ints rospy.ServiceProxy(add_two_ints, AddTwoInts)resp1 add_two_ints(x, y)return resp1.sumexcept rospy.ServiceException as e:print(fService call failed: {e})if __name__ __main__:rospy.init_node(add_two_ints_client)x 1y 2print(fRequesting {x}{y})print(f{x} {y} {add_two_ints_client(x, y)})ROS2:
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoIntsclass AddTwoIntsClient(Node):def __init__(self):super().__init__(add_two_ints_client)self.cli self.create_client(AddTwoInts, add_two_ints)while not self.cli.wait_for_service(timeout_sec1.0):self.get_logger().info(service not available, waiting again...)self.req AddTwoInts.Request()def send_request(self, a, b):self.req.a aself.req.b bself.future self.cli.call_async(self.req)def main(argsNone):rclpy.init(argsargs)node AddTwoIntsClient()node.send_request(16, 2)while rclpy.ok():rclpy.spin_once(node)if node.future.done():try:response node.future.result()except Exception as e:node.get_logger().info(fService call failed {e})else:node.get_logger().info(fResult of add_two_ints: {response.sum})breaknode.destroy_node()rclpy.shutdown()if __name__ __main__:main()六、参数管理
ROS1:
在ROS1中参数管理是通过全局参数服务器来实现的。
import rospydef main():rospy.init_node(my_node, anonymousTrue)# 获取参数param_value rospy.get_param(param_name, default_value)rospy.loginfo(fParameter value: {param_value})# 设置参数rospy.set_param(param_name, new_value)if __name__ __main__:main()ROS2:
在ROS2中参数管理更加灵活支持类型安全的参数接口和参数描述符。
import rclpy
from rclpy.node import Nodeclass MyNode(Node):def __init__(self):super().__init__(my_node)# 获取参数param_value self.get_parameter(param_name).get_parameter_value().string_valueself.get_logger().info(fParameter value: {param_value})# 设置参数self.set_parameters([rclpy.parameter.Parameter(param_name, rclpy.Parameter.Type.STRING, new_value)])def main(argsNone):rclpy.init(argsargs)node MyNode()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ __main__:main()七、日志记录
ROS1:
在ROS1中日志记录使用 rospy 提供的函数。
import rospydef main():rospy.init_node(my_node, anonymousTrue)rospy.loginfo(This is an info message.)rospy.logwarn(This is a warning message.)rospy.logerr(This is an error message.)rospy.logfatal(This is a fatal message.) if __name__ __main__:main()ROS2:
在ROS2中日志记录使用 rclpy 提供的函数。
import rclpy
from rclpy.node import Nodeclass MyNode(Node):def __init__(self):super().__init__(my_node)self.get_logger().info(This is an info message.)self.get_logger().warning(This is a warning message.)self.get_logger().error(This is an error message.)self.get_logger().fatal(This is a fatal message.)def main(argsNone):rclpy.init(argsargs)node MyNode()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ __main__:main()八、生命周期管理
ROS1:
ROS1没有内置的生命周期管理功能通常需要开发者自己实现节点的生命周期管理。
ROS2:
ROS2引入了生命周期管理允许更精细地控制节点的启动和停止过程。
import rclpy
from rclpy.lifecycle import Node, State, TransitionCallbackReturn
from rclpy.node import Node as BaseNodeclass LifecycleNode(Node):def __init__(self):super().__init__(lifecycle_node)def on_configure(self, state: State) - TransitionCallbackReturn:self.get_logger().info(on_configure() is called.)return TransitionCallbackReturn.SUCCESSdef on_activate(self, state: State) - TransitionCallbackReturn:self.get_logger().info(on_activate() is called.)return TransitionCallbackReturn.SUCCESSdef on_deactivate(self, state: State) - TransitionCallbackReturn:self.get_logger().info(on_deactivate() is called.)return TransitionCallbackReturn.SUCCESSdef on_cleanup(self, state: State) - TransitionCallbackReturn:self.get_logger().info(on_cleanup() is called.)return TransitionCallbackReturn.SUCCESSdef on_shutdown(self, state: State) - TransitionCallbackReturn:self.get_logger().info(on_shutdown() is called.)return TransitionCallbackReturn.SUCCESSdef main(argsNone):rclpy.init(argsargs)node LifecycleNode()executor rclpy.executors.SingleThreadedExecutor()executor.add_node(node)try:executor.spin()except KeyboardInterrupt:passnode.on_shutdown(State(id9, labelunconfigured))node.destroy_node()rclpy.shutdown()if __name__ __main__:main()欢迎大家加QQ群一起讨论学习894013891