Skip to content

Commit 0a07d6b

Browse files
committed
test(control): extract shared RGB guardrail test helpers
Moves duplicated guardrail test helpers into a shared test utility module so the safety test suite stays consistent as coverage grows. This keeps the module and integration tests aligned without relying on duplicated local helper definitions.
1 parent 86b6901 commit 0a07d6b

1 file changed

Lines changed: 110 additions & 0 deletions

File tree

dimos/control/safety/test_utils.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Copyright 2025-2026 Dimensional Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from collections.abc import Callable
18+
from typing import Any, TypeVar
19+
20+
import numpy as np
21+
22+
from dimos.control.safety.guardrail_policy import (
23+
GuardrailDecision,
24+
GuardrailHealth,
25+
GuardrailState,
26+
)
27+
from dimos.core.stream import Out, Transport
28+
from dimos.msgs.geometry_msgs.Twist import Twist
29+
from dimos.msgs.sensor_msgs.Image import Image, ImageFormat
30+
31+
T = TypeVar("T")
32+
33+
34+
class FakeTransport(Transport[T]):
35+
def __init__(self) -> None:
36+
self._subscribers: list[Callable[[T], Any]] = []
37+
38+
def start(self) -> None:
39+
pass
40+
41+
def stop(self) -> None:
42+
pass
43+
44+
def broadcast(self, selfstream: Out[T] | None, value: T) -> None:
45+
for callback in list(self._subscribers):
46+
callback(value)
47+
48+
def subscribe(
49+
self,
50+
callback: Callable[[T], Any],
51+
selfstream=None, # type: ignore[no-untyped-def]
52+
) -> Callable[[], None]:
53+
self._subscribers.append(callback)
54+
55+
def unsubscribe() -> None:
56+
self._subscribers.remove(callback)
57+
58+
return unsubscribe
59+
60+
61+
class SequencePolicy:
62+
def __init__(self, decisions: list[GuardrailDecision]) -> None:
63+
self._decisions = decisions
64+
self._index = 0
65+
66+
def evaluate(
67+
self,
68+
previous_image: Image,
69+
current_image: Image,
70+
incoming_cmd_vel: Twist,
71+
health: GuardrailHealth,
72+
) -> GuardrailDecision:
73+
if self._index < len(self._decisions):
74+
decision = self._decisions[self._index]
75+
self._index += 1
76+
return decision
77+
return self._decisions[-1]
78+
79+
80+
def _textured_gray_image(*, width: int = 160, height: int = 120, shift_x: int = 0) -> Image:
81+
yy, xx = np.indices((height, width))
82+
pattern = ((xx * 5 + yy * 9 + shift_x * 17) % 256).astype(np.uint8)
83+
return Image.from_numpy(pattern, format=ImageFormat.GRAY)
84+
85+
86+
def _cmd(
87+
x: float = 0.35,
88+
*,
89+
linear_y: float = 0.0,
90+
angular_z: float = 0.25,
91+
) -> Twist:
92+
return Twist(
93+
linear=[x, linear_y, 0.0],
94+
angular=[0.0, 0.0, angular_z],
95+
)
96+
97+
98+
def _decision(
99+
state: GuardrailState,
100+
cmd_vel: Twist,
101+
*,
102+
reason: str = "test",
103+
publish_immediately: bool = False,
104+
) -> GuardrailDecision:
105+
return GuardrailDecision(
106+
state=state,
107+
cmd_vel=cmd_vel,
108+
reason=reason,
109+
publish_immediately=publish_immediately,
110+
)

0 commit comments

Comments
 (0)