Parser for GeoJSON format annotation files.
GeoJSON is a format for encoding geographic data structures using JSON.
This parser supports both polygon and point geometries.
Extended capabilities:
- Relational metadata integration: Maps properties from geometry-less definition
features to spatial annotation features via a shared join key (solve_relations).
Expected relational schema for solve_relations:
FeatureCollection
├── Feature (Definition)
│   ├── geometry: null
│   └── properties
│       ├── presetID: "a376..." <───────┐ (join_key)
│       └── meta: { "category": { "name": "Category", "value": "Healthy Tissue" } }
└── Feature (Annotation)                │
    ├── geometry: { "type": "Polygon" } │
    └── properties                      │
        └── presetID: "a376..." <───────┘
Source code in ratiopath/parsers/geojson_parser.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class GeoJSONParser:
    """Parser for GeoJSON format annotation files.

    GeoJSON is a format for encoding geographic data structures using JSON.
    This parser supports both polygon and point geometries.

    Extended capabilities:
        - Relational metadata integration: maps properties from geometry-less
          definition features to spatial annotation features via a shared join
          key (see ``_solve_relations``).

    Expected relational schema for ``_solve_relations``::

        FeatureCollection
        ├── Feature (Definition)
        │   ├── geometry: null
        │   └── properties
        │       ├── presetID: "a376..." <───────┐ (join_key)
        │       └── meta: { "category": { "name": "Category", "value": "Healthy Tissue" } }
        └── Feature (Annotation)                │
            ├── geometry: { "type": "Polygon" } │
            └── properties                      │
                └── presetID: "a376..." <───────┘

    Attributes:
        gdf: The parsed features. For a non-empty file this holds only the
            features that have a geometry, optionally enriched with the
            properties of the matching definition features.
    """

    def __init__(
        self, file_path: Path | str | TextIO, join_key: str | None = "presetID"
    ) -> None:
        """Read the file and prepare the annotation GeoDataFrame.

        Args:
            file_path: Path or open text stream handed to ``gpd.read_file``.
            join_key: Name of the property column shared by definition and
                annotation features. No merge is performed when this is not a
                column of the parsed file (``None`` included) or when the file
                contains no geometry-less features.
        """
        self.gdf = gpd.read_file(file_path)
        if not self.gdf.empty:
            has_geometry = ~(self.gdf.geometry.is_empty | self.gdf.geometry.isna())
            # One row per geometry part, so multi-part geometries are split up.
            annotations = self.gdf[has_geometry].explode(index_parts=True)
            # Features without a usable geometry only carry metadata.
            definitions = self.gdf[~has_geometry]
            if join_key in self.gdf.columns and not definitions.empty:
                self.gdf = self._solve_relations(annotations, definitions, join_key)  # type: ignore[arg-type]
            else:
                self.gdf = annotations

    @staticmethod
    def _solve_relations(
        annotations: GeoDataFrame, definitions: GeoDataFrame, join_key: str
    ) -> GeoDataFrame:
        """Merge definition properties into annotations using the join key.

        Columns that exist only in the definitions are folded into the result.
        Columns that exist in both get a ``_def`` suffix for the definition side.

        Args:
            annotations: Features that have a geometry.
            definitions: Geometry-less features carrying the metadata.
            join_key: Column present in both frames used to match rows.

        Returns:
            The annotations left-joined with the definition columns, so
            annotations without a matching definition are kept.
        """
        # Drop all-null columns from annotations so they don't shadow definition values
        ann_null_cols = [
            c
            for c in annotations.columns
            if c != "geometry" and c != join_key and annotations[c].isna().all()
        ]
        annotations_clean = annotations.drop(columns=ann_null_cols)
        merged = annotations_clean.merge(
            definitions.drop(columns=["geometry"]),
            on=join_key,
            how="left",
            suffixes=("", "_def"),
        )
        return merged

    def get_filtered_geodataframe(
        self, separator: str = "_", **kwargs: str
    ) -> GeoDataFrame:
        """Filter the GeoDataFrame based on property values.

        Each key is split on ``separator``: the first part names a column and
        every following part is looked up as a dictionary key inside that
        column's values (after passing them through ``safe_to_dict``). Rows
        lacking the nested key are discarded. The remaining values are
        converted to strings and kept when the pattern matches at the start of
        the string. Missing values never match.

        Args:
            separator: The string used to separate keys in the filtering.
            **kwargs: Keyword arguments for filtering. Keys are column names,
                optionally followed by nested keys joined with the separator
                (e.g., 'classification_name' with the default separator), and
                values are regex patterns to match against.

        Returns:
            A filtered GeoDataFrame. It is empty when the column named by the
            first part of any key does not exist.
        """
        filtered_gdf = self.gdf
        for key, pattern in kwargs.items():
            # ``str.split`` always yields at least one element.
            column, *nested_keys = key.split(separator)
            if column not in filtered_gdf.columns:
                # If the first part of the key doesn't exist, return an empty frame
                return self.gdf.iloc[0:0]
            series = filtered_gdf[column]
            for subkey in nested_keys:
                series = series.apply(safe_to_dict)
                # Force a boolean dtype: on an empty series ``apply`` may keep
                # the source dtype, and a non-boolean key would then be read as
                # column labels rather than as a row mask.
                mask = series.apply(
                    lambda x, sk=subkey: isinstance(x, dict) and sk in x
                ).astype(bool)
                series = series[mask].apply(lambda x, sk=subkey: x[sk])
                filtered_gdf = filtered_gdf[mask]
            # Record missing values before stringifying: ``astype(str)`` can turn
            # them into the literal text "nan"/"None", which a pattern could match.
            present = series.notna()
            mask = present & series.astype(str).str.match(pattern, na=False)
            filtered_gdf = filtered_gdf[mask]
        return filtered_gdf

    def get_polygons(self, **kwargs: str) -> Iterable[Polygon]:
        """Get polygons from the GeoDataFrame.

        Args:
            **kwargs: Keyword arguments for filtering properties; forwarded to
                ``get_filtered_geodataframe``.

        Yields:
            Shapely Polygon objects. Geometries of other types are skipped.
        """
        filtered_gdf = self.get_filtered_geodataframe(**kwargs)
        for geom in filtered_gdf.geometry:
            if isinstance(geom, Polygon):
                yield geom

    def get_points(self, **kwargs: str) -> Iterable[Point]:
        """Get points from the GeoDataFrame.

        Args:
            **kwargs: Keyword arguments for filtering properties; forwarded to
                ``get_filtered_geodataframe``.

        Yields:
            Shapely Point objects. Geometries of other types are skipped.
        """
        filtered_gdf = self.get_filtered_geodataframe(**kwargs)
        for geom in filtered_gdf.geometry:
            if isinstance(geom, Point):
                yield geom
