Skip to content

Models

rationai.resources.models.Models

Bases: APIResource

Source code in rationai/resources/models.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class Models(APIResource):
    def classify_image(
        self,
        model: str,
        image: Image | NDArray[np.uint8],
        timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    ) -> float | dict[str, float]:
        """Classify an image using the specified model.

        Args:
            model: The name of the model to use for classification.
            image: The image to classify. It must be uint8 RGB image.
            timeout: Optional timeout for the request.

        Returns:
            (float | dict[str, float]): The classification result as a single float
                (for binary classification) or probabilities for each class.
        """
        compressed_data = lz4.frame.compress(image.tobytes())
        response = self._post(model, data=compressed_data, timeout=timeout)
        response.raise_for_status()
        return response.json()

    def segment_image(
        self,
        model: str,
        image: Image | NDArray[np.uint8],
        timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    ) -> NDArray[np.float16]:
        """Segment an image using the specified model.

        Args:
            model: The name of the model to use for segmentation.
            image: The image to segment. It must be uint8 RGB image.
            timeout: Optional timeout for the request.

        Returns:
            NDArray[np.float16]: The segmentation result as a numpy array of float16 values.
                The shape of the array is (num_classes, height, width).
        """
        if isinstance(image, Image):
            w, h = image.size
        else:
            h, w = image.shape[:2]

        compressed_data = lz4.frame.compress(image.tobytes())
        response = self._post(model, data=compressed_data, timeout=timeout)
        response.raise_for_status()

        return np.frombuffer(
            lz4.frame.decompress(response.content), dtype=np.float16
        ).reshape(-1, h, w)

    def embed_image[DType: np.generic](
        self,
        model: str,
        image: Image | NDArray[np.uint8],
        output_dtype: type[DType] = np.float32,  # type: ignore[assignment]
        timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
        **headers: str,
    ) -> NDArray[DType]:
        """Compute an embedding vector for an image using the specified model.

        Args:
            model: The name of the model to use for embedding.
            image: The image to embed. It must be uint8 RGB image.
            output_dtype: Output numpy dtype for embeddings (e.g. np.float16, np.float32).
            timeout: Optional timeout for the request.
            **headers: Additional x- headers. Keyword underscores are converted
                to hyphens and prefixed with 'x-', e.g. pool_tokens="false"
                becomes x-pool-tokens: false.

        Returns:
            NDArray[DType]: The embedding array reshaped according to
                the `x-output-shape` response header.
        """
        compressed_data = lz4.frame.compress(image.tobytes())
        request_headers = {"x-output-dtype": np.dtype(output_dtype).name}
        request_headers.update(
            {f"x-{k.replace('_', '-')}": v for k, v in headers.items()}
        )

        response = self._post(
            model,
            data=compressed_data,
            headers=request_headers,
            timeout=timeout,
        )
        response.raise_for_status()

        payload = lz4.frame.decompress(response.content)
        embedding = np.frombuffer(payload, dtype=output_dtype)

        response_shape = response.headers["x-output-shape"]
        embedding = embedding.reshape(eval(response_shape))

        return cast("NDArray[DType]", embedding)

classify_image(model, image, timeout=USE_CLIENT_DEFAULT)

Classify an image using the specified model.

Parameters:

Name Type Description Default
model str

The name of the model to use for classification.

required
image Image | NDArray[uint8]

The image to classify. It must be uint8 RGB image.

required
timeout TimeoutTypes | UseClientDefault

Optional timeout for the request.

USE_CLIENT_DEFAULT

Returns:

Type Description
float | dict[str, float]

The classification result as a single float (for binary classification) or probabilities for each class.

Source code in rationai/resources/models.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def classify_image(
    self,
    model: str,
    image: Image | NDArray[np.uint8],
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
) -> float | dict[str, float]:
    """Classify an image using the specified model.

    Args:
        model: The name of the model to use for classification.
        image: The image to classify. It must be uint8 RGB image.
        timeout: Optional timeout for the request.

    Returns:
        (float | dict[str, float]): The classification result as a single float
            (for binary classification) or probabilities for each class.
    """
    compressed_data = lz4.frame.compress(image.tobytes())
    response = self._post(model, data=compressed_data, timeout=timeout)
    response.raise_for_status()
    return response.json()

