Skip to content

ratiopath.parsers.GeoJSONParser

Parser for GeoJSON format annotation files.

GeoJSON is a format for encoding geographic data structures using JSON. This parser supports both polygon and point geometries.

Extended capabilities:

- Relational metadata integration: maps properties from geometry-less definition features to spatial annotation features via a shared join key (`_solve_relations`).

Expected relational schema for `_solve_relations`:

    FeatureCollection
    ├── Feature (Definition)
    │   ├── geometry: null
    │   └── properties
    │       ├── presetID: "a376..."  <──────┐ (join_key)
    │       └── meta: { "category": { "name": "Category", "value": "Healthy Tissue" } }
    └── Feature (Annotation)                │
        ├── geometry: { "type": "Polygon" } │
        └── properties                      │
            └── presetID: "a376..."  <──────┘

Source code in ratiopath/parsers/geojson_parser.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class GeoJSONParser:
    """Parser for GeoJSON format annotation files.

    GeoJSON is a format for encoding geographic data structures using JSON.
    This parser supports both polygon and point geometries.

    Extended capabilities:
    - Relational metadata integration: Maps properties from geometry-less definition
      features to spatial annotation features via a shared join key
      (``_solve_relations``).

    Expected relational schema for ``_solve_relations``:
    FeatureCollection
    ├── Feature (Definition)
    │   ├── geometry: null
    │   └── properties
    │       ├── presetID: "a376..."  <──────┐ (join_key)
    │       └── meta: { "category": { "name": "Category", "value": "Healthy Tissue" } }
    └── Feature (Annotation)                │
        ├── geometry: { "type": "Polygon" } │
        └── properties                      │
            └── presetID: "a376..."  <──────┘
    """

    def __init__(
        self, file_path: Path | str | TextIO, join_key: str | None = "presetID"
    ) -> None:
        """Load a GeoJSON file and attach definition metadata to annotations.

        Features whose geometry is null or empty are treated as definitions; all
        other features are annotations and are passed through
        ``GeoDataFrame.explode``. The result is stored in ``self.gdf``.

        Args:
            file_path: Path or text handle handed to ``gpd.read_file``.
            join_key: Property shared by definitions and annotations. The
                relational merge is skipped when it is not a column of the
                loaded data (which includes ``None``) or when the file has no
                definition features.
        """
        self.gdf = gpd.read_file(file_path)
        if self.gdf.empty:
            return

        geometry = self.gdf.geometry
        lacks_geometry = geometry.is_empty | geometry.isna()
        annotations = self.gdf[~lacks_geometry].explode(index_parts=True)
        definitions = self.gdf[lacks_geometry]

        if join_key not in self.gdf.columns or definitions.empty:
            self.gdf = annotations
            return

        self.gdf = self._solve_relations(annotations, definitions, join_key)  # type: ignore[arg-type]

    @staticmethod
    def _solve_relations(
        annotations: GeoDataFrame, definitions: GeoDataFrame, join_key: str
    ) -> GeoDataFrame:
        """Merge definition properties into annotations using the join key.

        Columns that exist only in the definitions are folded into the result.
        Columns that exist in both get a ``_def`` suffix for the definition side.
        Definitions whose join key is null are ignored, so annotations without a
        key keep null values in the definition columns.

        Args:
            annotations: Features that carry a geometry.
            definitions: Geometry-less features holding shared properties.
            join_key: Column present in both frames used to relate them.

        Returns:
            The annotations left-joined with the definition properties.
        """
        # Drop all-null columns from annotations so they don't shadow definition values
        ann_null_cols = [
            c
            for c in annotations.columns
            if c != "geometry" and c != join_key and annotations[c].isna().all()
        ]
        annotations_clean = annotations.drop(columns=ann_null_cols)

        # pandas matches null keys against each other during a merge, which would
        # attach an unrelated key-less definition to every key-less annotation.
        keyed_definitions = definitions[definitions[join_key].notna()]

        return annotations_clean.merge(
            keyed_definitions.drop(columns=["geometry"]),
            on=join_key,
            how="left",
            suffixes=("", "_def"),
        )

    def get_filtered_geodataframe(
        self, separator: str = "_", **kwargs: str
    ) -> GeoDataFrame:
        """Filter the GeoDataFrame based on property values.

        Args:
            separator: The string used to separate keys in the filtering.
            **kwargs: Keyword arguments for filtering. Each key is a column name,
                optionally followed by nested property names joined with
                ``separator`` (e.g., ``classification_name`` with the default
                separator). Each value is a regex pattern that must match at the
                start of the stringified property value.

        Returns:
            A filtered GeoDataFrame. Rows whose value is missing never match, and
            an empty frame is returned when a key's column does not exist.
        """
        filtered_gdf = self.gdf
        for key, pattern in kwargs.items():
            column, *nested_keys = key.split(separator)
            if column not in filtered_gdf.columns:
                # If the first part of the key doesn't exist, return an empty frame
                return self.gdf.iloc[0:0]

            series = filtered_gdf[column]
            for subkey in nested_keys:
                series = series.apply(safe_to_dict)
                mask = series.apply(
                    lambda x, sk=subkey: isinstance(x, dict) and sk in x
                )
                series = series[mask].apply(lambda x, sk=subkey: x[sk])
                filtered_gdf = filtered_gdf[mask]

            # Record missing values before stringifying: astype(str) can turn them
            # into the literal text "nan"/"None", which a pattern could then match.
            present = series.notna()
            mask = present & series.astype(str).str.match(pattern, na=False)
            filtered_gdf = filtered_gdf[mask]

        return filtered_gdf

    def get_polygons(self, **kwargs: str) -> Iterable[Polygon]:
        """Get polygons from the GeoDataFrame.

        Args:
            **kwargs: Keyword arguments for filtering properties.

        Yields:
            Shapely Polygon objects.
        """
        geometries = self.get_filtered_geodataframe(**kwargs).geometry
        yield from (geom for geom in geometries if isinstance(geom, Polygon))

    def get_points(self, **kwargs: str) -> Iterable[Point]:
        """Get points from the GeoDataFrame.

        Args:
            **kwargs: Keyword arguments for filtering properties.

        Yields:
            Shapely Point objects.
        """
        geometries = self.get_filtered_geodataframe(**kwargs).geometry
        yield from (geom for geom in geometries if isinstance(geom, Point))

gdf = gpd.read_file(file_path) instance-attribute

__init__(file_path, join_key='presetID')

Source code in ratiopath/parsers/geojson_parser.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(
    self, file_path: Path | str | TextIO, join_key: str | None = "presetID"
) -> None:
    """Load a GeoJSON file and attach definition metadata to annotations.

    Features whose geometry is null or empty are treated as definitions; the
    remaining features are annotations and are passed through
    ``GeoDataFrame.explode``. The result is stored in ``self.gdf``.

    Args:
        file_path: Path or text handle handed to ``gpd.read_file``.
        join_key: Property shared by definitions and annotations. The merge is
            skipped when it is not a column of the loaded data or when the
            file has no definition features.
    """
    self.gdf = gpd.read_file(file_path)
    if self.gdf.empty:
        return

    geometry = self.gdf.geometry
    lacks_geometry = geometry.is_empty | geometry.isna()
    annotations = self.gdf[~lacks_geometry].explode(index_parts=True)
    definitions = self.gdf[lacks_geometry]

    if join_key not in self.gdf.columns or definitions.empty:
        self.gdf = annotations
        return

    self.gdf = self._solve_relations(annotations, definitions, join_key)  # type: ignore[arg-type]

get_filtered_geodataframe(separator='_', **kwargs)

Filter the GeoDataFrame based on property values.

Parameters:

Name Type Description Default
separator str

The string used to separate keys in the filtering.

'_'
**kwargs str

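Keyword arguments for filtering. Each key is a column name, optionally followed by nested property names joined with `separator` (e.g., 'classification_name' with the default separator), and each value is a regex pattern to match against.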

{}

Returns:

Type Description
GeoDataFrame

A filtered GeoDataFrame.

Source code in ratiopath/parsers/geojson_parser.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def get_filtered_geodataframe(
    self, separator: str = "_", **kwargs: str
) -> GeoDataFrame:
    """Filter the GeoDataFrame based on property values.

    Args:
        separator: The string used to separate keys in the filtering.
        **kwargs: Keyword arguments for filtering. Each key is a column name,
            optionally followed by nested property names joined with
            ``separator`` (e.g., ``classification_name`` with the default
            separator). Each value is a regex pattern that must match at the
            start of the stringified property value.

    Returns:
        A filtered GeoDataFrame. Rows whose value is missing never match, and
        an empty frame is returned when a key's column does not exist.
    """
    filtered_gdf = self.gdf
    for key, pattern in kwargs.items():
        column, *nested_keys = key.split(separator)
        if column not in filtered_gdf.columns:
            # If the first part of the key doesn't exist, return an empty frame
            return self.gdf.iloc[0:0]

        series = filtered_gdf[column]
        for subkey in nested_keys:
            series = series.apply(safe_to_dict)
            mask = series.apply(
                lambda x, sk=subkey: isinstance(x, dict) and sk in x
            )
            series = series[mask].apply(lambda x, sk=subkey: x[sk])
            filtered_gdf = filtered_gdf[mask]

        # Record missing values before stringifying: astype(str) can turn them
        # into the literal text "nan"/"None", which a pattern could then match.
        present = series.notna()
        mask = present & series.astype(str).str.match(pattern, na=False)
        filtered_gdf = filtered_gdf[mask]

    return filtered_gdf

get_points(**kwargs)

Get points from the GeoDataFrame.

Parameters:

Name Type Description Default
**kwargs str

Keyword arguments for filtering properties.

{}

Yields:

Type Description
Iterable[Point]

Shapely Point objects.

Source code in ratiopath/parsers/geojson_parser.py
124
125
126
127
128
129
130
131
132
133
134
135
136
def get_points(self, **kwargs: str) -> Iterable[Point]:
    """Iterate over the point geometries that pass the property filters.

    Args:
        **kwargs: Filters forwarded to ``get_filtered_geodataframe``.

    Yields:
        Shapely Point objects; geometries of any other type are skipped.
    """
    geometries = self.get_filtered_geodataframe(**kwargs).geometry
    yield from (shape for shape in geometries if isinstance(shape, Point))

get_polygons(**kwargs)

Get polygons from the GeoDataFrame.

Parameters:

Name Type Description Default
**kwargs str

Keyword arguments for filtering properties.

{}

Yields:

Type Description
Iterable[Polygon]

Shapely Polygon objects.

Source code in ratiopath/parsers/geojson_parser.py
110
111
112
113
114
115
116
117
118
119
120
121
122
def get_polygons(self, **kwargs: str) -> Iterable[Polygon]:
    """Iterate over the polygon geometries that pass the property filters.

    Args:
        **kwargs: Filters forwarded to ``get_filtered_geodataframe``.

    Yields:
        Shapely Polygon objects; geometries of any other type are skipped.
    """
    geometries = self.get_filtered_geodataframe(**kwargs).geometry
    yield from (shape for shape in geometries if isinstance(shape, Polygon))