Ideas:
Extend the radar message definition
Update the radar data conversion
Publish the new point cloud
Nuscenes
We’re going to start off with detecting Nuscenes velocities since we still need to collect ROS2 bags of our own data.
This task should literally be just reading in velocities detected from a point cloud without any estimation or prediction.
Converting from pc2 to radar PC
def convert_pointcloud2_to_nusc_radar (self, msg): # Assuming the radar data is in the correct format for the NuScenes RadarPointCloud class
radar_data = np.frombuffer(msg.data, dtype = np.float32).reshape( - 1 , 18 ) radar_points = RadarPointCloud(radar_data.T) return radar_points
I like this attempt better:
We can port the LiDar code which subscribes to Nuscenes data and subscribe to /RADAR_FRONT
then we can read radar velocities:
def get_radar_velocity (self):
sample_token = self .nusc.sample[ self .sample_index][ 'token' ] sample =
self .nusc.get( 'sample' , sample_token) radar_sensor = 'RADAR_FRONT'
radar_data_token = sample[ 'data' ][radar_sensor] radar_pointcloud =
RadarPointCloud.from_file( self .nusc.get_sample_data_path(radar_data_token))
velocities = np.sqrt(radar_pointcloud.points[ 8 , :] ** 2 +
radar_pointcloud.points[ 9 , :] ** 2 )
average_velocity = np.mean(velocities)
self .sample_index = ( self .sample_index + 1 ) % len ( self .nusc.sample)
# Loop through samples
self .get_logger().info( f 'Average velocity: { average_velocity :.2f } m/s' ) return average_velocity
We could also run something like this:
class RadarVelocityDetectionNode ( Node ):
def __init__ (self):
super (). __init__ ( 'radar_velocity_detection_node' )
self .subscription = self .create_subscription(
PointCloud2,
'/RADAR_TOP' ,
self .listener_callback,
10 )
self .publisher_ = self .create_publisher(Float32, 'velocity' , 10 )
self .get_logger().info( 'Radar Velocity Detection Node has been started' )
def listener_callback (self, msg):
points_array = ros2_numpy.point_cloud2.pointcloud2_to_array(msg)
velocities = self .detect_velocities(points_array)
avg_velocity = np.mean(velocities) if velocities.size > 0 else 0.0
self .publisher_.publish(Float32( data = avg_velocity))
self .get_logger().info( f 'Average velocity: { avg_velocity :.2f } m/s' )
def detect_velocities (self, points_array):
# Assuming velocity is in fields 'vx' and 'vy'
velocities = np.sqrt(points_array[ 'vx' ] ** 2 + points_array[ 'vy' ] ** 2 )
return velocities
I didn’t know polymath was open source lol, we’re gonna try this:
https://github.com/polymathrobotics/ars-408
We want to use cluster mode for the driver