MapSegment Capabilities Demo¶
This notebook demonstrates how the omega-prime Mapsegmentation class processes autonomous vehicle data. The capabilities include:
- Map Segmentation: Analyze and segment road maps into meaningful components
- Intersection Detection: Automatically detect and classify intersections
- Trajectory Analysis: Process vehicle trajectories and map them to road segments
The functionality is currently only available for OSI Centerline maps.
Setup and Imports¶
Import necessary libraries and set up the environment.
Define paths to data files and output folders.
import numpy as np
import omega_prime
from pathlib import Path
output_path = Path.cwd() / "tutorial_map_output"
output_path.mkdir(parents=True, exist_ok=True)
file = Path.cwd() / "../../example_files" / "osi_map_example.mcap"
Read Data Files¶
Load map and trajectory data from specified files.
When using the omega_prime.Recording.from_file method, you can enable advanced options such as lane splitting and polygon computation to enhance the analysis and visualization of the data.
Lane Splitting: This feature splits long lane centerlines into shorter segments of a specified maximum length (e.g., 10 meters). It improves processing efficiency, increases granularity for detailed map analysis, and updates lane connections accordingly while removing distant connections.
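To make the idea of lane splitting concrete, the following is a minimal sketch of cutting a 2D centerline into pieces that are each at most a given length. It is an illustration only: the helper split_polyline is hypothetical, is not part of omega_prime, and does not reproduce how the library updates lane connections.

```python
import math


def split_polyline(points, max_length):
    """Split a 2D polyline into consecutive pieces no longer than max_length.

    Hypothetical helper for illustration; not the omega_prime implementation.
    """
    pieces = []
    current = [points[0]]
    remaining = max_length  # length still available in the current piece
    for (x0, y0), (x1, y1) in zip(points, points[1:]):
        seg_len = math.hypot(x1 - x0, y1 - y0)
        travelled = 0.0
        # Insert cut points while the rest of this edge does not fit.
        while seg_len - travelled > remaining + 1e-9:
            travelled += remaining
            t = travelled / seg_len
            cut = (x0 + t * (x1 - x0), y0 + t * (y1 - y0))
            current.append(cut)
            pieces.append(current)
            current = [cut]
            remaining = max_length
        remaining -= seg_len - travelled
        current.append((x1, y1))
        if remaining <= 1e-9:
            # The piece is exactly full: close it at the original vertex.
            pieces.append(current)
            current = [(x1, y1)]
            remaining = max_length
    if len(current) > 1:
        pieces.append(current)
    return pieces


# A 25 m straight centerline split with a 10 m limit gives pieces of 10, 10 and 5 m.
pieces = split_polyline([(0.0, 0.0), (25.0, 0.0)], 10.0)
print(len(pieces))  # 3
```

Each piece starts where the previous one ends, so the original geometry is preserved and only additional cut points are introduced.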
# Load the MCAP file with advanced configuration
print(f"Loading MCAP file: {file.name}")
print(f"File exists: {file.exists()}")
if file.exists():
    # Create recording object with advanced options
    r = omega_prime.Recording.from_file(
        filepath=file,
        split_lanes=False,  # Lane splitting is disabled here; set to True to enable it
        split_lanes_length=10,  # Split lanes to a max length of 10 meters
    )
    print(f"Recording loaded successfully!")
    print(f"Number of moving objects: {len(r.moving_objects)}")
    print(f"Host vehicle index: {r.host_vehicle_idx}")
    print(f"Map has {len(r.map.lanes)} lanes")
    print(
        f"Recording duration: {r.moving_objects[r.host_vehicle_idx]._df['total_nanos'].max() - r.moving_objects[r.host_vehicle_idx]._df['total_nanos'].min()} nanoseconds"
    )
else:
    print("⚠️ MCAP file not found. Please check the path.")
if file.exists():
    # Create map segments - this is the core functionality we're demonstrating
    print("🗺️ Creating map segments...")
    r.create_mapsegments()
    # Access the mapsegment object
    mapsegment = r.mapsegment
    # Create locator for coordinate transformations
    locator = omega_prime.Locator.from_map(r.map)
    print(f"✅ Map segmentation completed!")
    print(f"📍 Locator initialized!")
    # Display basic information about the map segmentation
    print(f"\n📊 Map Segmentation Summary:")
    print(f" • Total intersections detected: {len(mapsegment.intersections)}")
    print(f" • Total isolated connections: {len(mapsegment.isolated_connections)}")
    print(f" • Total segments: {len(mapsegment.segments)}")
    print(f" • Lane buffer size: {mapsegment.lane_buffer}")
    print(f" • Intersection overlap buffer: {mapsegment.intersection_overlap_buffer}")
    # Show intersection details
    for i, intersection in enumerate(mapsegment.intersections):
        print(
            f" 🚦 Intersection {intersection.idx}: {len(intersection.lanes)} lanes, type: {intersection.type} with {len(intersection.trafficlights)} traffic lights"
        )
    # Show connection details
    for i, connection in enumerate(mapsegment.isolated_connections):
        print(
            f" 🛤️ Connection {connection.idx}: {len(connection.lanes)} lanes, type: {connection.type} with {len(connection.trafficlights)} traffic lights"
        )
else:
    print("⚠️ Cannot proceed without MCAP file")
Map Segmentation Analysis¶
The identified segments can be used to further analyze intersections and connections within the map.
if file.exists():
    print("🔍 Exploring Mapsegmentation Capabilities:")
    # Check if all lanes are assigned to segments
    all_lanes_assigned = mapsegment.check_if_all_lanes_are_on_segment()
    print(f" ✅ All lanes assigned to segments: {all_lanes_assigned}")
    # Analyze lane relationships
    print(f"\n🛣️ Lane Analysis:")
    print(f" • Total lanes in map: {len(mapsegment.lanes)}")
    print(f" • Lane dictionary size: {len(mapsegment.lane_dict)}")
    print(f" • Lanes with successors: {len(mapsegment.lane_successors_dict)}")
    print(f" • Lanes with predecessors: {len(mapsegment.lane_predecessors_dict)}")
    # Display sample lane information
    sample_lane_ids = list(mapsegment.lane_dict.keys())[:2]
    for lane_id in sample_lane_ids:
        lane = mapsegment.lane_dict[lane_id]
        successors = mapsegment.lane_successors_dict.get(lane_id, [])
        predecessors = mapsegment.lane_predecessors_dict.get(lane_id, [])
        print(f" 🛤️ Lane {lane_id}:")
        print(f" - Successors: {len(successors)} lanes")
        print(f" - Predecessors: {len(predecessors)} lanes")
        # Check if lane is assigned to a segment
        if lane_id in mapsegment.lane_segment_dict:
            segment_info = mapsegment.lane_segment_dict[lane_id]
            if segment_info.segment is not None:
                print(f" - Assigned to segment: {segment_info.segment_idx} (type: {segment_info.segment.type})")
            else:
                print(f" - Not assigned to any segment")
    # Analyze intersections in detail
    print(f"\n🚦 Intersection Details:")
    # Guard against maps without intersections before indexing the list
    if mapsegment.intersections:
        intersection = mapsegment.intersections[0]
        center = intersection.get_center_point()
        print(f" Intersection {intersection.idx}:")
        print(f" - Center point: ({center[0]:.2f}, {center[1]:.2f})")
        print(f" - Number of lanes: {len(intersection.lanes)}")
        print(f" - Lane IDs: {intersection.lane_ids[:5]}...")  # Show first 5 lane IDs
        print(f" - Polygon area: {intersection.polygon.area:.2f} sq units")
        # Check for traffic lights
        if intersection.trafficlights:
            print(f" - Has traffic light: Yes")
        else:
            print(f" - Has traffic light: No")
else:
    print("⚠️ Cannot proceed without MCAP file")
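For readers who want to see what a polygon area and a center point mean geometrically, here is a small sketch based on the shoelace formula. The function polygon_area_and_centroid is a hypothetical helper written for this illustration; whether get_center_point returns exactly this area-weighted centroid is an assumption, not something this notebook states.

```python
def polygon_area_and_centroid(vertices):
    """Return (area, (cx, cy)) for a simple polygon given as an ordered list of (x, y).

    Hypothetical helper for illustration; assumes a non-degenerate polygon (area > 0).
    """
    signed_area2 = 0.0  # twice the signed area
    cx = cy = 0.0
    # Pair every vertex with its successor, wrapping around to the first vertex.
    for (x0, y0), (x1, y1) in zip(vertices, vertices[1:] + vertices[:1]):
        cross = x0 * y1 - x1 * y0
        signed_area2 += cross
        cx += (x0 + x1) * cross
        cy += (y0 + y1) * cross
    area = signed_area2 / 2.0
    return abs(area), (cx / (6.0 * area), cy / (6.0 * area))


# A 4 x 3 rectangle has area 12 and its centroid at (2.0, 1.5).
area, center = polygon_area_and_centroid([(0.0, 0.0), (4.0, 0.0), (4.0, 3.0), (0.0, 3.0)])
print(f"Area: {area:.2f}, center: ({center[0]:.2f}, {center[1]:.2f})")
```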
Using Trajectories with Map Segmentation¶
If trajectory data is available, it can be integrated with the map segmentation for enhanced analysis. The trajectories are mapped to the identified road segments, allowing for detailed examination of vehicle movements in relation to the road network.
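Conceptually, mapping a trajectory to segments can be pictured as asking, for every (frame, x, y) sample, which segment polygon contains the point. The sketch below shows that idea with a plain ray-casting test. It is an assumption-laden illustration: point_in_polygon, assign_segments, and the polygon names are hypothetical, and the actual logic inside trajectory_segment_detection may differ.

```python
def point_in_polygon(x, y, polygon):
    """Ray-casting test: True if (x, y) lies inside the polygon (list of (x, y))."""
    inside = False
    for (x0, y0), (x1, y1) in zip(polygon, polygon[1:] + polygon[:1]):
        # Only edges that straddle the horizontal line through y can be crossed.
        if (y0 > y) != (y1 > y):
            x_cross = x0 + (y - y0) * (x1 - x0) / (y1 - y0)
            if x < x_cross:
                inside = not inside
    return inside


def assign_segments(positions, segment_polygons):
    """Label each (frame, x, y) sample with the first polygon containing it, else None."""
    labels = []
    for frame, x, y in positions:
        label = next(
            (name for name, poly in segment_polygons.items() if point_in_polygon(x, y, poly)),
            None,
        )
        labels.append((frame, label))
    return labels


# Hypothetical segment outlines: one intersection and one adjacent connection.
segment_polygons = {
    "intersection_0": [(0, 0), (10, 0), (10, 10), (0, 10)],
    "connection_0": [(10, 0), (30, 0), (30, 10), (10, 10)],
}
positions = [(0, 5.0, 5.0), (1, 20.0, 5.0), (2, 50.0, 5.0)]
print(assign_segments(positions, segment_polygons))
```

Samples that fall outside every polygon are labelled None, which corresponds to the unassigned case handled in the notebook code.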
if file.exists():
    print("🚗 Extracting Host Vehicle Trajectory Data:")
    # Get host vehicle ID
    host_id = r.host_vehicle_idx
    print(f" Host vehicle ID: {host_id}")
    # Extract trajectory data (converting Polars Series to NumPy arrays)
    host_vehicle = r.moving_objects[host_id]
    x_values = host_vehicle.x.to_numpy()
    y_values = host_vehicle.y.to_numpy()
    frame_values = host_vehicle._df["frame"].to_numpy()
    positions = np.column_stack((frame_values, x_values, y_values))
    segments = mapsegment.trajectory_segment_detection(positions)
    # Analyze segment detection results
    segment_types = {}
    valid_segments = 0
    unassigned_segments = 0
    for i, segment in enumerate(segments):
        if len(segment) > 1 and segment[1] is not None:
            valid_segments += 1
            seg_type = segment[1].type if hasattr(segment[1], "type") else "UNKNOWN"
            segment_types[seg_type] = segment_types.get(seg_type, 0) + 1
        else:
            unassigned_segments += 1
    print(f"\n📈 Segment Detection Summary:")
    print(f" • Valid segments: {valid_segments}")
    print(f" • Unassigned segments: {unassigned_segments}")
    print(f" • Segment types detected:")
    for seg_type, count in segment_types.items():
        print(f" - {seg_type}: {count} segments")
    # Show detailed segment information
    print(f"\n🔍 Detailed Segment Information (first 10):")
    for i, segment in enumerate(segments[:10]):
        if len(segment) > 1 and segment[1] is not None:
            seg_obj = segment[1]
            seg_type = seg_obj.type if hasattr(seg_obj, "type") else "UNKNOWN"
            seg_idx = seg_obj.idx if hasattr(seg_obj, "idx") else "N/A"
            print(f" Segment {i}: ID={seg_idx}, Type={seg_type}")
        else:
            print(f" Segment {i}: Unassigned trajectory segment")
else:
    print("⚠️ Cannot proceed without MCAP file")
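One way to think about a list of trajectory segments is as per-frame labels collapsed into contiguous runs, which can then be tallied by type. The sketch below shows this with itertools.groupby and collections.Counter on made-up labels. The helper collapse_runs and the label strings are hypothetical; the real return format of trajectory_segment_detection is not specified in this notebook beyond the segment[1] access used above.

```python
from collections import Counter
from itertools import groupby


def collapse_runs(labels):
    """Collapse (frame, label) pairs into (first_frame, last_frame, label) runs."""
    runs = []
    # groupby only merges neighbours, so a label that reappears later starts a new run.
    for label, group in groupby(labels, key=lambda item: item[1]):
        frames = [frame for frame, _ in group]
        runs.append((frames[0], frames[-1], label))
    return runs


# Made-up per-frame labels; None marks frames that match no segment.
labels = [(0, "connection"), (1, "connection"), (2, None), (3, "intersection"), (4, "intersection")]
runs = collapse_runs(labels)
print(runs)

# Count runs per label, skipping unassigned runs.
type_counts = Counter(label for _, _, label in runs if label is not None)
print(type_counts)
```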
Visualize Map Segments and Trajectories¶
Generate comprehensive visualizations using matplotlib plots and interactive Altair charts.
The MapSegment class provides methods to visualize the map segments together with vehicle trajectories. The plot can optionally include lane IDs, intersection polygons, and connection polygons for detailed analysis.
if file.exists():
    mapsegment.plot(
        output_plot=None,
        trajectory=positions,
        plot_lane_ids=False,
        plot_intersection_polygons=True,
        plot_connection_polygons=False,
    )