sbhs_robomaster.client

  1from enum import Enum
  2from types import TracebackType
  3from typing import Any, Coroutine, Final, Optional, Type
  4import asyncio
  5from time import time
  6from .data import *
  7from .push_receiver import PushReceiver
  8from .feed import Feed
  9from .auto_connect import AutoConnectProtocol
 10
 11
 12DIRECT_CONNECT_IP: Final[str] = "192.168.2.1"
 13IP_PORT: Final[int] = 40926
 14
 15_CONTROL_PORT: Final[int] = 40923
 16_PUSH_PORT: Final[int] = 40924
 17
 18
 19async def auto_connect_to_robomaster() -> "RoboMasterClient":
 20    """
 21    It is preferable to use `connect_to_robomaster` where possible, especially if
 22    you are directly connected to the WiFi the robot provides (use `DIRECT_CONNECT_IP`
 23    in that case.) This is due to it not accidentally getting confused when there
 24    are multiple robots (see below).
 25
 26    Automatically finds an unpaired RoboMaster robot on the network and connects to it.
 27    
 28    This assumes there will only be ONE unpaired RoboMaster robot on the network, or you
 29    may end up connecting to a random robot.
 30
 31    ```py
 32    import asyncio
 33    from sbhs_robomaster import auto_connect_to_robomaster
 34
 35    async def main():
 36        async with await auto_connect_to_robomaster() as robot:
 37            # Do stuff with robot
 38            pass
 39
 40    asyncio.run(main())
 41    ```
 42    """
 43
 44    loop = asyncio.get_event_loop()
 45    transport, protocol = await loop.create_datagram_endpoint(AutoConnectProtocol, ("", IP_PORT))
 46
 47    ip = await protocol.ip_future
 48
 49    transport.close()
 50
 51    return await connect_to_robomaster(ip)
 52
 53
 54async def connect_to_robomaster(ip: str) -> "RoboMasterClient":
 55    """
 56    Connects to a RoboMaster robot at the given IP address.
 57    
 58    If the host computer (the computer the code is running on) is connected directly to robot's WiFi,
 59    and not over another network, pass in `DIRECT_CONNECT_IP`:
 60
 61    ```py
 62    import asyncio
 63    from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP
 64
 65    async def main():
 66        async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
 67            # Do stuff with robot
 68            pass
 69
 70    asyncio.run(main())
 71    ```
 72    """
 73
 74    command_socket = await asyncio.open_connection(ip, _CONTROL_PORT)
 75
 76    client = RoboMasterClient(command_socket)
 77    await client.async_init()
 78
 79    return client
 80
 81
 82def command_to_str(command: str | int | float | bool | Enum) -> str:
 83    """
 84    Converts a data type to a string that can be sent to the robot.
 85    """
 86
 87    match command:
 88        case str():
 89            return command
 90        case bool():
 91            return "on" if command else "off"
 92        case int() | float():
 93            return str(command)
 94        case Enum():
 95            if isinstance(command.value, str) or isinstance(command.value, int) or isinstance(command.value, float) \
 96                or isinstance(command.value, bool) or isinstance(command.value, Enum):
 97                return command_to_str(command.value)
 98            else:
 99                raise Exception("Invalid enum type")
