Skip to content

ratiopath.ray.aggregate.TensorMean

Bases: AggregateFnV2[dict, ndarray | float]

Calculates the mean (average) of a column containing Tensors.

This aggregator treats the data column as a high-dimensional array where axis 0 represents the batch dimension. To satisfy the requirements of a reduction and prevent memory growth proportional to the number of rows, axis 0 must be included in the aggregation.

Parameters:

Name Type Description Default
on str

The name of the column containing tensors or numbers.

required
axis int | tuple[int, ...] | None

The axis or axes along which the reduction is computed. - None: Global reduction. Collapses all dimensions (including batch) to a single scalar. - int: Aggregates over both the batch (axis 0) AND the specified tensor dimension. For example, axis=1 collapses the batch and the first dimension of the tensors. - tuple: A sequence of axes that must explicitly include 0.

None
ignore_nulls bool

Whether to ignore null values. Defaults to True.

True
alias_name str | None

Optional name for the resulting column. Defaults to "mean(<on>)".

None

Raises:

Type Description
ValueError

If axis is provided as a tuple but does not include 0.

Note

This aggregator is designed for "reduction" operations. If you wish to calculate statistics per-row without collapsing the batch dimension, use .map() instead.

Example

import ray
import numpy as np
from ratiopath.ray.aggregate import TensorMean

Dataset with 2x2 matrices: total shape (Batch=2, Dim1=2, Dim2=2)

ds = ray.data.from_items(
    [
        {"m": np.array([[1, 1], [1, 1]])},
        {"m": np.array([[3, 3], [3, 3]])},
    ]
)

1. Global Mean (axis=None) -> Result: 2.0

ds.aggregate(TensorMean(on="m", axis=None))

2. Batch Mean (axis=0) -> Result: np.array([[2, 2], [2, 2]])

ds.aggregate(TensorMean(on="m", axis=0))

3. Mean across Batch and Rows (axis=(0, 1)) -> Result: np.array([2, 2])

ds.aggregate(TensorMean(on="m", axis=(0, 1)))