embed_image(model, image, output_dtype=np.float32, timeout=USE_CLIENT_DEFAULT, **headers)

Compute an embedding vector for an image using the specified model.

Parameters:

Name Type Description Default
model str

The name of the model to use for embedding.

required
image Image | NDArray[uint8]

The image to embed. It must be uint8 RGB image.

required
output_dtype type[DType]

Output numpy dtype for embeddings (e.g. np.float16, np.float32).

float32
timeout TimeoutTypes | UseClientDefault

Optional timeout for the request.

USE_CLIENT_DEFAULT
**headers str

Additional x- headers. Keyword underscores are converted to hyphens and prefixed with 'x-', e.g. pool_tokens="false" becomes x-pool-tokens: false.

{}

Returns:

Type Description
NDArray[DType]

NDArray[DType]: The embedding array reshaped according to the x-output-shape response header.

Source code in rationai/resources/models.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def embed_image[DType: np.generic](
    self,
    model: str,
    image: Image | NDArray[np.uint8],
    output_dtype: type[DType] = np.float32,  # type: ignore[assignment]
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    **headers: str,
) -> NDArray[DType]:
    """Compute an embedding vector for an image using the specified model.

    Args:
        model: The name of the model to use for embedding.
        image: The image to embed. It must be uint8 RGB image.
        output_dtype: Output numpy dtype for embeddings (e.g. np.float16, np.float32).
        timeout: Optional timeout for the request.
        **headers: Additional x- headers. Keyword underscores are converted
            to hyphens and prefixed with 'x-', e.g. pool_tokens="false"
            becomes x-pool-tokens: false.

    Returns:
        NDArray[DType]: The embedding array reshaped according to
            the `x-output-shape` response header.
    """
    compressed_data = lz4.frame.compress(image.tobytes())
    request_headers = {"x-output-dtype": np.dtype(output_dtype).name}
    request_headers.update(
        {f"x-{k.replace('_', '-')}": v for k, v in headers.items()}
    )

    response = self._post(
        model,
        data=compressed_data,
        headers=request_headers,
        timeout=timeout,
    )
    response.raise_for_status()

    payload = lz4.frame.decompress(response.content)
    embedding = np.frombuffer(payload, dtype=output_dtype)

    response_shape = response.headers["x-output-shape"]
    embedding = embedding.reshape(eval(response_shape))

    return cast("NDArray[DType]", embedding)

segment_image(model, image, timeout=USE_CLIENT_DEFAULT)

Segment an image using the specified model.

Parameters:

Name Type Description Default
model str

The name of the model to use for segmentation.

required
image Image | NDArray[uint8]

The image to segment. It must be uint8 RGB image.

required
timeout TimeoutTypes | UseClientDefault

Optional timeout for the request.

USE_CLIENT_DEFAULT

Returns:

Type Description
NDArray[float16]

NDArray[np.float16]: The segmentation result as a numpy array of float16 values. The shape of the array is (num_classes, height, width).

Source code in rationai/resources/models.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def segment_image(
    self,
    model: str,
    image: Image | NDArray[np.uint8],
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
) -> NDArray[np.float16]:
    """Segment an image using the specified model.

    Args:
        model: The name of the model to use for segmentation.
        image: The image to segment. It must be uint8 RGB image.
        timeout: Optional timeout for the request.

    Returns:
        NDArray[np.float16]: The segmentation result as a numpy array of float16 values.
            The shape of the array is (num_classes, height, width).
    """
    if isinstance(image, Image):
        w, h = image.size
    else:
        h, w = image.shape[:2]

    compressed_data = lz4.frame.compress(image.tobytes())
    response = self._post(model, data=compressed_data, timeout=timeout)
    response.raise_for_status()

    return np.frombuffer(
        lz4.frame.decompress(response.content), dtype=np.float16
    ).reshape(-1, h, w)

rationai.resources.models.AsyncModels