100
101
102class RoboMasterClient:
103    line: Feed[Line]
104    """
105    A feed of line recognition data.
106
107    Line recognition has to be enabled first with `RoboMasterClient.set_line_recognition_enabled` and
108    the line colour has to be set with `RoboMasterClient.set_line_recognition_colour`.
109    """
110
111    rotation: Feed[ChassisRotation]
112    """
113    A feed of the robot's rotation.
114
115    This has to be enabled first with `RoboMasterClient.set_rotation_push_rate`.
116    """
117
118    attitude: Feed[ChassisAttitude]
119    """
120    A feed of the robot's attitude.
121
122    This has to be enabled first with `RoboMasterClient.set_attitude_push_rate`.
123    """
124
125    status: Feed[ChassisStatus]
126    """
127    A feed of the robot's status.
128
129    This has to be enabled first with `RoboMasterClient.set_status_push_rate`.
130    """
131
132    _line_enabled: bool
133    _rotation_frequency: Frequency
134    _attitude_frequency: Frequency
135    _status_frequency: Frequency
136
137    _conn: tuple[asyncio.StreamReader, asyncio.StreamWriter]
138    _command_lock: asyncio.Lock
139    _push_receiver: PushReceiver
140    _handle_push_task: asyncio.Task[None]
141    _exiting: bool
142
143    def __init__(self, command_conn: tuple[asyncio.StreamReader, asyncio.StreamWriter]) -> None:
144        """
145        This constructor shouldn't be used directly. Use `connect_to_robomaster` instead.
146        """
147
148        self.line = Feed()
149        self.rotation = Feed()
150        self.attitude = Feed()
151        self.status = Feed()
152
153        self._line_enabled = False
154        self._rotation_frequency = Frequency.Off
155        self._attitude_frequency = Frequency.Off
156        self._status_frequency = Frequency.Off
157
158        self._conn = command_conn
159        self._command_lock = asyncio.Lock()
160        self._push_receiver = PushReceiver(_PUSH_PORT)
161        self._handle_push_task = asyncio.create_task(self._handle_push(self._push_receiver.feed))
162        self._exiting = False
163
164    async def async_init(self):
165        """
166        Performs any asynchronous initialisation required. Should be called
167        immediately after the class is constructed. `connect_to_robomaster`
168        handles this automatically.
169        """
170        await self.do("command")
171
172        await self.set_line_recognition_enabled(False)
173        await self.set_rotation_push_rate(Frequency.Off)
174        await self.set_attitude_push_rate(Frequency.Off)
175        await self.set_status_push_rate(Frequency.Off)
176
177    async def __aenter__(self):
178        return self
179
180    async def __aexit__(self, exc_type: Optional[Type[BaseException]], exc: Optional[BaseException], tb: Optional[TracebackType]):
181        await self.do("quit")
182
183        self._exiting = True
184
185        self._handle_push_task.cancel()
186        self._conn[1].close()
187
188    async def _handle_push(self, feed: Feed[Response]):
189        while True:
190            response = await feed.get()
191
192            topic = response.data[0]
193            subject = response.data[2]
194            parseData = Response(response.data[3:])
195
196            if topic == "chassis":
197                if subject == "position":   self.rotation.feed(ChassisRotation.parse(parseData))
198                elif subject == "attitude": self.attitude.feed(ChassisAttitude.parse(parseData))
199                elif subject == "status":   self.status.feed(ChassisStatus.parse(parseData))
200                else:                       print(f"Unknown chassis subject: {subject}")
201            elif topic == "AI":
202                if subject == "line": self.line.feed(Line.parse(parseData))
203                else:                 print(f"Unknown AI subject: {subject}")
204            else:
205                print(f"Unknown topic: {topic}")
206
207    async def _do(self, command: str):
208        async with self._command_lock:
209            if self._exiting:
210                raise Exception("Client is exiting.")
211
212            self._conn[1].write(command.encode())
213            await self._conn[1].drain()
214
215            data = await self._conn[0].readuntil(b";")
216
217            return Response(data.decode()[:-1].split(" "))
218
219    async def do(self, *command_data: str | int | float | bool | Enum) -> Response:
220        """
221        Low level command sending. Every function that influences the robot's behaviour
222        uses this internally.
223
224        It's better to use the higher level functions instead of this.
225
226        Arguments:
227         - *command_data: The parts of the command to send to the robot. These can be strings, numbers, booleans or enums.
228                          All of these will be converted to strings using `command_to_str`.
229        """
230        
231        command = ' '.join(map(command_to_str, command_data)) + ';'
232        return await self._do(command)
233
234    async def get_version(self) -> str:
235        return (await self.do("version")).get_str(0)
236
237    async def set_speed(self, forwards: float, right: float, clockwise: float = 0) -> None:
238        """
239        Causes the robot to move with the given speeds indefinitely.
240
241        Arguments:
242         - forwards: The forwards speed in m/s
243         - right: The right speed in m/s
244         - clockwise: The rotational speed in degrees/s clockwise
245        """
246
247        assert -3.5 <= forwards <= 3.5, "Movement speed must be between -3.5 and 3.5 m/s"
248        assert -3.5 <= right <= 3.5,    "Movement speed must be between -3.5 and 3.5 m/s"
249        assert -600 <= clockwise <= 600, "Rotation speed must be between -600 and 600 degrees/s"
250
251        await self.do("chassis", "speed", "x", forwards, "y", right, "z", clockwise)
252
253    async def set_wheel_speed(self, front_right: float, front_left: float, back_left: float, back_right: float) -> None:
254        """
255        All speeds are in rpm.
256        """
257
258        assert -1000 <= front_right <= 1000, "Wheel speeds must be between -1000 and 1000 rpm"
259        assert -1000 <= front_left <= 1000,  "Wheel speeds must be between -1000 and 1000 rpm"
260        assert -1000 <= back_left <= 1000,   "Wheel speeds must be between -1000 and 1000 rpm"
261        assert -1000 <= back_right <= 1000,  "Wheel speeds must be between -1000 and 1000 rpm"
262
263        await self.do(
264            "chassis", "wheel",
265            "w1", front_right,
266            "w2", front_left,
267            "w3", back_left,
268            "w4", back_right
269        )
270
271    async def set_left_right_wheel_speeds(self, left: float, right: float) -> None:
272        """
273        All speeds are in rpm.
274        """
275        await self.set_wheel_speed(right, left, left, right)
276
277    async def set_all_wheel_speeds(self, speed: float) -> None:
278        """
279        All speeds are in rpm.
280        """
281        await self.set_wheel_speed(speed, speed, speed, speed)
282
283    async def get_speed(self) -> ChassisSpeed:
284        return ChassisSpeed.parse(await self.do("chassis", "speed", "?"))
285
286    async def rotate(self, clockwise: float, rotation_speed: float | None = None, timeout: float = 10) -> None:
287        """
288        **NOTE:** This function constantly polls the robot for its rotation and
289        may starve out other areas in the program trying to send messages to
290        the robot. In general, due to the serial nature of the communication
291        protocol, any communication with the robot should not be largely
292        parallel.
293
294        Arguments:
295         - clockwise: The degrees clockwise to rotate.
296         - rotation_speed: The speed to rotate, in degrees/s. (Must be positive.)
297         - timeout: How long to wait for the turn to finish (in seconds).
298        """
299
300        assert -1800 <= clockwise <= 1800, "Rotation must be between -1800 and 1800 degrees"
301        
302        if rotation_speed is not None:
303            assert 0 <= rotation_speed <= 600, "Rotation speed must be positive and less than 600 degrees/s"
304
305        args = [
306            "chassis", "move",
307            "x", 0,
308            "y", 0,
309            "z", clockwise
310        ]
311
312        if rotation_speed is not None:
313            args.append("vz")
314            args.append(rotation_speed)
315
316        await self.do(*args)
317
318        start_time = time()
319        while time() - start_time <= timeout:
320            status = await self.get_status()
321
322            if status.static:
323                return
324            
325            await asyncio.sleep(0.02) # 50Hz
326            
327        raise TimeoutError()
328
329    async def get_rotation(self) -> ChassisRotation:
330        return ChassisRotation.parse(await self.do("chassis", "position", "?"))
331    
332    async def get_attitude(self) -> ChassisAttitude:
333        return ChassisAttitude.parse(await self.do("chassis", "attitude", "?"))
334    
335    async def get_status(self) -> ChassisStatus:
336        return ChassisStatus.parse(await self.do("chassis", "status", "?"))
337
338    async def set_rotation_push_rate(self, freq: Frequency) -> None:
339        """
340        Sets the push rate of the chassis rotation in Hz. Use `Frequency.Off` to disable.
341
342        Access the rotation data through the `RoboMasterClient.rotation` feed.
343        """
344
345        # This sets the "position" push frequency because
346        # that is where the rotation is stored.
347        if freq == Frequency.Off:
348            await self.do("chassis", "push", "position", False)
349        else:
350            await self.do("chassis", "push", "position", True, "pfreq", freq)
351
352        self._rotation_frequency = freq
353
354    def get_rotation_push_rate(self) -> Frequency:
355        """
356        Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled.
357        """
358        return self._rotation_frequency
359
360    async def set_attitude_push_rate(self, freq: Frequency) -> None:
361        """
362        Sets the push rate of the chassis attitude in Hz. Use `Frequency.Off` to disable.
363
364        Access the attitude data through the `RoboMasterClient.attitude` feed.
365        """
366
367        if freq == Frequency.Off:
368            await self.do("chassis", "push", "attitude", False)
369        else:
370            await self.do("chassis", "push", "attitude", True, "afreq", freq)
371
372        self._attitude_frequency = freq
373
374    def get_attitude_push_rate(self) -> Frequency:
375        """
376        Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled.
377        """
378        return self._attitude_frequency
379
380    async def set_status_push_rate(self, freq: Frequency) -> None:
381        """
382        Sets the push rate of the chassis status in Hz. Use `Frequency.Off` to disable.
383
384        Access the status data through the `RoboMasterClient.status` feed.
385        """
386
387        if freq == Frequency.Off:
388            await self.do("chassis", "push", "status", False)
389        else:
390            await self.do("chassis", "push", "status", True, "sfreq", freq)
391
392        self._status_frequency = freq
393
394    def get_status_push_rate(self) -> Frequency:
395        """
396        Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled.
397        """
398        return self._status_frequency
399
400    async def set_ir_enabled(self, enabled: bool = True) -> None:
401        await self.do("ir_distance_sensor", "measure", enabled)
402    
403    async def get_ir_distance(self, ir_id: int) -> float:
404        """
405        The IR sensor has to be enabled first with `set_ir_enabled`.
406
407        Arguments:
408         - ir_id: The ID of the IR sensor. There appears to be only one IR sensor, so this should always be `1`.
409
410        Returns:
411         - The distance in centimeters.
412        """
413        return (await self.do("ir_distance_sensor", "distance", ir_id, "?")).get_float(0)
414    
415    async def move_arm(self, x_dist: float, y_dist: float) -> None:
416        """
417        Moves the robotic arm by the given distance relative to its current position.
418        To move to a specific position, use `set_arm_position` instead.
419
420        The units are unknown. Physically move the arm around and record the results from
421        `get_arm_position` to determine the desired inputs for this function.
422        """
423        await self.do("robotic_arm", "move", "x", x_dist, "y", y_dist)
424    
425    async def set_arm_position(self, x: float, y: float) -> None:
426        """
427        Moves the robotic arm to the given position. To move relative to the current position,
428        use `move_arm` instead.
429
430        The units are unknown. Physically move the arm to the desired position and then use
431        `get_arm_position` to find the desired inputs for this function. The origin for the
432        x and y values appears to be different for every robot, so values for one robot will
433        not carry over to another.
434        """
435        await self.do("robotic_arm", "moveto", "x", x, "y", y)
436    
437    async def get_arm_position(self) -> tuple[float, float]:
438        """
439        Returns:
440         - The current position of the robotic arm in unknown units. The format is `(x, y)`.
441        """
442
443        response = await self.do("robotic_arm", "position", "?")
444        return (response.get_float(0), response.get_float(1))
445    
446    # TODO Allow different levels of force when opening?
447    # https://robomaster-dev.readthedocs.io/en/latest/text_sdk/protocol_api.html#mechanical-gripper-opening-control
448    async def open_gripper(self) -> None:
449        await self.do("robotic_gripper", "open", 1)
450    
451    # TODO Allow different levels of force when closing?
452    # https://robomaster-dev.readthedocs.io/en/latest/text_sdk/protocol_api.html#mechanical-gripper-opening-control
453    async def close_gripper(self) -> None:
454        """
455        This closes the gripper until it encounters any resistance. If there is an object in
456        the way, the gripper will close until it is tightly gripping the object, but it will
457        not damage itself.
458        """
459        await self.do("robotic_gripper", "close", 1)
460
461    async def get_gripper_status(self) -> GripperStatus:
462        """
463        Gets whether the gripper is Open, Closed, or Partially Open.
464        """
465        return (await self.do("robotic_gripper", "status", "?")).get_enum(0, GripperStatus)
466
467    async def set_line_recognition_colour(self, colour: LineColour) -> None:
468        await self.do("AI", "attribute", "line_color", colour)
469
470    async def set_line_recognition_enabled(self, enabled: bool = True) -> None:
471        """
472        Access the line data through the `RoboMasterClient.line` feed.
473        """
474        await self.do("AI", "push", "line", enabled)
475
476        self._line_enabled = enabled
477
478    def get_line_recognition_enabled(self) -> bool:
479        return self._line_enabled
480    
481    async def set_leds(self, leds: LedPosition, colour: Colour, effect: LedEffect = LedEffect.Solid) -> None:
482        """
483        Sets the given LEDs to display a colour with a certain effect. Use
484        `.data.LedEffect.Off` to turn the leds off.
485
486        See `.data.LedPosition` to see how to pass in multiple leds at once.
487        
488        See `.data.LedEffect` to see the available effects.
489        """
490
491        async def set_led(led: str):
492            await self.do("led", "control", "comp", led, "r", colour.r, "g", colour.g, "b", colour.b, "effect", effect)
493
494        if leds == LedPosition.All:
495            await set_led("bottom_all")
496        else:
497            tasks: list[Coroutine[Any, Any, None]] = []
498
499            if LedPosition.Front in leds:
500                tasks.append(set_led("bottom_front"))
501            
502            if LedPosition.Back in leds:
503                tasks.append(set_led("bottom_back"))
504
505            if LedPosition.Left in leds:
506                tasks.append(set_led("bottom_left"))
507
508            if LedPosition.Right in leds:
509                tasks.append(set_led("bottom_right"))
510
511            await asyncio.gather(*tasks)
DIRECT_CONNECT_IP: Final[str] = '192.168.2.1'
IP_PORT: Final[int] = 40926
async def auto_connect_to_robomaster() -> RoboMasterClient:
20async def auto_connect_to_robomaster() -> "RoboMasterClient":
21    """
22    It is preferable to use `connect_to_robomaster` where possible, especially if
23    you are directly connected to the WiFi the robot provides (use `DIRECT_CONNECT_IP`
24    in that case.) This is due to it not accidentally getting confused when there
25    are multiple robots (see below).
26
27    Automatically finds an unpaired RoboMaster robot on the network and connects to it.
28    
29    This assumes there will only be ONE unpaired RoboMaster robot on the network, or you
30    may end up connecting to a random robot.
31
32    ```py
33    import asyncio
34    from sbhs_robomaster import auto_connect_to_robomaster
35
36    async def main():
37        async with await auto_connect_to_robomaster() as robot:
38            # Do stuff with robot
39            pass
40
41    asyncio.run(main())
42    ```
43    """
44
45    loop = asyncio.get_event_loop()
46    transport, protocol = await loop.create_datagram_endpoint(AutoConnectProtocol, ("", IP_PORT))
47
48    ip = await protocol.ip_future
49
50    transport.close()
51
52    return await connect_to_robomaster(ip)

