話題通信實踐
使用話題通信實現一個系統信息監視工具
消息接口是定義通信數據結構的規則,消息類型是根據規則生成的具體代碼實現,供節點使用,這里由于沒有對應的消息類型所以我們需要先自定義一個消息接口。
需要兩個基礎包
builtin_interfaces:ROS 2 自帶的一個接口包,里面定義了一些基礎消息類型。
rosidl_default_generators:一個構建工具包,提供生成消息/服務/動作接口代碼的 CMake 宏和邏輯。
自定義的消息接口如下
builtin_interfaces/Time stamp # 記錄時間戳
string host_name # 主機名字
float32 cpu_percent # CPu使用率
float32 memory_percent # 內存使用率
float32 memory_total # 內存總大小
float32 memory_available # 內存總大小
float64 net_sent # 網絡發送數據總量imp8Mb
float64 net_recv # 網絡數據接受總量M
Cmake函數,將消息接口轉換為庫或頭文件,這樣編譯時生成對應的 C++/Python 類型
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SystemStatus.msg"
DEPENDENCIES builtin_interfaces
)
<member_of_group>rosidl_interface_packages</member_of_group> :它聲明當前包是一個 ROS 2 接口包,用于提供消息、服務或動作定義。

以下是項目將自定義的消息接口生成對應的頭文件


ros2 pkg create status_publisher --build-type ament_python --dependencies rclpy status_interfaces --license Apache-2.0創建發布者功能包,上面創建的是消息接口的功能包
每次打開新的終端都要刷新環境變量
import rclpy
import psutil
import platform
from status_interfaces.msg import SystemStatus
from rclpy.node import Node
class StatusPublisher(Node):
def __init__(self,node_name):
super().__init__(node_name)
# 話題名稱為sys_status,發布者為節點內部成員
self.status_publisher = self.create_publisher(
SystemStatus,'sys_status',10
)
self.timer_ = self.create_timer(1.0,self.timer_callback)
def timer_callback(self):
# 獲取信息
cpu_percent = psutil.cpu_percent()
memory_info = psutil.virtual_memory()
net_io_counters = psutil.net_io_counters()
# 輸入消息
msg = SystemStatus()
msg.stamp = self.get_clock().now().to_msg()
msg.host_name = platform.node()
msg.cpu_percent = cpu_percent
msg.memory_percent=memory_info.percent
# 將B轉換為MB
msg.memory_total = memory_info.total / 1024/ 1024
msg.memory_available=memory_info.available/ 1024/ 1024
msg.net_sent = net_io_counters.bytes_sent / 1024/ 1024
msg.net_recv = net_io_counters.bytes_recv / 1024/ 1024
self.get_logger().info(f'publish {str(msg)}')
self.status_publisher.publish(msg)
def main():
rclpy.init()
#節點名稱為statusPub
node = StatusPublisher('statusPub')
rclpy.spin(node)
rclpy.shutdown()

使用QT創建可視化窗口,將訂閱者獲取得到的消息進行可視化展示
cmake_minimum_required(VERSION 3.8)
project(status_display)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(status_interfaces REQUIRED)
find_package(Qt5 REQUIRED COMPONENTS Widgets)
add_executable(statusDisplay src/status_display.cpp)
ament_target_dependencies(statusDisplay rclcpp status_interfaces)
target_link_libraries(statusDisplay Qt5::Widgets)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
install(TARGETS statusDisplay
DESTINATION lib/${PROJECT_NAME}
)
ament_package()
#include<QApplication>
#include <QLabel>
#include <QString>
#include <rclcpp/rclcpp.hpp>
#include <status_interfaces/msg/system_status.hpp>
using namespace std;
using namespace rclcpp;
using SystemStatus = status_interfaces::msg::SystemStatus;
class StatusDisplay: public Node{
private:
Subscription<SystemStatus>::SharedPtr subscriber;
QLabel *label;
public:
StatusDisplay():Node("statusDisplay"){
label =new QLabel();
subscriber = this->create_subscription<SystemStatus>(
// lambada表達式創建匿名函數,每獲取到一條消息就轉換為對應字符串打印到界面上
"sys_status",10,[&](const SystemStatus::SharedPtr msg) -> void{
label->setText(get_qstr_from_msg(msg));
}
);
label->setText(get_qstr_from_msg(std::make_shared<SystemStatus>()));
label->show();
};
// 將獲取到的消息轉為我們要打印的字符串
QString get_qstr_from_msg(const SystemStatus::SharedPtr msg){
stringstream show_str;
show_str
<< "===新提供狀態可視化顯示工具=====\n"
<< "數據 時間:\t" << msg->stamp.sec << "\ts\n"
<< "主機名字:\t" << msg->host_name << "\t\n"
<< "CPU使用率:\t" << msg->cpu_percent <<"\t%\n"
<< "內存使用率:\t" << msg->memory_percent << "\t%\n"
<< "內存總大小:\t" << msg->memory_total << "\tMB\n"
<< "剩余有效內存:\t" << msg->memory_available << "\tMB\n"
<< "網絡發送量:\t" << msg->net_sent << "\tMB\n"
<< "網絡接受量:\t" << msg->net_recv << "\tMB\n"
<< "===============================";
return QString::fromStdString(show_str.str());
};
};
int main(int argc,char *argv[]){
rclcpp::init(argc,argv);
QApplication app(argc,argv);
auto node = make_shared<StatusDisplay>();
// 由于執行ROS和Qt界面都會堵塞系統,需要再開一個線程進行異步執行
thread spin_thread([&]()->void{
rclcpp::spin(node);
rclcpp::shutdown();
});
spin_thread.detach();
app.exec();
return 0;
}


浙公網安備 33010602011771號