Ideas:

  1. Extend the radar message definition
  2. Update the radar data conversion
  3. Publish the new point cloud

Nuscenes

We’re going to start off with detecting Nuscenes velocities since we still need to collect ROS2 bags of our own data.

This task should literally be just reading in velocities detected from a point cloud without any estimation or prediction.

Converting from pc2 to radar PC

def convert_pointcloud2_to_nusc_radar(self, msg): # Assuming the radar data is in the correct format for the NuScenes RadarPointCloud class 
radar_data = np.frombuffer(msg.data, dtype=np.float32).reshape(-1, 18) radar_points = RadarPointCloud(radar_data.T) return radar_points

I like this attempt better:
We can port the LiDar code which subscribes to Nuscenes data and subscribe to /RADAR_FRONT
then we can read radar velocities:

def get_radar_velocity(self): 
	sample_token = self.nusc.sample[self.sample_index]['token'] sample = 
	self.nusc.get('sample', sample_token) radar_sensor = 'RADAR_FRONT' 
	radar_data_token = sample['data'][radar_sensor] radar_pointcloud = 
	RadarPointCloud.from_file(self.nusc.get_sample_data_path(radar_data_token)) 
	velocities = np.sqrt(radar_pointcloud.points[8, :]**2 + 
	radar_pointcloud.points[9, :]**2) 
	average_velocity = np.mean(velocities) 
	self.sample_index = (self.sample_index + 1) % len(self.nusc.sample) 
	# Loop through samples 
	self.get_logger().info(f'Average velocity: {average_velocity:.2f} m/s') return average_velocity

We could also run something like this:

class RadarVelocityDetectionNode(Node): 
	def __init__(self): 
		super().__init__('radar_velocity_detection_node') 
		self.subscription = self.create_subscription( 
		PointCloud2, 
		'/RADAR_TOP', 
		self.listener_callback,
		 10)
		self.publisher_ = self.create_publisher(Float32, 'velocity', 10) 
		self.get_logger().info('Radar Velocity Detection Node has been started') 
	def listener_callback(self, msg): 
	points_array = ros2_numpy.point_cloud2.pointcloud2_to_array(msg) 
	velocities = self.detect_velocities(points_array) 
	avg_velocity = np.mean(velocities) if velocities.size > 0 else 0.0 
	self.publisher_.publish(Float32(data=avg_velocity)) 
	self.get_logger().info(f'Average velocity: {avg_velocity:.2f} m/s') 
	
	def detect_velocities(self, points_array): 
	# Assuming velocity is in fields 'vx' and 'vy' 
		velocities = np.sqrt(points_array['vx']**2 + points_array['vy']**2) 
		return velocities

I didn’t know polymath was open source lol, we’re gonna try this: https://github.com/polymathrobotics/ars-408

We want to use cluster mode for the driver