It is preferable to use connect_to_robomaster where possible, especially if you are directly connected to the WiFi the robot provides (use DIRECT_CONNECT_IP in that case.) This is due to it not accidentally getting confused when there are multiple robots (see below).

Automatically finds an unpaired RoboMaster robot on the network and connects to it.

This assumes there will only be ONE unpaired RoboMaster robot on the network, or you may end up connecting to a random robot.

import asyncio
from sbhs_robomaster import auto_connect_to_robomaster

async def main():
    async with await auto_connect_to_robomaster() as robot:
        # Do stuff with robot
        pass

asyncio.run(main())
async def connect_to_robomaster(ip: str) -> RoboMasterClient:
55async def connect_to_robomaster(ip: str) -> "RoboMasterClient":
56    """
57    Connects to a RoboMaster robot at the given IP address.
58    
59    If the host computer (the computer the code is running on) is connected directly to robot's WiFi,
60    and not over another network, pass in `DIRECT_CONNECT_IP`:
61
62    ```py
63    import asyncio
64    from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP
65
66    async def main():
67        async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
68            # Do stuff with robot
69            pass
70
71    asyncio.run(main())
72    ```
73    """
74
75    command_socket = await asyncio.open_connection(ip, _CONTROL_PORT)
76
77    client = RoboMasterClient(command_socket)
78    await client.async_init()
79
80    return client