Source code in ratiopath/ray/aggregate/tensor_mean.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class TensorMean(AggregateFnV2[dict, np.ndarray | float]):
    """Calculates the mean (average) of a column containing Tensors.

    This aggregator treats the data column as a high-dimensional array where
    **axis 0 represents the batch dimension**. To satisfy the requirements
    of a reduction and prevent memory growth proportional to the number of rows,
    axis 0 must be included in the aggregation.


    Args:
        on: The name of the column containing tensors or numbers.
        axis: The axis or axes along which the reduction is computed.
            - `None`: Global reduction. Collapses all dimensions (including batch)
              to a single scalar.
            - `int`: Aggregates over both the batch (axis 0) AND the specified
              tensor dimension. For example, `axis=1` collapses the batch and
              the first dimension of the tensors.
            - `tuple`: A sequence of axes that **must** explicitly include `0`.
        ignore_nulls: Whether to ignore null values. Defaults to True.
        alias_name: Optional name for the resulting column. Defaults to "mean(<on>)".

    Raises:
        ValueError: If `axis` is provided as a tuple but does not include `0`.

    Note:
        This aggregator is designed for "reduction" operations. If you wish to
        calculate statistics per-row without collapsing the batch dimension,
        use `.map()` instead.

    Example:
        >>> import ray
        >>> import numpy as np
        >>> from ratiopath.ray.aggregate import TensorMean
        >>> # Dataset with 2x2 matrices: total shape (Batch=2, Dim1=2, Dim2=2)
        >>> ds = ray.data.from_items(
        ...     [
        ...         {"m": np.array([[1, 1], [1, 1]])},
        ...         {"m": np.array([[3, 3], [3, 3]])},
        ...     ]
        ... )
        >>> # 1. Global Mean (axis=None) -> Result: 2.0
        >>> ds.aggregate(TensorMean(on="m", axis=None))
        >>> # 2. Batch Mean (axis=0) -> Result: np.array([[2, 2], [2, 2]])
        >>> ds.aggregate(TensorMean(on="m", axis=0))
        >>> # 3. Mean across Batch and Rows (axis=(0, 1)) -> Result: np.array([2, 2])
        >>> ds.aggregate(TensorMean(on="m", axis=(0, 1)))
    """

    # Normalized reduction axes; None means a global (full) reduction.
    _aggregate_axis: tuple[int, ...] | None = None

    def __init__(
        self,
        on: str,
        axis: int | tuple[int, ...] | None = None,
        ignore_nulls: bool = True,
        alias_name: str | None = None,
    ):
        super().__init__(
            name=alias_name if alias_name else f"mean({on})",
            on=on,
            ignore_nulls=ignore_nulls,
            # Initialize with identity values for summation
            zero_factory=self.zero_factory,
        )

        if axis is not None:
            # A bare int is paired with the mandatory batch axis 0.
            axes = {0, axis} if isinstance(axis, int) else set(axis)

            if 0 not in axes:
                raise ValueError(
                    f"Invalid axis configuration: {axis}. Axis 0 (the batch dimension) "
                    "must be included to perform a reduction. To process rows "
                    "independently without collapsing the batch, use .map() instead."
                )

            self._aggregate_axis = tuple(axes)

    @staticmethod
    def zero_factory() -> dict:
        """Return the identity accumulator: no sum, no known shape, zero count."""
        return {"sum": 0, "shape": None, "count": 0}

    def aggregate_block(self, block: Block) -> dict:
        """Compute the partial (sum, shape, count) accumulator for one block."""
        block_acc = BlockAccessor.for_block(block)

        # Get exact counts before any NumPy conversion obscures the nulls
        valid_count = cast(
            "int",
            block_acc.count(self._target_col_name, ignore_nulls=True),  # type: ignore [arg-type]
        )
        total_count = cast(
            "int",
            block_acc.count(self._target_col_name, ignore_nulls=False),  # type: ignore [arg-type]
        )

        # Catch nulls immediately if strict mode is on
        if valid_count < total_count and not self._ignore_nulls:
            raise ValueError(
                f"Column '{self._target_col_name}' contains null values, but "
                "ignore_nulls is False."
            )

        if valid_count == 0:
            return self.zero_factory()

        col_np = cast("np.ndarray", block_acc.to_numpy(self._target_col_name))

        # Filter out nulls if necessary
        if valid_count < total_count:
            valid_tensors = [x for x in col_np if x is not None]
            col_np = np.stack(valid_tensors)

        # Perform the partial sum and calculate how many elements contributed
        block_sum = np.sum(col_np, axis=self._aggregate_axis)
        block_count = col_np.size // block_sum.size

        return {
            "sum": block_sum.flatten(),
            "shape": block_sum.shape,
            "count": block_count,
        }

    def combine(self, current_accumulator: dict, new: dict) -> dict:
        """Merge two partial accumulators by summing sums and counts."""
        # Pick the first *known* shape with an explicit None check. Using
        # `or` here would discard a valid scalar shape of `()` (falsy),
        # produced by axis=None reductions, whenever a zero accumulator
        # (e.g. from an all-null block) is merged in — leaving shape=None
        # with count>0 and breaking the reshape in finalize().
        shape = (
            current_accumulator["shape"]
            if current_accumulator["shape"] is not None
            else new["shape"]
        )
        return {
            "sum": np.asarray(current_accumulator["sum"]) + np.asarray(new["sum"]),
            "shape": shape,
            "count": current_accumulator["count"] + new["count"],
        }

    def finalize(self, accumulator: dict) -> np.ndarray | float:  # type: ignore [override]
        """Divide the accumulated sum by the element count to get the mean."""
        count = accumulator["count"]

        # No valid elements were seen anywhere: the mean is undefined.
        if count == 0:
            return np.nan

        # Reshape the flattened sum back to original aggregated dimensions
        return np.asarray(accumulator["sum"]).reshape(accumulator["shape"]) / count

__init__(on, axis=None, ignore_nulls=True, alias_name=None)