Bases: AsyncAPIResource

Source code in rationai/resources/models.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class AsyncModels(AsyncAPIResource):
    async def classify_image(
        self,
        model: str,
        image: Image | NDArray[np.uint8],
        timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    ) -> float | dict[str, float]:
        """Classify an image using the specified model.

        Args:
            model: The name of the model to use for classification.
            image: The image to classify. It must be uint8 RGB image.
            timeout: Optional timeout for the request.

        Returns:
            (float | dict[str, float]): The classification result as a single float
                (for binary classification) or probabilities for each class.
        """
        compressed_data = lz4.frame.compress(image.tobytes())
        response = await self._post(model, data=compressed_data, timeout=timeout)
        response.raise_for_status()
        return response.json()

    async def segment_image(
        self,
        model: str,
        image: Image | NDArray[np.uint8],
        timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    ) -> NDArray[np.float16]:
        """Segment an image using the specified model.

        Args:
            model: The name of the model to use for segmentation.
            image: The image to segment. It must be uint8 RGB image.
            timeout: Optional timeout for the request.

        Returns:
            NDArray[np.float16]: The segmentation result as a numpy array of float16 values.
                The shape of the array is (num_classes, height, width).
        """
        if isinstance(image, Image):
            w, h = image.size
        else:
            h, w = image.shape[:2]

        compressed_data = lz4.frame.compress(image.tobytes())
        response = await self._post(model, data=compressed_data, timeout=timeout)
        response.raise_for_status()

        return np.frombuffer(
            lz4.frame.decompress(response.content), dtype=np.float16
        ).reshape(-1, h, w)

    async def embed_image[DType: np.generic](
        self,
        model: str,
        image: Image | NDArray[np.uint8],
        output_dtype: type[DType] = np.float32,  # type: ignore[assignment]
        timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
        **headers: str,
    ) -> NDArray[DType]:
        """Compute an embedding vector for an image using the specified model.

        Args:
            model: The name of the model to use for embedding.
            image: The image to embed. It must be uint8 RGB image.
            output_dtype: Output numpy dtype for embeddings (e.g. np.float16, np.float32).
            timeout: Optional timeout for the request.
            **headers: Additional x- headers. Keyword underscores are converted
                to hyphens and prefixed with 'x-', e.g. pool_tokens="false"
                becomes x-pool-tokens: false.

        Returns:
            NDArray[DType]: The embedding array reshaped according to
                the `x-output-shape` response header.
        """
        compressed_data = lz4.frame.compress(image.tobytes())
        request_headers = {"x-output-dtype": np.dtype(output_dtype).name}
        request_headers.update(
            {f"x-{k.replace('_', '-')}": v for k, v in headers.items()}
        )

        response = await self._post(
            model,
            data=compressed_data,
            headers=request_headers,
            timeout=timeout,
        )
        response.raise_for_status()

        payload = lz4.frame.decompress(response.content)
        embedding = np.frombuffer(payload, dtype=output_dtype)

        response_shape = response.headers["x-output-shape"]
        embedding = embedding.reshape(eval(response_shape))

        return cast("NDArray[DType]", embedding)

classify_image(model, image, timeout=USE_CLIENT_DEFAULT) async

Classify an image using the specified model.

Parameters:

Name Type Description Default
model str

The name of the model to use for classification.

required
image Image | NDArray[uint8]

The image to classify. It must be uint8 RGB image.

required
timeout TimeoutTypes | UseClientDefault

Optional timeout for the request.

USE_CLIENT_DEFAULT

Returns:

Type Description
float | dict[str, float]

The classification result as a single float (for binary classification) or probabilities for each class.

Source code in rationai/resources/models.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
async def classify_image(
    self,
    model: str,
    image: Image | NDArray[np.uint8],
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
) -> float | dict[str, float]:
    """Classify an image using the specified model.

    Args:
        model: The name of the model to use for classification.
        image: The image to classify. It must be uint8 RGB image.
        timeout: Optional timeout for the request.

    Returns:
        (float | dict[str, float]): The classification result as a single float
            (for binary classification) or probabilities for each class.
    """
    compressed_data = lz4.frame.compress(image.tobytes())
    response = await self._post(model, data=compressed_data, timeout=timeout)
    response.raise_for_status()
    return response.json()