Connects to a RoboMaster robot at the given IP address.

If the host computer (the computer the code is running on) is connected directly to robot's WiFi, and not over another network, pass in DIRECT_CONNECT_IP:

import asyncio
from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP

async def main():
    async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
        # Do stuff with robot
        pass

asyncio.run(main())
def command_to_str(command: str | int | float | bool | enum.Enum) -> str:
 83def command_to_str(command: str | int | float | bool | Enum) -> str:
 84    """
 85    Converts a data type to a string that can be sent to the robot.
 86    """
 87
 88    match command:
 89        case str():
 90            return command
 91        case bool():
 92            return "on" if command else "off"
 93        case int() | float():
 94            return str(command)
 95        case Enum():
 96            if isinstance(command.value, str) or isinstance(command.value, int) or isinstance(command.value, float) \
 97                or isinstance(command.value, bool) or isinstance(command.value, Enum):
 98                return command_to_str(command.value)
 99            else:
100                raise Exception("Invalid enum type")

Converts a data type to a string that can be sent to the robot.

class RoboMasterClient:
103class RoboMasterClient:
104    line: Feed[Line]
105    """
106    A feed of line recognition data.
107
108    Line recognition has to be enabled first with `RoboMasterClient.set_line_recognition_enabled` and
109    the line colour has to be set with `RoboMasterClient.set_line_recognition_colour`.
110    """
111
112    rotation: Feed[ChassisRotation]
113    """
114    A feed of the robot's rotation.
115
116    This has to be enabled first with `RoboMasterClient.set_rotation_push_rate`.
117    """
118
119    attitude: Feed[ChassisAttitude]
120    """
121    A feed of the robot's attitude.
122
123    This has to be enabled first with `RoboMasterClient.set_attitude_push_rate`.
124    """
125
126    status: Feed[ChassisStatus]
127    """
128    A feed of the robot's status.
129
130    This has to be enabled first with `RoboMasterClient.set_status_push_rate`.
131    """
132
133    _line_enabled: bool
134    _rotation_frequency: Frequency
135    _attitude_frequency: Frequency
136    _status_frequency: Frequency
137
138    _conn: tuple[asyncio.StreamReader, asyncio.StreamWriter]
139    _command_lock: asyncio.Lock
140    _push_receiver: PushReceiver
141    _handle_push_task: asyncio.Task[None]
142    _exiting: bool
143
144    def __init__(self, command_conn: tuple[asyncio.StreamReader, asyncio.StreamWriter]) -> None:
145        """
146        This constructor shouldn't be used directly. Use `connect_to_robomaster` instead.
147        """
148
149        self.line = Feed()
150        self.rotation = Feed()
151        self.attitude = Feed()
152        self.status = Feed()
153
154        self._line_enabled = False
155        self._rotation_frequency = Frequency.Off
156        self._attitude_frequency = Frequency.Off
157        self._status_frequency = Frequency.Off
158
159        self._conn = command_conn
160        self._command_lock = asyncio.Lock()
161        self._push_receiver = PushReceiver(_PUSH_PORT)
162        self._handle_push_task = asyncio.create_task(self._handle_push(self._push_receiver.feed))
163        self._exiting = False
164
165    async def async_init(self):
166        """
167        Performs any asynchronous initialisation required. Should be called
168        immediately after the class is constructed. `connect_to_robomaster`
169        handles this automatically.
170        """
171        await self.do("command")
172
173        await self.set_line_recognition_enabled(False)
174        await self.set_rotation_push_rate(Frequency.Off)
175        await self.set_attitude_push_rate(Frequency.Off)
176        await self.set_status_push_rate(Frequency.Off)
177
178    async def __aenter__(self):
179        return self
180
181    async def __aexit__(self, exc_type: Optional[Type[BaseException]], exc: Optional[BaseException], tb: Optional[TracebackType]):
182        await self.do("quit")
183
184        self._exiting = True
185
186        self._handle_push_task.cancel()
187        self._conn[1].close()
188
189    async def _handle_push(self, feed: Feed[Response]):
190        while True:
191            response = await feed.get()
192
193            topic = response.data[0]
194            subject = response.data[2]
195            parseData = Response(response.data[3:])
196
197            if topic == "chassis":
198                if subject == "position":   self.rotation.feed(ChassisRotation.parse(parseData))
199                elif subject == "attitude": self.attitude.feed(ChassisAttitude.parse(parseData))
200                elif subject == "status":   self.status.feed(ChassisStatus.parse(parseData))
201                else:                       print(f"Unknown chassis subject: {subject}")
202            elif topic == "AI":
203                if subject == "line": self.line.feed(Line.parse(parseData))
204                else:                 print(f"Unknown AI subject: {subject}")
205            else:
206                print(f"Unknown topic: {topic}")
207
208    async def _do(self, command: str):
209        async with self._command_lock:
210            if self._exiting:
211                raise Exception("Client is exiting.")
212
213            self._conn[1].write(command.encode())
214            await self._conn[1].drain()
215
216            data = await self._conn[0].readuntil(b";")
217
218            return Response(data.decode()[:-1].split(" "))
219
220    async def do(self, *command_data: str | int | float | bool | Enum) -> Response:
221        """
222        Low level command sending. Every function that influences the robot's behaviour
223        uses this internally.
224
225        It's better to use the higher level functions instead of this.
226
227        Arguments:
228         - *command_data: The parts of the command to send to the robot. These can be strings, numbers, booleans or enums.
229                          All of these will be converted to strings using `command_to_str`.
230        """
231        
232        command = ' '.join(map(command_to_str, command_data)) + ';'
233        return await self._do(command)
234
235    async def get_version(self) -> str:
236        return (await self.do("version")).get_str(0)
237
238    async def set_speed(self, forwards: float, right: float, clockwise: float = 0) -> None:
239        """
240        Causes the robot to move with the given speeds indefinitely.
241
242        Arguments:
243         - forwards: The forwards speed in m/s
244         - right: The right speed in m/s
245         - clockwise: The rotational speed in degrees/s clockwise
246        """
247
248        assert -3.5 <= forwards <= 3.5, "Movement speed must be between -3.5 and 3.5 m/s"
249        assert -3.5 <= right <= 3.5,    "Movement speed must be between -3.5 and 3.5 m/s"
250        assert -600 <= clockwise <= 600, "Rotation speed must be between -600 and 600 degrees/s"
251
252        await self.do("chassis", "speed", "x", forwards, "y", right, "z", clockwise)
253
254    async def set_wheel_speed(self, front_right: float, front_left: float, back_left: float, back_right: float) -> None:
255        """
256        All speeds are in rpm.
257        """
258
259        assert -1000 <= front_right <= 1000, "Wheel speeds must be between -1000 and 1000 rpm"
260        assert -1000 <= front_left <= 1000,  "Wheel speeds must be between -1000 and 1000 rpm"
261        assert -1000 <= back_left <= 1000,   "Wheel speeds must be between -1000 and 1000 rpm"
262        assert -1000 <= back_right <= 1000,  "Wheel speeds must be between -1000 and 1000 rpm"
263
264        await self.do(
265            "chassis", "wheel",
266            "w1", front_right,
267            "w2", front_left,
268            "w3", back_left,
269            "w4", back_right
270        )
271
272    async def set_left_right_wheel_speeds(self, left: float, right: float) -> None:
273        """
274        All speeds are in rpm.
275        """
276        await self.set_wheel_speed(right, left, left, right)
277
278    async def set_all_wheel_speeds(self, speed: float) -> None:
279        """
280        All speeds are in rpm.
281        """
282        await self.set_wheel_speed(speed, speed, speed, speed)
283
284    async def get_speed(self) -> ChassisSpeed:
285        return ChassisSpeed.parse(await self.do("chassis", "speed", "?"))
286
287    async def rotate(self, clockwise: float, rotation_speed: float | None = None, timeout: float = 10) -> None:
288        """
289        **NOTE:** This function constantly polls the robot for its rotation and
290        may starve out other areas in the program trying to send messages to
291        the robot. In general, due to the serial nature of the communication
292        protocol, any communication with the robot should not be largely
293        parallel.
294
295        Arguments:
296         - clockwise: The degrees clockwise to rotate.
297         - rotation_speed: The speed to rotate, in degrees/s. (Must be positive.)
298         - timeout: How long to wait for the turn to finish (in seconds).
299        """
300
301        assert -1800 <= clockwise <= 1800, "Rotation must be between -1800 and 1800 degrees"
302        
303        if rotation_speed is not None:
304            assert 0 <= rotation_speed <= 600, "Rotation speed must be positive and less than 600 degrees/s"
305
306        args = [
307            "chassis", "move",
308            "x", 0,
309            "y", 0,
310            "z", clockwise
311        ]
312
313        if rotation_speed is not None:
314            args.append("vz")
315            args.append(rotation_speed)
316
317        await self.do(*args)
318
319        start_time = time()
320        while time() - start_time <= timeout:
321            status = await self.get_status()
322
323            if status.static:
324                return
325            
326            await asyncio.sleep(0.02) # 50Hz
327            
328        raise TimeoutError()
329
330    async def get_rotation(self) -> ChassisRotation:
331        return ChassisRotation.parse(await self.do("chassis", "position", "?"))
332    
333    async def get_attitude(self) -> ChassisAttitude:
334        return ChassisAttitude.parse(await self.do("chassis", "attitude", "?"))
335    
336    async def get_status(self) -> ChassisStatus:
337        return ChassisStatus.parse(await self.do("chassis", "status", "?"))
338
339    async def set_rotation_push_rate(self, freq: Frequency) -> None:
340        """
341        Sets the push rate of the chassis rotation in Hz. Use `Frequency.Off` to disable.
342
343        Access the rotation data through the `RoboMasterClient.rotation` feed.
344        """
345
346        # This sets the "position" push frequency because
347        # that is where the rotation is stored.
348        if freq == Frequency.Off:
349            await self.do("chassis", "push", "position", False)
350        else:
351            await self.do("chassis", "push", "position", True, "pfreq", freq)
352
353        self._rotation_frequency = freq
354
355    def get_rotation_push_rate(self) -> Frequency:
356        """
357        Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled.
358        """
359        return self._rotation_frequency
360
361    async def set_attitude_push_rate(self, freq: Frequency) -> None:
362        """
363        Sets the push rate of the chassis attitude in Hz. Use `Frequency.Off` to disable.
364
365        Access the attitude data through the `RoboMasterClient.attitude` feed.
366        """
367
368        if freq == Frequency.Off:
369            await self.do("chassis", "push", "attitude", False)
370        else:
371            await self.do("chassis", "push", "attitude", True, "afreq", freq)
372
373        self._attitude_frequency = freq
374
375    def get_attitude_push_rate(self) -> Frequency:
376        """
377        Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled.
378        """
379        return self._attitude_frequency
380
381    async def set_status_push_rate(self, freq: Frequency) -> None:
382        """
383        Sets the push rate of the chassis status in Hz. Use `Frequency.Off` to disable.
384
385        Access the status data through the `RoboMasterClient.status` feed.
386        """
387
388        if freq == Frequency.Off:
389            await self.do("chassis", "push", "status", False)
390        else:
391            await self.do("chassis", "push", "status", True, "sfreq", freq)
392
393        self._status_frequency = freq
394
395    def get_status_push_rate(self) -> Frequency:
396        """
397        Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled.
398        """
399        return self._status_frequency
400
401    async def set_ir_enabled(self, enabled: bool = True) -> None:
402        await self.do("ir_distance_sensor", "measure", enabled)
403    
404    async def get_ir_distance(self, ir_id: int) -> float:
405        """
406        The IR sensor has to be enabled first with `set_ir_enabled`.
407
408        Arguments:
409         - ir_id: The ID of the IR sensor. There appears to be only one IR sensor, so this should always be `1`.
410
411        Returns:
412         - The distance in centimeters.
413        """
414        return (await self.do("ir_distance_sensor", "distance", ir_id, "?")).get_float(0)
415    
416    async def move_arm(self, x_dist: float, y_dist: float) -> None:
417        """
418        Moves the robotic arm by the given distance relative to its current position.
419        To move to a specific position, use `set_arm_position` instead.
420
421        The units are unknown. Physically move the arm around and record the results from
422        `get_arm_position` to determine the desired inputs for this function.
423        """
424        await self.do("robotic_arm", "move", "x", x_dist, "y", y_dist)
425    
426    async def set_arm_position(self, x: float, y: float) -> None:
427        """
428        Moves the robotic arm to the given position. To move relative to the current position,
429        use `move_arm` instead.
430
431        The units are unknown. Physically move the arm to the desired position and then use
432        `get_arm_position` to find the desired inputs for this function. The origin for the
433        x and y values appears to be different for every robot, so values for one robot will
434        not carry over to another.
435        """
436        await self.do("robotic_arm", "moveto", "x", x, "y", y)
437    
438    async def get_arm_position(self) -> tuple[float, float]:
439        """
440        Returns:
441         - The current position of the robotic arm in unknown units. The format is `(x, y)`.
442        """
443
444        response = await self.do("robotic_arm", "position", "?")
445        return (response.get_float(0), response.get_float(1))
446    
447    # TODO Allow different levels of force when opening?
448    # https://robomaster-dev.readthedocs.io/en/latest/text_sdk/protocol_api.html#mechanical-gripper-opening-control
449    async def open_gripper(self) -> None:
450        await self.do("robotic_gripper", "open", 1)
451    
452    # TODO Allow different levels of force when closing?
453    # https://robomaster-dev.readthedocs.io/en/latest/text_sdk/protocol_api.html#mechanical-gripper-opening-control
454    async def close_gripper(self) -> None:
455        """
456        This closes the gripper until it encounters any resistance. If there is an object in
457        the way, the gripper will close until it is tightly gripping the object, but it will
458        not damage itself.
459        """
460        await self.do("robotic_gripper", "close", 1)
461
462    async def get_gripper_status(self) -> GripperStatus:
463        """
464        Gets whether the gripper is Open, Closed, or Partially Open.
465        """
466        return (await self.do("robotic_gripper", "status", "?")).get_enum(0, GripperStatus)
467
468    async def set_line_recognition_colour(self, colour: LineColour) -> None:
469        await self.do("AI", "attribute", "line_color", colour)
470
471    async def set_line_recognition_enabled(self, enabled: bool = True) -> None:
472        """
473        Access the line data through the `RoboMasterClient.line` feed.
474        """
475        await self.do("AI", "push", "line", enabled)
476
477        self._line_enabled = enabled
478
479    def get_line_recognition_enabled(self) -> bool:
480        return self._line_enabled
481    
482    async def set_leds(self, leds: LedPosition, colour: Colour, effect: LedEffect = LedEffect.Solid) -> None:
483        """
484        Sets the given LEDs to display a colour with a certain effect. Use
485        `.data.LedEffect.Off` to turn the leds off.
486
487        See `.data.LedPosition` to see how to pass in multiple leds at once.
488        
489        See `.data.LedEffect` to see the available effects.
490        """
491
492        async def set_led(led: str):
493            await self.do("led", "control", "comp", led, "r", colour.r, "g", colour.g, "b", colour.b, "effect", effect)
494
495        if leds == LedPosition.All:
496            await set_led("bottom_all")
497        else:
498            tasks: list[Coroutine[Any, Any, None]] = []
499
500            if LedPosition.Front in leds:
501                tasks.append(set_led("bottom_front"))
502            
503            if LedPosition.Back in leds:
504                tasks.append(set_led("bottom_back"))
505
506            if LedPosition.Left in leds:
507                tasks.append(set_led("bottom_left"))
508
509            if LedPosition.Right in leds:
510                tasks.append(set_led("bottom_right"))
511
512            await asyncio.gather(*tasks)
RoboMasterClient( command_conn: tuple[asyncio.streams.StreamReader, asyncio.streams.StreamWriter])
144    def __init__(self, command_conn: tuple[asyncio.StreamReader, asyncio.StreamWriter]) -> None:
145        """
146        This constructor shouldn't be used directly. Use `connect_to_robomaster` instead.
147        """
148
149        self.line = Feed()
150        self.rotation = Feed()
151        self.attitude = Feed()
152        self.status = Feed()
153
154        self._line_enabled = False
155        self._rotation_frequency = Frequency.Off
156        self._attitude_frequency = Frequency.Off
157        self._status_frequency = Frequency.Off
158
159        self._conn = command_conn
160        self._command_lock = asyncio.Lock()
161        self._push_receiver = PushReceiver(_PUSH_PORT)
162        self._handle_push_task = asyncio.create_task(self._handle_push(self._push_receiver.feed))
163        self._exiting = False