Source code in ratiopath/ray/aggregate/tensor_mean.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def __init__(
    self,
    on: str,
    axis: int | tuple[int, ...] | None = None,
    ignore_nulls: bool = True,
    alias_name: str | None = None,
):
    """Configure the aggregator for column `on`, validating the axis spec."""
    super().__init__(
        name=alias_name if alias_name else f"mean({on})",
        on=on,
        ignore_nulls=ignore_nulls,
        # The accumulator starts from the additive identity.
        zero_factory=self.zero_factory,
    )

    if axis is None:
        # Global reduction: leave the class-level default (None) in place.
        return

    # Normalize to a set of axes; a bare int is paired with batch axis 0.
    if isinstance(axis, int):
        axes = {0, axis}
    else:
        axes = set(axis)

    if 0 not in axes:
        raise ValueError(
            f"Invalid axis configuration: {axis}. Axis 0 (the batch dimension) "
            "must be included to perform a reduction. To process rows "
            "independently without collapsing the batch, use .map() instead."
        )

    self._aggregate_axis = tuple(axes)

aggregate_block(block)

Source code in ratiopath/ray/aggregate/tensor_mean.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def aggregate_block(self, block: Block) -> dict:
    """Compute the partial (sum, shape, count) accumulator for one block."""
    accessor = BlockAccessor.for_block(block)

    # Count valid and total rows up front, while null information is intact.
    non_null = cast(
        "int",
        accessor.count(self._target_col_name, ignore_nulls=True),  # type: ignore [arg-type]
    )
    rows = cast(
        "int",
        accessor.count(self._target_col_name, ignore_nulls=False),  # type: ignore [arg-type]
    )

    has_nulls = non_null < rows
    # In strict mode, any null is an immediate error.
    if has_nulls and not self._ignore_nulls:
        raise ValueError(
            f"Column '{self._target_col_name}' contains null values, but "
            "ignore_nulls is False."
        )

    if non_null == 0:
        # Nothing to aggregate in this block; contribute the identity.
        return self.zero_factory()

    data = cast("np.ndarray", accessor.to_numpy(self._target_col_name))

    if has_nulls:
        # Drop null entries before stacking into a dense array.
        data = np.stack([t for t in data if t is not None])

    # Partial sum over the configured axes; derive how many elements
    # were folded into each output cell.
    partial = np.sum(data, axis=self._aggregate_axis)
    contributed = data.size // partial.size

    return {
        "sum": partial.flatten(),
        "shape": partial.shape,
        "count": contributed,
    }

combine(current_accumulator, new)

Source code in ratiopath/ray/aggregate/tensor_mean.py
130
131
132
133
134
135
def combine(self, current_accumulator: dict, new: dict) -> dict:
    """Merge two partial accumulators by summing sums and counts.

    Args:
        current_accumulator: Accumulator dict with "sum", "shape", "count".
        new: Another accumulator produced by aggregate_block/zero_factory.

    Returns:
        The merged accumulator dict.
    """
    # Pick the first *known* shape with an explicit None check. Using
    # `or` here would discard a valid scalar shape of `()` (falsy),
    # produced by axis=None reductions, whenever a zero accumulator
    # (e.g. from an all-null block) is merged in — leaving shape=None
    # with count>0 and breaking the reshape in finalize().
    shape = (
        current_accumulator["shape"]
        if current_accumulator["shape"] is not None
        else new["shape"]
    )
    return {
        "sum": np.asarray(current_accumulator["sum"]) + np.asarray(new["sum"]),
        "shape": shape,
        "count": current_accumulator["count"] + new["count"],
    }

finalize(accumulator)

Source code in ratiopath/ray/aggregate/tensor_mean.py
137
138
139
140
141
142
143
144
def finalize(self, accumulator: dict) -> np.ndarray | float:  # type: ignore [override]
    count = accumulator["count"]

    if count == 0:
        return np.nan

    # Reshape the flattened sum back to original aggregated dimensions
    return np.asarray(accumulator["sum"]).reshape(accumulator["shape"]) / count

zero_factory() staticmethod

Source code in ratiopath/ray/aggregate/tensor_mean.py
86
87
88
@staticmethod
def zero_factory() -> dict:
    """Return the identity accumulator: no sum, no known shape, zero count."""
    return dict(sum=0, shape=None, count=0)