#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "rclcpp/rclcpp.hpp" #include "nmea_msgs/msg/sentence.hpp" #include "geometry_msgs/msg/twist_stamped.hpp" #include class SerialNtripNmea : public rclcpp::Node { public: SerialNtripNmea() : Node("serial_ntrip_nmea") { // 声明参数 this->declare_parameter("serial_port", "/dev/ttyUSB0"); this->declare_parameter("baud_rate", 115200); this->declare_parameter("ntrip_host", "103.143.19.54"); this->declare_parameter("ntrip_port", 8002); this->declare_parameter("mountpoint", "RTCM32GRCpro"); this->declare_parameter("username", "7ripg398"); this->declare_parameter("password", "97344"); this->declare_parameter("nmea_topic", "navsat/nmea_sentence"); this->declare_parameter("frame_id", "sentence"); // 获取参数 serial_port_ = this->get_parameter("serial_port").as_string(); baud_rate_ = this->get_parameter("baud_rate").as_int(); ntrip_host_ = this->get_parameter("ntrip_host").as_string(); ntrip_port_ = this->get_parameter("ntrip_port").as_int(); mountpoint_ = this->get_parameter("mountpoint").as_string(); username_ = this->get_parameter("username").as_string(); password_ = this->get_parameter("password").as_string(); nmea_topic_ = this->get_parameter("nmea_topic").as_string(); frame_id_ = this->get_parameter("frame_id").as_string(); // 创建NMEA消息发布器 nmea_pub_ = this->create_publisher(nmea_topic_, 10); // 额外创建一个带 RMC 航向修正的 NMEA 话题,保持原有话题行为不变 nmea_fix_pub_ = this->create_publisher("navsat/nmea_sentence_fix", 10); // 订阅 /cruise/wheel/twist 用于静止判定 twist_sub_ = this->create_subscription( "/cruise/wheel/twist", 10, std::bind(&SerialNtripNmea::twist_callback, this, std::placeholders::_1)); RCLCPP_INFO(this->get_logger(), "SerialNtripNmea初始化:"); RCLCPP_INFO(this->get_logger(), " 串口: %s @ %d baud", serial_port_.c_str(), baud_rate_); RCLCPP_INFO(this->get_logger(), " NTRIP服务器: %s:%d", ntrip_host_.c_str(), ntrip_port_); RCLCPP_INFO(this->get_logger(), " 挂载点: %s", mountpoint_.c_str()); RCLCPP_INFO(this->get_logger(), " NMEA话题: %s", nmea_topic_.c_str()); // 启动服务 if (setup_serial() && connect_ntrip()) { running_ = true; start_server(); } } ~SerialNtripNmea() { running_ = false; if (serial_fd_ >= 0) close(serial_fd_); if (ntrip_socket_ >= 0) close(ntrip_socket_); } private: std::string serial_port_; int baud_rate_; std::string ntrip_host_; int ntrip_port_; std::string mountpoint_; std::string username_; std::string password_; std::string nmea_topic_; std::string frame_id_; int serial_fd_{-1}; int ntrip_socket_{-1}; rclcpp::Publisher::SharedPtr nmea_pub_; rclcpp::Publisher::SharedPtr nmea_fix_pub_; rclcpp::Subscription::SharedPtr twist_sub_; rclcpp::TimerBase::SharedPtr status_timer_; std::atomic running_{false}; std::atomic ntrip_connected_{false}; // 静止判定:从 /cruise/wheel/twist 获取,linear.x==0 且 angular.z==0 为静止 std::atomic is_stationary_{false}; // RMC 静止航向修正相关状态(仅用于 navsat/nmea_sentence_fix) std::deque rmc_heading_history_; bool in_stationary_mode_{false}; bool stationary_heading_locked_{false}; double stationary_locked_heading_{0.0}; bool setup_serial() { serial_fd_ = open(serial_port_.c_str(), O_RDWR | O_NOCTTY | O_NONBLOCK); if (serial_fd_ < 0) { RCLCPP_ERROR(this->get_logger(), "串口打开失败: %s - %s", serial_port_.c_str(), strerror(errno)); return false; } struct termios tty; if (tcgetattr(serial_fd_, &tty) != 0) { RCLCPP_ERROR(this->get_logger(), "获取串口属性失败: %s", strerror(errno)); return false; } // 设置波特率 speed_t speed; switch (baud_rate_) { case 9600: speed = B9600; break; case 19200: speed = B19200; break; case 38400: speed = B38400; break; case 57600: speed = B57600; break; case 115200: speed = B115200; break; case 230400: speed = B230400; break; default: RCLCPP_ERROR(this->get_logger(), "不支持的波特率: %d", baud_rate_); return false; } cfsetospeed(&tty, speed); cfsetispeed(&tty, speed); // 设置串口参数 tty.c_cflag &= ~PARENB; tty.c_cflag &= ~CSTOPB; tty.c_cflag &= ~CSIZE; tty.c_cflag |= CS8; tty.c_cflag &= ~CRTSCTS; tty.c_cflag |= (CREAD | CLOCAL); tty.c_lflag = 0; tty.c_iflag = 0; tty.c_oflag = 0; tty.c_cc[VTIME] = 0; tty.c_cc[VMIN] = 0; if (tcsetattr(serial_fd_, TCSANOW, &tty) != 0) { RCLCPP_ERROR(this->get_logger(), "设置串口属性失败: %s", strerror(errno)); return false; } tcflush(serial_fd_, TCIOFLUSH); RCLCPP_INFO(this->get_logger(), "串口连接成功: %s", serial_port_.c_str()); return true; } bool connect_ntrip() { ntrip_socket_ = socket(AF_INET, SOCK_STREAM, 0); if (ntrip_socket_ < 0) { RCLCPP_ERROR(this->get_logger(), "创建NTRIP socket失败: %s", strerror(errno)); return false; } struct sockaddr_in ntrip_addr; ntrip_addr.sin_family = AF_INET; ntrip_addr.sin_port = htons(ntrip_port_); ntrip_addr.sin_addr.s_addr = inet_addr(ntrip_host_.c_str()); if (connect(ntrip_socket_, (struct sockaddr*)&ntrip_addr, sizeof(ntrip_addr)) < 0) { RCLCPP_ERROR(this->get_logger(), "连接NTRIP服务器失败: %s", strerror(errno)); return false; } // 构建NTRIP请求 std::string auth_string = username_ + ":" + password_; std::string auth_b64 = base64_encode(auth_string); std::stringstream request; request << "GET /" << mountpoint_ << " HTTP/1.1\r\n"; request << "User-Agent: NTRIP SerialNtripNmea/1.0\r\n"; request << "Authorization: Basic " << auth_b64 << "\r\n"; request << "Accept: gnss/data\r\n"; request << "Connection: close\r\n"; request << "\r\n"; std::string request_str = request.str(); RCLCPP_INFO(this->get_logger(), "发送NTRIP请求到服务器: %s:%d", ntrip_host_.c_str(), ntrip_port_); ssize_t sent = send(ntrip_socket_, request_str.c_str(), request_str.length(), 0); if (sent < 0) { RCLCPP_ERROR(this->get_logger(), "发送NTRIP请求失败: %s", strerror(errno)); return false; } // 读取响应 char buffer[1024]; ssize_t received = recv(ntrip_socket_, buffer, sizeof(buffer) - 1, 0); if (received > 0) { buffer[received] = '\0'; std::string response(buffer); if (response.find("200") != std::string::npos) { ntrip_connected_ = true; RCLCPP_INFO(this->get_logger(), "NTRIP连接成功: %s", mountpoint_.c_str()); return true; } else { RCLCPP_ERROR(this->get_logger(), "NTRIP连接失败: %s", response.c_str()); } } else { RCLCPP_ERROR(this->get_logger(), "未收到NTRIP服务器响应"); } return false; } void twist_callback(const geometry_msgs::msg::TwistStamped::ConstSharedPtr msg) { constexpr double eps = 1e-6; bool stationary = (std::fabs(msg->twist.linear.x) < eps) && (std::fabs(msg->twist.angular.z) < eps); is_stationary_.store(stationary); } void start_server() { // 启动串口数据读取和NTRIP发送线程 std::thread serial_thread(&SerialNtripNmea::read_serial_and_process, this); serial_thread.detach(); // 启动NTRIP数据接收和写入串口线程 std::thread ntrip_thread(&SerialNtripNmea::read_ntrip_and_write_serial, this); ntrip_thread.detach(); // 使用定时器显示状态,使 spin() 能处理 twist 订阅回调 status_timer_ = this->create_wall_timer( std::chrono::seconds(1), [this]() { if (rclcpp::ok() && running_) { RCLCPP_INFO(this->get_logger(), "运行状态 - NTRIP连接: %s", ntrip_connected_ ? "是" : "否"); } }); } void read_serial_and_process() { std::vector buffer(1024); std::string nmea_buffer; RCLCPP_INFO(this->get_logger(), "开始读取串口数据并处理"); while (rclcpp::ok() && running_) { ssize_t bytes_read = read(serial_fd_, buffer.data(), buffer.size()); if (bytes_read > 0) { // 处理接收到的数据 for (ssize_t i = 0; i < bytes_read; ++i) { char c = buffer[i]; nmea_buffer += c; // 检测NMEA语句结束符(换行符) if (c == '\n') { // 移除换行符和回车符 if (!nmea_buffer.empty()) { // 先移除可能的换行符 '\n' if (nmea_buffer.back() == '\n') { nmea_buffer.pop_back(); // 再移除可能的回车符 '\r' if (!nmea_buffer.empty() && nmea_buffer.back() == '\r') { nmea_buffer.pop_back(); } } else if (nmea_buffer.back() == '\r') { // 如果只有回车符,直接移除 nmea_buffer.pop_back(); } } // 检查是否为有效的NMEA语句(以$开头) if (!nmea_buffer.empty() && nmea_buffer[0] == '$') { // 发布NMEA消息到ROS话题 auto nmea_msg = nmea_msgs::msg::Sentence(); nmea_msg.header.frame_id = frame_id_; nmea_msg.header.stamp = this->now(); nmea_msg.sentence = nmea_buffer; // 原始 NMEA 话题,行为保持不变 nmea_pub_->publish(nmea_msg); // 带 RMC 航向修正的 NMEA 话题 auto nmea_msg_fix = nmea_msg; nmea_msg_fix.sentence = make_fixed_nmea_sentence(nmea_buffer); nmea_fix_pub_->publish(nmea_msg_fix); RCLCPP_DEBUG(this->get_logger(), "发布NMEA消息: %s", nmea_buffer.c_str()); // 检查是否为GGA语句,发送到NTRIP服务器 bool is_gga = false; if (nmea_buffer.length() >= 6) { std::string nmea_type = nmea_buffer.substr(1, 5); if (nmea_type == "GPGGA" || nmea_type == "GNGGA") { is_gga = true; } } if (is_gga && ntrip_connected_) { // 发送GGA数据到NTRIP服务器 ssize_t sent = send(ntrip_socket_, nmea_buffer.c_str(), nmea_buffer.length(), MSG_DONTWAIT | MSG_NOSIGNAL); if (sent > 0) { RCLCPP_DEBUG(this->get_logger(), "发送GGA数据到NTRIP: %s", nmea_buffer.c_str()); } else if (sent < 0) { if (errno == EPIPE || errno == ECONNRESET) { RCLCPP_ERROR(this->get_logger(), "NTRIP连接断开"); ntrip_connected_ = false; } } } } nmea_buffer.clear(); } } RCLCPP_DEBUG(this->get_logger(), "从串口接收到 %zd bytes 数据", bytes_read); } else if (bytes_read < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { usleep(10000); // 10ms轮询间隔 } else { RCLCPP_ERROR(this->get_logger(), "串口读取错误: %s", strerror(errno)); break; } } else { usleep(10000); // 10ms轮询间隔 } } } void read_ntrip_and_write_serial() { std::vector buffer(4096); while (rclcpp::ok() && running_ && ntrip_connected_) { ssize_t bytes_read = recv(ntrip_socket_, buffer.data(), buffer.size(), MSG_DONTWAIT); if (bytes_read > 0) { // 将NTRIP返回的数据写入串口 ssize_t bytes_written = write(serial_fd_, buffer.data(), bytes_read); if (bytes_written > 0) { RCLCPP_DEBUG(this->get_logger(), "写入串口NTRIP数据: %zd bytes", bytes_written); } else if (bytes_written < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { RCLCPP_WARN(this->get_logger(), "串口写入缓冲区满"); } else { RCLCPP_ERROR(this->get_logger(), "串口写入错误: %s", strerror(errno)); } } } else if (bytes_read == 0) { RCLCPP_ERROR(this->get_logger(), "NTRIP服务器断开连接"); ntrip_connected_ = false; break; } else { if (errno == EAGAIN || errno == EWOULDBLOCK) { usleep(10000); // 10ms轮询间隔 } else { RCLCPP_ERROR(this->get_logger(), "NTRIP读取错误: %s", strerror(errno)); ntrip_connected_ = false; break; } } } // 尝试重新连接 if (running_ && !ntrip_connected_) { RCLCPP_INFO(this->get_logger(), "尝试重新连接NTRIP服务器..."); usleep(5000000); // 5秒后重试 if (connect_ntrip()) { std::thread ntrip_thread(&SerialNtripNmea::read_ntrip_and_write_serial, this); ntrip_thread.detach(); } } } std::string base64_encode(const std::string& input) { static const std::string base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "0123456789+/"; std::string encoded; int i = 0; int j = 0; unsigned char char_array_3[3]; unsigned char char_array_4[4]; for (char c : input) { char_array_3[i++] = c; if (i == 3) { char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4); char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6); char_array_4[3] = char_array_3[2] & 0x3f; for(i = 0; i < 4; i++) encoded += base64_chars[char_array_4[i]]; i = 0; } } if (i) { for(j = i; j < 3; j++) char_array_3[j] = '\0'; char_array_4[0] = (char_array_3[0] & 0xfc) >> 2; char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4); char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6); char_array_4[3] = char_array_3[2] & 0x3f; for (j = 0; j < i + 1; j++) encoded += base64_chars[char_array_4[j]]; while(i++ < 3) encoded += '='; } return encoded; } // 生成带 RMC 航向修正的 NMEA 语句,仅影响 navsat/nmea_sentence_fix 话题 std::string make_fixed_nmea_sentence(const std::string& sentence) { // 只处理以 $ 开头的 RMC 语句,其余直接原样返回 if (sentence.size() < 6 || sentence[0] != '$') { return sentence; } std::string type = sentence.substr(1, 5); if (type != "GPRMC" && type != "GNRMC") { return sentence; } // 分离出不含校验和的部分($ 与 * 之间) std::size_t asterisk_pos = sentence.find('*'); std::string body_no_checksum; if (asterisk_pos == std::string::npos) { // 没有显式校验和时,直接去掉起始的 $ body_no_checksum = sentence.substr(1); } else { body_no_checksum = sentence.substr(1, asterisk_pos - 1); } // 使用逗号分隔字段 std::vector fields; std::size_t start = 0; while (true) { std::size_t comma = body_no_checksum.find(',', start); if (comma == std::string::npos) { fields.emplace_back(body_no_checksum.substr(start)); break; } else { fields.emplace_back(body_no_checksum.substr(start, comma - start)); start = comma + 1; } } // RMC 标准字段中,索引 7 为地速(节),索引 8 为真航向 if (fields.size() <= 8) { return sentence; } bool heading_ok = false; double heading_deg = 0.0; if (!fields[8].empty()) { try { heading_deg = std::stod(fields[8]); heading_ok = true; } catch (...) { heading_ok = false; } } if (!heading_ok) { // 航向字段无法解析时,不做任何改动 return sentence; } // 静止判定:从 /cruise/wheel/twist 获取,linear.x==0 且 angular.z==0 为静止 bool stationary = is_stationary_.load(); // 进入/退出静止模式(在写入历史前判定,使用前10条那条数据的航向) if (stationary) { std::cout << "进入静止模式" << std::endl; bool just_entered = !in_stationary_mode_; in_stationary_mode_ = true; if (just_entered && rmc_heading_history_.size() >= 10) { stationary_locked_heading_ = rmc_heading_history_[rmc_heading_history_.size() - 10]; stationary_heading_locked_ = true; } } else { std::cout << "退出静止模式" << std::endl; in_stationary_mode_ = false; stationary_heading_locked_ = false; } // 记录当前 RMC 航向到历史(用于下次静止时取前10条) rmc_heading_history_.push_back(heading_deg); if (rmc_heading_history_.size() > 50) { rmc_heading_history_.pop_front(); } double output_heading = heading_deg; if (in_stationary_mode_ && stationary_heading_locked_) { output_heading = stationary_locked_heading_; } // 更新航向字段 { std::ostringstream oss; oss << std::fixed << std::setprecision(2) << output_heading; fields[8] = oss.str(); } // 重新拼接不含校验和的部分 std::ostringstream body_oss; for (std::size_t i = 0; i < fields.size(); ++i) { if (i > 0) { body_oss << ','; } body_oss << fields[i]; } std::string new_body_no_checksum = body_oss.str(); // 重新计算校验和(对 $ 与 * 之间的字符做 XOR) unsigned char checksum = 0; for (char ch : new_body_no_checksum) { checksum ^= static_cast(ch); } std::ostringstream result; result << '$' << new_body_no_checksum << '*' << std::uppercase << std::hex << std::setw(2) << std::setfill('0') << static_cast(checksum); return result.str(); } }; int main(int argc, char** argv) { // 忽略SIGPIPE信号 signal(SIGPIPE, SIG_IGN); rclcpp::init(argc, argv); auto node = std::make_shared(); rclcpp::spin(node); rclcpp::shutdown(); return 0; }