This constructor shouldn't be used directly. Use connect_to_robomaster instead.

A feed of line recognition data.

Line recognition has to be enabled first with RoboMasterClient.set_line_recognition_enabled and the line colour has to be set with RoboMasterClient.set_line_recognition_colour.

A feed of the robot's rotation.

This has to be enabled first with RoboMasterClient.set_rotation_push_rate.

A feed of the robot's attitude.

This has to be enabled first with RoboMasterClient.set_attitude_push_rate.

A feed of the robot's status.

This has to be enabled first with RoboMasterClient.set_status_push_rate.

async def async_init(self):
165    async def async_init(self):
166        """
167        Performs any asynchronous initialisation required. Should be called
168        immediately after the class is constructed. `connect_to_robomaster`
169        handles this automatically.
170        """
171        await self.do("command")
172
173        await self.set_line_recognition_enabled(False)
174        await self.set_rotation_push_rate(Frequency.Off)
175        await self.set_attitude_push_rate(Frequency.Off)
176        await self.set_status_push_rate(Frequency.Off)

Performs any asynchronous initialisation required. Should be called immediately after the class is constructed. connect_to_robomaster handles this automatically.

async def do( self, *command_data: str | int | float | bool | enum.Enum) -> sbhs_robomaster.data.Response:
220    async def do(self, *command_data: str | int | float | bool | Enum) -> Response:
221        """
222        Low level command sending. Every function that influences the robot's behaviour
223        uses this internally.
224
225        It's better to use the higher level functions instead of this.
226
227        Arguments:
228         - *command_data: The parts of the command to send to the robot. These can be strings, numbers, booleans or enums.
229                          All of these will be converted to strings using `command_to_str`.
230        """
231        
232        command = ' '.join(map(command_to_str, command_data)) + ';'
233        return await self._do(command)