|
gdf = gpd.read_file(file_path)
instance-attribute
__init__(file_path, join_key='presetID')
Source code in ratiopath/parsers/geojson_parser.py
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(
    self, file_path: Path | str | TextIO, join_key: str | None = "presetID"
) -> None:
    """Load the file and keep only the features that carry a geometry.

    Geometry-less features are treated as definitions; when ``join_key`` is a
    column of the parsed file and at least one definition exists, their
    properties are merged into the annotations via ``_solve_relations``.
    """
    self.gdf = gpd.read_file(file_path)
    if self.gdf.empty:
        # Nothing to split or merge; keep the frame exactly as it was read.
        return

    lacks_geometry = self.gdf.geometry.is_empty | self.gdf.geometry.isna()
    annotations = self.gdf[~lacks_geometry].explode(index_parts=True)
    definitions = self.gdf[lacks_geometry]

    if join_key not in self.gdf.columns or definitions.empty:
        self.gdf = annotations
    else:
        self.gdf = self._solve_relations(annotations, definitions, join_key)  # type: ignore[arg-type]
|
get_filtered_geodataframe(separator='_', **kwargs)
Filter the GeoDataFrame based on property values.
Parameters:
| Name |
Type |
Description |
Default |
separator
|
str
|
The string used to separate keys in the filtering.
|
'_'
|
**kwargs
|
str
|
Keyword arguments for filtering. Keys are column names, optionally
followed by nested keys joined with the separator (e.g.,
'classification_name' with the default separator), and values are regex
patterns to match against.
|
{}
|
Returns:
| Type |
Description |
GeoDataFrame
|
A filtered GeoDataFrame.
|
Source code in ratiopath/parsers/geojson_parser.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 | def get_filtered_geodataframe(
self, separator: str = "_", **kwargs: str
) -> GeoDataFrame:
"""Filter the GeoDataFrame based on property values.
Args:
separator: The string used to separate keys in the filtering.
**kwargs: Keyword arguments for filtering. Keys are column names
(e.g., 'classification.name') and values are regex patterns to match
against.
Returns:
A filtered GeoDataFrame.
"""
filtered_gdf = self.gdf
for key, pattern in kwargs.items():
subkeys = key.split(separator)
if not subkeys or subkeys[0] not in filtered_gdf.columns:
# If the first part of the key doesn't exist, return an empty frame
return self.gdf.iloc[0:0]
series = filtered_gdf[subkeys[0]]
for subkey in subkeys[1:]:
series = series.apply(safe_to_dict)
mask = series.apply(
lambda x, sk=subkey: isinstance(x, dict) and sk in x
)
series = series[mask].apply(lambda x, sk=subkey: x[sk])
filtered_gdf = filtered_gdf[mask]
series = series.astype(str)
mask = series.str.match(pattern, na=False)
filtered_gdf = filtered_gdf[mask]
return filtered_gdf
|
get_points(**kwargs)
Get points from the GeoDataFrame.
Parameters:
| Name |
Type |
Description |
Default |
**kwargs
|
str
|
Keyword arguments for filtering properties.
|
{}
|
Yields:
| Type |
Description |
Iterable[Point]
|
Shapely Point objects.
|
Source code in ratiopath/parsers/geojson_parser.py
124
125
126
127
128
129
130
131
132
133
134
135
def get_points(self, **kwargs: str) -> Iterable[Point]:
    """Yield every Point geometry of the rows that pass the property filters.

    Args:
        **kwargs: Property filters, forwarded unchanged to
            ``get_filtered_geodataframe``.

    Yields:
        Shapely Point objects; geometries of any other type are skipped.
    """
    geometries = self.get_filtered_geodataframe(**kwargs).geometry
    yield from (shape for shape in geometries if isinstance(shape, Point))
|
get_polygons(**kwargs)
Get polygons from the GeoDataFrame.
Parameters:
| Name |
Type |
Description |
Default |
**kwargs
|
str
|
Keyword arguments for filtering properties.
|
{}
|
Yields:
| Type |
Description |
Iterable[Polygon]
|
Shapely Polygon objects.
|
Source code in ratiopath/parsers/geojson_parser.py
110
111
112
113
114
115
116
117
118
119
120
121
def get_polygons(self, **kwargs: str) -> Iterable[Polygon]:
    """Yield every Polygon geometry of the rows that pass the property filters.

    Args:
        **kwargs: Property filters, forwarded unchanged to
            ``get_filtered_geodataframe``.

    Yields:
        Shapely Polygon objects; geometries of any other type are skipped.
    """
    geometries = self.get_filtered_geodataframe(**kwargs).geometry
    yield from (shape for shape in geometries if isinstance(shape, Polygon))
|