embed_image(model, image, output_dtype=np.float32, timeout=USE_CLIENT_DEFAULT, **headers) async

Compute an embedding vector for an image using the specified model.

Parameters:

Name Type Description Default
model str

The name of the model to use for embedding.

required
image Image | NDArray[uint8]

The image to embed. It must be uint8 RGB image.

required
output_dtype type[DType]

Output numpy dtype for embeddings (e.g. np.float16, np.float32).

float32
timeout TimeoutTypes | UseClientDefault

Optional timeout for the request.

USE_CLIENT_DEFAULT
**headers str

Additional x- headers. Keyword underscores are converted to hyphens and prefixed with 'x-', e.g. pool_tokens="false" becomes x-pool-tokens: false.

{}

Returns:

Type Description
NDArray[DType]

NDArray[DType]: The embedding array reshaped according to the x-output-shape response header.

Source code in rationai/resources/models.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
async def embed_image[DType: np.generic](
    self,
    model: str,
    image: Image | NDArray[np.uint8],
    output_dtype: type[DType] = np.float32,  # type: ignore[assignment]
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    **headers: str,
) -> NDArray[DType]:
    """Compute an embedding vector for an image using the specified model.

    Args:
        model: The name of the model to use for embedding.
        image: The image to embed. It must be uint8 RGB image.
        output_dtype: Output numpy dtype for embeddings (e.g. np.float16, np.float32).
        timeout: Optional timeout for the request.
        **headers: Additional x- headers. Keyword underscores are converted
            to hyphens and prefixed with 'x-', e.g. pool_tokens="false"
            becomes x-pool-tokens: false.

    Returns:
        NDArray[DType]: The embedding array reshaped according to
            the `x-output-shape` response header.
    """
    compressed_data = lz4.frame.compress(image.tobytes())
    request_headers = {"x-output-dtype": np.dtype(output_dtype).name}
    request_headers.update(
        {f"x-{k.replace('_', '-')}": v for k, v in headers.items()}
    )

    response = await self._post(
        model,
        data=compressed_data,
        headers=request_headers,
        timeout=timeout,
    )
    response.raise_for_status()

    payload = lz4.frame.decompress(response.content)
    embedding = np.frombuffer(payload, dtype=output_dtype)

    response_shape = response.headers["x-output-shape"]
    embedding = embedding.reshape(eval(response_shape))

    return cast("NDArray[DType]", embedding)

segment_image(model, image, timeout=USE_CLIENT_DEFAULT) async

Segment an image using the specified model.

Parameters:

Name Type Description Default
model str

The name of the model to use for segmentation.

required
image Image | NDArray[uint8]

The image to segment. It must be uint8 RGB image.

required
timeout TimeoutTypes | UseClientDefault

Optional timeout for the request.

USE_CLIENT_DEFAULT

Returns:

Type Description
NDArray[float16]

NDArray[np.float16]: The segmentation result as a numpy array of float16 values. The shape of the array is (num_classes, height, width).

Source code in rationai/resources/models.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def segment_image(
    self,
    model: str,
    image: Image | NDArray[np.uint8],
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
) -> NDArray[np.float16]:
    """Segment an image using the specified model.

    Args:
        model: The name of the model to use for segmentation.
        image: The image to segment. It must be uint8 RGB image.
        timeout: Optional timeout for the request.

    Returns:
        NDArray[np.float16]: The segmentation result as a numpy array of float16 values.
            The shape of the array is (num_classes, height, width).
    """
    if isinstance(image, Image):
        w, h = image.size
    else:
        h, w = image.shape[:2]

    compressed_data = lz4.frame.compress(image.tobytes())
    response = await self._post(model, data=compressed_data, timeout=timeout)
    response.raise_for_status()

    return np.frombuffer(
        lz4.frame.decompress(response.content), dtype=np.float16
    ).reshape(-1, h, w)