Low level command sending. Every function that influences the robot's behaviour uses this internally.

It's better to use the higher level functions instead of this.

Arguments:

  • *command_data: The parts of the command to send to the robot. These can be strings, numbers, booleans or enums. All of these will be converted to strings using command_to_str.
async def get_version(self) -> str:
235    async def get_version(self) -> str:
236        return (await self.do("version")).get_str(0)
async def set_speed(self, forwards: float, right: float, clockwise: float = 0) -> None:
238    async def set_speed(self, forwards: float, right: float, clockwise: float = 0) -> None:
239        """
240        Causes the robot to move with the given speeds indefinitely.
241
242        Arguments:
243         - forwards: The forwards speed in m/s
244         - right: The right speed in m/s
245         - clockwise: The rotational speed in degrees/s clockwise
246        """
247
248        assert -3.5 <= forwards <= 3.5, "Movement speed must be between -3.5 and 3.5 m/s"
249        assert -3.5 <= right <= 3.5,    "Movement speed must be between -3.5 and 3.5 m/s"
250        assert -600 <= clockwise <= 600, "Rotation speed must be between -600 and 600 degrees/s"
251
252        await self.do("chassis", "speed", "x", forwards, "y", right, "z", clockwise)

Causes the robot to move with the given speeds indefinitely.

Arguments:

  • forwards: The forwards speed in m/s
  • right: The right speed in m/s
  • clockwise: The rotational speed in degrees/s clockwise
async def set_wheel_speed( self, front_right: float, front_left: float, back_left: float, back_right: float) -> None:
254    async def set_wheel_speed(self, front_right: float, front_left: float, back_left: float, back_right: float) -> None:
255        """
256        All speeds are in rpm.
257        """
258
259        assert -1000 <= front_right <= 1000, "Wheel speeds must be between -1000 and 1000 rpm"
260        assert -1000 <= front_left <= 1000,  "Wheel speeds must be between -1000 and 1000 rpm"
261        assert -1000 <= back_left <= 1000,   "Wheel speeds must be between -1000 and 1000 rpm"
262        assert -1000 <= back_right <= 1000,  "Wheel speeds must be between -1000 and 1000 rpm"
263
264        await self.do(
265            "chassis", "wheel",
266            "w1", front_right,
267            "w2", front_left,
268            "w3", back_left,
269            "w4", back_right
270        )

All speeds are in rpm.

async def set_left_right_wheel_speeds(self, left: float, right: float) -> None:
272    async def set_left_right_wheel_speeds(self, left: float, right: float) -> None:
273        """
274        All speeds are in rpm.
275        """
276        await self.set_wheel_speed(right, left, left, right)

All speeds are in rpm.

async def set_all_wheel_speeds(self, speed: float) -> None:
278    async def set_all_wheel_speeds(self, speed: float) -> None:
279        """
280        All speeds are in rpm.
281        """
282        await self.set_wheel_speed(speed, speed, speed, speed)

All speeds are in rpm.

async def get_speed(self) -> sbhs_robomaster.data.ChassisSpeed:
284    async def get_speed(self) -> ChassisSpeed:
285        return ChassisSpeed.parse(await self.do("chassis", "speed", "?"))
async def rotate( self, clockwise: float, rotation_speed: float | None = None, timeout: float = 10) -> None:
287    async def rotate(self, clockwise: float, rotation_speed: float | None = None, timeout: float = 10) -> None:
288        """
289        **NOTE:** This function constantly polls the robot for its rotation and
290        may starve out other areas in the program trying to send messages to
291        the robot. In general, due to the serial nature of the communication
292        protocol, any communication with the robot should not be largely
293        parallel.
294
295        Arguments:
296         - clockwise: The degrees clockwise to rotate.
297         - rotation_speed: The speed to rotate, in degrees/s. (Must be positive.)
298         - timeout: How long to wait for the turn to finish (in seconds).
299        """
300
301        assert -1800 <= clockwise <= 1800, "Rotation must be between -1800 and 1800 degrees"
302        
303        if rotation_speed is not None:
304            assert 0 <= rotation_speed <= 600, "Rotation speed must be positive and less than 600 degrees/s"
305
306        args = [
307            "chassis", "move",
308            "x", 0,
309            "y", 0,
310            "z", clockwise
311        ]
312
313        if rotation_speed is not None:
314            args.append("vz")
315            args.append(rotation_speed)
316
317        await self.do(*args)
318
319        start_time = time()
320        while time() - start_time <= timeout:
321            status = await self.get_status()
322
323            if status.static:
324                return
325            
326            await asyncio.sleep(0.02) # 50Hz
327            
328        raise TimeoutError()

NOTE: This function constantly polls the robot for its rotation and may starve out other areas in the program trying to send messages to the robot. In general, due to the serial nature of the communication protocol, any communication with the robot should not be largely parallel.

Arguments:

  • clockwise: The degrees clockwise to rotate.
  • rotation_speed: The speed to rotate, in degrees/s. (Must be positive.)
  • timeout: How long to wait for the turn to finish (in seconds).
async def get_rotation(self) -> sbhs_robomaster.data.ChassisRotation:
330    async def get_rotation(self) -> ChassisRotation:
331        return ChassisRotation.parse(await self.do("chassis", "position", "?"))
async def get_attitude(self) -> sbhs_robomaster.data.ChassisAttitude:
333    async def get_attitude(self) -> ChassisAttitude:
334        return ChassisAttitude.parse(await self.do("chassis", "attitude", "?"))
async def get_status(self) -> sbhs_robomaster.data.ChassisStatus:
336    async def get_status(self) -> ChassisStatus:
337        return ChassisStatus.parse(await self.do("chassis", "status", "?"))
async def set_rotation_push_rate(self, freq: sbhs_robomaster.data.Frequency) -> None:
339    async def set_rotation_push_rate(self, freq: Frequency) -> None:
340        """
341        Sets the push rate of the chassis rotation in Hz. Use `Frequency.Off` to disable.
342
343        Access the rotation data through the `RoboMasterClient.rotation` feed.
344        """
345
346        # This sets the "position" push frequency because
347        # that is where the rotation is stored.
348        if freq == Frequency.Off:
349            await self.do("chassis", "push", "position", False)
350        else:
351            await self.do("chassis", "push", "position", True, "pfreq", freq)
352
353        self._rotation_frequency = freq

Sets the push rate of the chassis rotation in Hz. Use Frequency.Off to disable.

Access the rotation data through the RoboMasterClient.rotation feed.

def get_rotation_push_rate(self) -> sbhs_robomaster.data.Frequency:
355    def get_rotation_push_rate(self) -> Frequency:
356        """
357        Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled.
358        """
359        return self._rotation_frequency

Gets the push rate of the chassis rotation in Hz. Frequency.Off means it is disabled.

async def set_attitude_push_rate(self, freq: sbhs_robomaster.data.Frequency) -> None:
361    async def set_attitude_push_rate(self, freq: Frequency) -> None:
362        """
363        Sets the push rate of the chassis attitude in Hz. Use `Frequency.Off` to disable.
364
365        Access the attitude data through the `RoboMasterClient.attitude` feed.
366        """
367
368        if freq == Frequency.Off:
369            await self.do("chassis", "push", "attitude", False)
370        else:
371            await self.do("chassis", "push", "attitude", True, "afreq", freq)
372
373        self._attitude_frequency = freq

Sets the push rate of the chassis attitude in Hz. Use Frequency.Off to disable.

Access the attitude data through the RoboMasterClient.attitude feed.

def get_attitude_push_rate(self) -> sbhs_robomaster.data.Frequency:
375    def get_attitude_push_rate(self) -> Frequency:
376        """
377        Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled.
378        """
379        return self._attitude_frequency

Gets the push rate of the chassis rotation in Hz. Frequency.Off means it is disabled.

async def set_status_push_rate(self, freq: sbhs_robomaster.data.Frequency) -> None:
381    async def set_status_push_rate(self, freq: Frequency) -> None:
382        """
383        Sets the push rate of the chassis status in Hz. Use `Frequency.Off` to disable.
384
385        Access the status data through the `RoboMasterClient.status` feed.
386        """
387
388        if freq == Frequency.Off:
389            await self.do("chassis", "push", "status", False)
390        else:
391            await self.do("chassis", "push", "status", True, "sfreq", freq)
392
393        self._status_frequency = freq

Sets the push rate of the chassis status in Hz. Use Frequency.Off to disable.

Access the status data through the RoboMasterClient.status feed.

def get_status_push_rate(self) -> sbhs_robomaster.data.Frequency:
395    def get_status_push_rate(self) -> Frequency:
396        """
397        Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled.
398        """
399        return self._status_frequency

Gets the push rate of the chassis rotation in Hz. Frequency.Off means it is disabled.

async def set_ir_enabled(self, enabled: bool = True) -> None:
401    async def set_ir_enabled(self, enabled: bool = True) -> None:
402        await self.do("ir_distance_sensor", "measure", enabled)
async def get_ir_distance(self, ir_id: int) -> float:
404    async def get_ir_distance(self, ir_id: int) -> float:
405        """
406        The IR sensor has to be enabled first with `set_ir_enabled`.
407
408        Arguments:
409         - ir_id: The ID of the IR sensor. There appears to be only one IR sensor, so this should always be `1`.
410
411        Returns:
412         - The distance in centimeters.
413        """
414        return (await self.do("ir_distance_sensor", "distance", ir_id, "?")).get_float(0)

The IR sensor has to be enabled first with set_ir_enabled.

Arguments:

  • ir_id: The ID of the IR sensor. There appears to be only one IR sensor, so this should always be 1.

Returns:

  • The distance in centimeters.
async def move_arm(self, x_dist: float, y_dist: float) -> None:
416    async def move_arm(self, x_dist: float, y_dist: float) -> None:
417        """
418        Moves the robotic arm by the given distance relative to its current position.
419        To move to a specific position, use `set_arm_position` instead.
420
421        The units are unknown. Physically move the arm around and record the results from
422        `get_arm_position` to determine the desired inputs for this function.
423        """
424        await self.do("robotic_arm", "move", "x", x_dist, "y", y_dist)

Moves the robotic arm by the given distance relative to its current position. To move to a specific position, use set_arm_position instead.

The units are unknown. Physically move the arm around and record the results from get_arm_position to determine the desired inputs for this function.

async def set_arm_position(self, x: float, y: float) -> None:
426    async def set_arm_position(self, x: float, y: float) -> None:
427        """
428        Moves the robotic arm to the given position. To move relative to the current position,
429        use `move_arm` instead.
430
431        The units are unknown. Physically move the arm to the desired position and then use
432        `get_arm_position` to find the desired inputs for this function. The origin for the
433        x and y values appears to be different for every robot, so values for one robot will
434        not carry over to another.
435        """
436        await self.do("robotic_arm", "moveto", "x", x, "y", y)

Moves the robotic arm to the given position. To move relative to the current position, use move_arm instead.

The units are unknown. Physically move the arm to the desired position and then use get_arm_position to find the desired inputs for this function. The origin for the x and y values appears to be different for every robot, so values for one robot will not carry over to another.

async def get_arm_position(self) -> tuple[float, float]:
438    async def get_arm_position(self) -> tuple[float, float]:
439        """
440        Returns:
441         - The current position of the robotic arm in unknown units. The format is `(x, y)`.
442        """
443
444        response = await self.do("robotic_arm", "position", "?")
445        return (response.get_float(0), response.get_float(1))

Returns:

  • The current position of the robotic arm in unknown units. The format is (x, y).
async def open_gripper(self) -> None:
449    async def open_gripper(self) -> None:
450        await self.do("robotic_gripper", "open", 1)
async def close_gripper(self) -> None:
454    async def close_gripper(self) -> None:
455        """
456        This closes the gripper until it encounters any resistance. If there is an object in
457        the way, the gripper will close until it is tightly gripping the object, but it will
458        not damage itself.
459        """
460        await self.do("robotic_gripper", "close", 1)

This closes the gripper until it encounters any resistance. If there is an object in the way, the gripper will close until it is tightly gripping the object, but it will not damage itself.

async def get_gripper_status(self) -> sbhs_robomaster.data.GripperStatus:
462    async def get_gripper_status(self) -> GripperStatus:
463        """
464        Gets whether the gripper is Open, Closed, or Partially Open.
465        """
466        return (await self.do("robotic_gripper", "status", "?")).get_enum(0, GripperStatus)

Gets whether the gripper is Open, Closed, or Partially Open.

async def set_line_recognition_colour(self, colour: sbhs_robomaster.data.LineColour) -> None:
468    async def set_line_recognition_colour(self, colour: LineColour) -> None:
469        await self.do("AI", "attribute", "line_color", colour)
async def set_line_recognition_enabled(self, enabled: bool = True) -> None:
471    async def set_line_recognition_enabled(self, enabled: bool = True) -> None:
472        """
473        Access the line data through the `RoboMasterClient.line` feed.
474        """
475        await self.do("AI", "push", "line", enabled)
476
477        self._line_enabled = enabled

Access the line data through the RoboMasterClient.line feed.

def get_line_recognition_enabled(self) -> bool:
479    def get_line_recognition_enabled(self) -> bool:
480        return self._line_enabled
async def set_leds( self, leds: sbhs_robomaster.data.LedPosition, colour: sbhs_robomaster.data.Colour, effect: sbhs_robomaster.data.LedEffect = <LedEffect.Solid: 'solid'>) -> None:
482    async def set_leds(self, leds: LedPosition, colour: Colour, effect: LedEffect = LedEffect.Solid) -> None:
483        """
484        Sets the given LEDs to display a colour with a certain effect. Use
485        `.data.LedEffect.Off` to turn the leds off.
486
487        See `.data.LedPosition` to see how to pass in multiple leds at once.
488        
489        See `.data.LedEffect` to see the available effects.
490        """
491
492        async def set_led(led: str):
493            await self.do("led", "control", "comp", led, "r", colour.r, "g", colour.g, "b", colour.b, "effect", effect)
494
495        if leds == LedPosition.All:
496            await set_led("bottom_all")
497        else:
498            tasks: list[Coroutine[Any, Any, None]] = []
499
500            if LedPosition.Front in leds:
501                tasks.append(set_led("bottom_front"))
502            
503            if LedPosition.Back in leds:
504                tasks.append(set_led("bottom_back"))
505
506            if LedPosition.Left in leds:
507                tasks.append(set_led("bottom_left"))
508
509            if LedPosition.Right in leds:
510                tasks.append(set_led("bottom_right"))
511
512            await asyncio.gather(*tasks)

Sets the given LEDs to display a colour with a certain effect. Use sbhs_robomaster.data.LedEffect.Off to turn the leds off.

See sbhs_robomaster.data.LedPosition to see how to pass in multiple leds at once.

See sbhs_robomaster.data.LedEffect to see the available effects.