sbhs_robomaster.client
1from enum import Enum 2from types import TracebackType 3from typing import Any, Coroutine, Final, Optional, Type 4import asyncio 5from time import time 6from .data import * 7from .push_receiver import PushReceiver 8from .feed import Feed 9from .auto_connect import AutoConnectProtocol 10 11 12DIRECT_CONNECT_IP: Final[str] = "192.168.2.1" 13IP_PORT: Final[int] = 40926 14 15_CONTROL_PORT: Final[int] = 40923 16_PUSH_PORT: Final[int] = 40924 17 18 19async def auto_connect_to_robomaster() -> "RoboMasterClient": 20 """ 21 It is preferable to use `connect_to_robomaster` where possible, especially if 22 you are directly connected to the WiFi the robot provides (use `DIRECT_CONNECT_IP` 23 in that case.) This is due to it not accidentally getting confused when there 24 are multiple robots (see below). 25 26 Automatically finds an unpaired RoboMaster robot on the network and connects to it. 27 28 This assumes there will only be ONE unpaired RoboMaster robot on the network, or you 29 may end up connecting to a random robot. 30 31 ```py 32 import asyncio 33 from sbhs_robomaster import auto_connect_to_robomaster 34 35 async def main(): 36 async with await auto_connect_to_robomaster() as robot: 37 # Do stuff with robot 38 pass 39 40 asyncio.run(main()) 41 ``` 42 """ 43 44 loop = asyncio.get_event_loop() 45 transport, protocol = await loop.create_datagram_endpoint(AutoConnectProtocol, ("", IP_PORT)) 46 47 ip = await protocol.ip_future 48 49 transport.close() 50 51 return await connect_to_robomaster(ip) 52 53 54async def connect_to_robomaster(ip: str) -> "RoboMasterClient": 55 """ 56 Connects to a RoboMaster robot at the given IP address. 57 58 If the host computer (the computer the code is running on) is connected directly to robot's WiFi, 59 and not over another network, pass in `DIRECT_CONNECT_IP`: 60 61 ```py 62 import asyncio 63 from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP 64 65 async def main(): 66 async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot: 67 # Do stuff with robot 68 pass 69 70 asyncio.run(main()) 71 ``` 72 """ 73 74 command_socket = await asyncio.open_connection(ip, _CONTROL_PORT) 75 76 client = RoboMasterClient(command_socket) 77 await client.async_init() 78 79 return client 80 81 82def command_to_str(command: str | int | float | bool | Enum) -> str: 83 """ 84 Converts a data type to a string that can be sent to the robot. 85 """ 86 87 match command: 88 case str(): 89 return command 90 case bool(): 91 return "on" if command else "off" 92 case int() | float(): 93 return str(command) 94 case Enum(): 95 if isinstance(command.value, str) or isinstance(command.value, int) or isinstance(command.value, float) \ 96 or isinstance(command.value, bool) or isinstance(command.value, Enum): 97 return command_to_str(command.value) 98 else: 99 raise Exception("Invalid enum type") 100 101 102class RoboMasterClient: 103 line: Feed[Line] 104 """ 105 A feed of line recognition data. 106 107 Line recognition has to be enabled first with `RoboMasterClient.set_line_recognition_enabled` and 108 the line colour has to be set with `RoboMasterClient.set_line_recognition_colour`. 109 """ 110 111 rotation: Feed[ChassisRotation] 112 """ 113 A feed of the robot's rotation. 114 115 This has to be enabled first with `RoboMasterClient.set_rotation_push_rate`. 116 """ 117 118 attitude: Feed[ChassisAttitude] 119 """ 120 A feed of the robot's attitude. 121 122 This has to be enabled first with `RoboMasterClient.set_attitude_push_rate`. 123 """ 124 125 status: Feed[ChassisStatus] 126 """ 127 A feed of the robot's status. 128 129 This has to be enabled first with `RoboMasterClient.set_status_push_rate`. 130 """ 131 132 _line_enabled: bool 133 _rotation_frequency: Frequency 134 _attitude_frequency: Frequency 135 _status_frequency: Frequency 136 137 _conn: tuple[asyncio.StreamReader, asyncio.StreamWriter] 138 _command_lock: asyncio.Lock 139 _push_receiver: PushReceiver 140 _handle_push_task: asyncio.Task[None] 141 _exiting: bool 142 143 def __init__(self, command_conn: tuple[asyncio.StreamReader, asyncio.StreamWriter]) -> None: 144 """ 145 This constructor shouldn't be used directly. Use `connect_to_robomaster` instead. 146 """ 147 148 self.line = Feed() 149 self.rotation = Feed() 150 self.attitude = Feed() 151 self.status = Feed() 152 153 self._line_enabled = False 154 self._rotation_frequency = Frequency.Off 155 self._attitude_frequency = Frequency.Off 156 self._status_frequency = Frequency.Off 157 158 self._conn = command_conn 159 self._command_lock = asyncio.Lock() 160 self._push_receiver = PushReceiver(_PUSH_PORT) 161 self._handle_push_task = asyncio.create_task(self._handle_push(self._push_receiver.feed)) 162 self._exiting = False 163 164 async def async_init(self): 165 """ 166 Performs any asynchronous initialisation required. Should be called 167 immediately after the class is constructed. `connect_to_robomaster` 168 handles this automatically. 169 """ 170 await self.do("command") 171 172 await self.set_line_recognition_enabled(False) 173 await self.set_rotation_push_rate(Frequency.Off) 174 await self.set_attitude_push_rate(Frequency.Off) 175 await self.set_status_push_rate(Frequency.Off) 176 177 async def __aenter__(self): 178 return self 179 180 async def __aexit__(self, exc_type: Optional[Type[BaseException]], exc: Optional[BaseException], tb: Optional[TracebackType]): 181 await self.do("quit") 182 183 self._exiting = True 184 185 self._handle_push_task.cancel() 186 self._conn[1].close() 187 188 async def _handle_push(self, feed: Feed[Response]): 189 while True: 190 response = await feed.get() 191 192 topic = response.data[0] 193 subject = response.data[2] 194 parseData = Response(response.data[3:]) 195 196 if topic == "chassis": 197 if subject == "position": self.rotation.feed(ChassisRotation.parse(parseData)) 198 elif subject == "attitude": self.attitude.feed(ChassisAttitude.parse(parseData)) 199 elif subject == "status": self.status.feed(ChassisStatus.parse(parseData)) 200 else: print(f"Unknown chassis subject: {subject}") 201 elif topic == "AI": 202 if subject == "line": self.line.feed(Line.parse(parseData)) 203 else: print(f"Unknown AI subject: {subject}") 204 else: 205 print(f"Unknown topic: {topic}") 206 207 async def _do(self, command: str): 208 async with self._command_lock: 209 if self._exiting: 210 raise Exception("Client is exiting.") 211 212 self._conn[1].write(command.encode()) 213 await self._conn[1].drain() 214 215 data = await self._conn[0].readuntil(b";") 216 217 return Response(data.decode()[:-1].split(" ")) 218 219 async def do(self, *command_data: str | int | float | bool | Enum) -> Response: 220 """ 221 Low level command sending. Every function that influences the robot's behaviour 222 uses this internally. 223 224 It's better to use the higher level functions instead of this. 225 226 Arguments: 227 - *command_data: The parts of the command to send to the robot. These can be strings, numbers, booleans or enums. 228 All of these will be converted to strings using `command_to_str`. 229 """ 230 231 command = ' '.join(map(command_to_str, command_data)) + ';' 232 return await self._do(command) 233 234 async def get_version(self) -> str: 235 return (await self.do("version")).get_str(0) 236 237 async def set_speed(self, forwards: float, right: float, clockwise: float = 0) -> None: 238 """ 239 Causes the robot to move with the given speeds indefinitely. 240 241 Arguments: 242 - forwards: The forwards speed in m/s 243 - right: The right speed in m/s 244 - clockwise: The rotational speed in degrees/s clockwise 245 """ 246 247 assert -3.5 <= forwards <= 3.5, "Movement speed must be between -3.5 and 3.5 m/s" 248 assert -3.5 <= right <= 3.5, "Movement speed must be between -3.5 and 3.5 m/s" 249 assert -600 <= clockwise <= 600, "Rotation speed must be between -600 and 600 degrees/s" 250 251 await self.do("chassis", "speed", "x", forwards, "y", right, "z", clockwise) 252 253 async def set_wheel_speed(self, front_right: float, front_left: float, back_left: float, back_right: float) -> None: 254 """ 255 All speeds are in rpm. 256 """ 257 258 assert -1000 <= front_right <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 259 assert -1000 <= front_left <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 260 assert -1000 <= back_left <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 261 assert -1000 <= back_right <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 262 263 await self.do( 264 "chassis", "wheel", 265 "w1", front_right, 266 "w2", front_left, 267 "w3", back_left, 268 "w4", back_right 269 ) 270 271 async def set_left_right_wheel_speeds(self, left: float, right: float) -> None: 272 """ 273 All speeds are in rpm. 274 """ 275 await self.set_wheel_speed(right, left, left, right) 276 277 async def set_all_wheel_speeds(self, speed: float) -> None: 278 """ 279 All speeds are in rpm. 280 """ 281 await self.set_wheel_speed(speed, speed, speed, speed) 282 283 async def get_speed(self) -> ChassisSpeed: 284 return ChassisSpeed.parse(await self.do("chassis", "speed", "?")) 285 286 async def rotate(self, clockwise: float, rotation_speed: float | None = None, timeout: float = 10) -> None: 287 """ 288 **NOTE:** This function constantly polls the robot for its rotation and 289 may starve out other areas in the program trying to send messages to 290 the robot. In general, due to the serial nature of the communication 291 protocol, any communication with the robot should not be largely 292 parallel. 293 294 Arguments: 295 - clockwise: The degrees clockwise to rotate. 296 - rotation_speed: The speed to rotate, in degrees/s. (Must be positive.) 297 - timeout: How long to wait for the turn to finish (in seconds). 298 """ 299 300 assert -1800 <= clockwise <= 1800, "Rotation must be between -1800 and 1800 degrees" 301 302 if rotation_speed is not None: 303 assert 0 <= rotation_speed <= 600, "Rotation speed must be positive and less than 600 degrees/s" 304 305 args = [ 306 "chassis", "move", 307 "x", 0, 308 "y", 0, 309 "z", clockwise 310 ] 311 312 if rotation_speed is not None: 313 args.append("vz") 314 args.append(rotation_speed) 315 316 await self.do(*args) 317 318 start_time = time() 319 while time() - start_time <= timeout: 320 status = await self.get_status() 321 322 if status.static: 323 return 324 325 await asyncio.sleep(0.02) # 50Hz 326 327 raise TimeoutError() 328 329 async def get_rotation(self) -> ChassisRotation: 330 return ChassisRotation.parse(await self.do("chassis", "position", "?")) 331 332 async def get_attitude(self) -> ChassisAttitude: 333 return ChassisAttitude.parse(await self.do("chassis", "attitude", "?")) 334 335 async def get_status(self) -> ChassisStatus: 336 return ChassisStatus.parse(await self.do("chassis", "status", "?")) 337 338 async def set_rotation_push_rate(self, freq: Frequency) -> None: 339 """ 340 Sets the push rate of the chassis rotation in Hz. Use `Frequency.Off` to disable. 341 342 Access the rotation data through the `RoboMasterClient.rotation` feed. 343 """ 344 345 # This sets the "position" push frequency because 346 # that is where the rotation is stored. 347 if freq == Frequency.Off: 348 await self.do("chassis", "push", "position", False) 349 else: 350 await self.do("chassis", "push", "position", True, "pfreq", freq) 351 352 self._rotation_frequency = freq 353 354 def get_rotation_push_rate(self) -> Frequency: 355 """ 356 Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled. 357 """ 358 return self._rotation_frequency 359 360 async def set_attitude_push_rate(self, freq: Frequency) -> None: 361 """ 362 Sets the push rate of the chassis attitude in Hz. Use `Frequency.Off` to disable. 363 364 Access the attitude data through the `RoboMasterClient.attitude` feed. 365 """ 366 367 if freq == Frequency.Off: 368 await self.do("chassis", "push", "attitude", False) 369 else: 370 await self.do("chassis", "push", "attitude", True, "afreq", freq) 371 372 self._attitude_frequency = freq 373 374 def get_attitude_push_rate(self) -> Frequency: 375 """ 376 Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled. 377 """ 378 return self._attitude_frequency 379 380 async def set_status_push_rate(self, freq: Frequency) -> None: 381 """ 382 Sets the push rate of the chassis status in Hz. Use `Frequency.Off` to disable. 383 384 Access the status data through the `RoboMasterClient.status` feed. 385 """ 386 387 if freq == Frequency.Off: 388 await self.do("chassis", "push", "status", False) 389 else: 390 await self.do("chassis", "push", "status", True, "sfreq", freq) 391 392 self._status_frequency = freq 393 394 def get_status_push_rate(self) -> Frequency: 395 """ 396 Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled. 397 """ 398 return self._status_frequency 399 400 async def set_ir_enabled(self, enabled: bool = True) -> None: 401 await self.do("ir_distance_sensor", "measure", enabled) 402 403 async def get_ir_distance(self, ir_id: int) -> float: 404 """ 405 The IR sensor has to be enabled first with `set_ir_enabled`. 406 407 Arguments: 408 - ir_id: The ID of the IR sensor. There appears to be only one IR sensor, so this should always be `1`. 409 410 Returns: 411 - The distance in centimeters. 412 """ 413 return (await self.do("ir_distance_sensor", "distance", ir_id, "?")).get_float(0) 414 415 async def move_arm(self, x_dist: float, y_dist: float) -> None: 416 """ 417 Moves the robotic arm by the given distance relative to its current position. 418 To move to a specific position, use `set_arm_position` instead. 419 420 The units are unknown. Physically move the arm around and record the results from 421 `get_arm_position` to determine the desired inputs for this function. 422 """ 423 await self.do("robotic_arm", "move", "x", x_dist, "y", y_dist) 424 425 async def set_arm_position(self, x: float, y: float) -> None: 426 """ 427 Moves the robotic arm to the given position. To move relative to the current position, 428 use `move_arm` instead. 429 430 The units are unknown. Physically move the arm to the desired position and then use 431 `get_arm_position` to find the desired inputs for this function. The origin for the 432 x and y values appears to be different for every robot, so values for one robot will 433 not carry over to another. 434 """ 435 await self.do("robotic_arm", "moveto", "x", x, "y", y) 436 437 async def get_arm_position(self) -> tuple[float, float]: 438 """ 439 Returns: 440 - The current position of the robotic arm in unknown units. The format is `(x, y)`. 441 """ 442 443 response = await self.do("robotic_arm", "position", "?") 444 return (response.get_float(0), response.get_float(1)) 445 446 # TODO Allow different levels of force when opening? 447 # https://robomaster-dev.readthedocs.io/en/latest/text_sdk/protocol_api.html#mechanical-gripper-opening-control 448 async def open_gripper(self) -> None: 449 await self.do("robotic_gripper", "open", 1) 450 451 # TODO Allow different levels of force when closing? 452 # https://robomaster-dev.readthedocs.io/en/latest/text_sdk/protocol_api.html#mechanical-gripper-opening-control 453 async def close_gripper(self) -> None: 454 """ 455 This closes the gripper until it encounters any resistance. If there is an object in 456 the way, the gripper will close until it is tightly gripping the object, but it will 457 not damage itself. 458 """ 459 await self.do("robotic_gripper", "close", 1) 460 461 async def get_gripper_status(self) -> GripperStatus: 462 """ 463 Gets whether the gripper is Open, Closed, or Partially Open. 464 """ 465 return (await self.do("robotic_gripper", "status", "?")).get_enum(0, GripperStatus) 466 467 async def set_line_recognition_colour(self, colour: LineColour) -> None: 468 await self.do("AI", "attribute", "line_color", colour) 469 470 async def set_line_recognition_enabled(self, enabled: bool = True) -> None: 471 """ 472 Access the line data through the `RoboMasterClient.line` feed. 473 """ 474 await self.do("AI", "push", "line", enabled) 475 476 self._line_enabled = enabled 477 478 def get_line_recognition_enabled(self) -> bool: 479 return self._line_enabled 480 481 async def set_leds(self, leds: LedPosition, colour: Colour, effect: LedEffect = LedEffect.Solid) -> None: 482 """ 483 Sets the given LEDs to display a colour with a certain effect. Use 484 `.data.LedEffect.Off` to turn the leds off. 485 486 See `.data.LedPosition` to see how to pass in multiple leds at once. 487 488 See `.data.LedEffect` to see the available effects. 489 """ 490 491 async def set_led(led: str): 492 await self.do("led", "control", "comp", led, "r", colour.r, "g", colour.g, "b", colour.b, "effect", effect) 493 494 if leds == LedPosition.All: 495 await set_led("bottom_all") 496 else: 497 tasks: list[Coroutine[Any, Any, None]] = [] 498 499 if LedPosition.Front in leds: 500 tasks.append(set_led("bottom_front")) 501 502 if LedPosition.Back in leds: 503 tasks.append(set_led("bottom_back")) 504 505 if LedPosition.Left in leds: 506 tasks.append(set_led("bottom_left")) 507 508 if LedPosition.Right in leds: 509 tasks.append(set_led("bottom_right")) 510 511 await asyncio.gather(*tasks)
20async def auto_connect_to_robomaster() -> "RoboMasterClient": 21 """ 22 It is preferable to use `connect_to_robomaster` where possible, especially if 23 you are directly connected to the WiFi the robot provides (use `DIRECT_CONNECT_IP` 24 in that case.) This is due to it not accidentally getting confused when there 25 are multiple robots (see below). 26 27 Automatically finds an unpaired RoboMaster robot on the network and connects to it. 28 29 This assumes there will only be ONE unpaired RoboMaster robot on the network, or you 30 may end up connecting to a random robot. 31 32 ```py 33 import asyncio 34 from sbhs_robomaster import auto_connect_to_robomaster 35 36 async def main(): 37 async with await auto_connect_to_robomaster() as robot: 38 # Do stuff with robot 39 pass 40 41 asyncio.run(main()) 42 ``` 43 """ 44 45 loop = asyncio.get_event_loop() 46 transport, protocol = await loop.create_datagram_endpoint(AutoConnectProtocol, ("", IP_PORT)) 47 48 ip = await protocol.ip_future 49 50 transport.close() 51 52 return await connect_to_robomaster(ip)
It is preferable to use connect_to_robomaster where possible, especially if
you are directly connected to the WiFi the robot provides (use DIRECT_CONNECT_IP
in that case.) This is due to it not accidentally getting confused when there
are multiple robots (see below).
Automatically finds an unpaired RoboMaster robot on the network and connects to it.
This assumes there will only be ONE unpaired RoboMaster robot on the network, or you may end up connecting to a random robot.
import asyncio
from sbhs_robomaster import auto_connect_to_robomaster
async def main():
async with await auto_connect_to_robomaster() as robot:
# Do stuff with robot
pass
asyncio.run(main())
55async def connect_to_robomaster(ip: str) -> "RoboMasterClient": 56 """ 57 Connects to a RoboMaster robot at the given IP address. 58 59 If the host computer (the computer the code is running on) is connected directly to robot's WiFi, 60 and not over another network, pass in `DIRECT_CONNECT_IP`: 61 62 ```py 63 import asyncio 64 from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP 65 66 async def main(): 67 async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot: 68 # Do stuff with robot 69 pass 70 71 asyncio.run(main()) 72 ``` 73 """ 74 75 command_socket = await asyncio.open_connection(ip, _CONTROL_PORT) 76 77 client = RoboMasterClient(command_socket) 78 await client.async_init() 79 80 return client
Connects to a RoboMaster robot at the given IP address.
If the host computer (the computer the code is running on) is connected directly to robot's WiFi,
and not over another network, pass in DIRECT_CONNECT_IP:
import asyncio
from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP
async def main():
async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
# Do stuff with robot
pass
asyncio.run(main())
83def command_to_str(command: str | int | float | bool | Enum) -> str: 84 """ 85 Converts a data type to a string that can be sent to the robot. 86 """ 87 88 match command: 89 case str(): 90 return command 91 case bool(): 92 return "on" if command else "off" 93 case int() | float(): 94 return str(command) 95 case Enum(): 96 if isinstance(command.value, str) or isinstance(command.value, int) or isinstance(command.value, float) \ 97 or isinstance(command.value, bool) or isinstance(command.value, Enum): 98 return command_to_str(command.value) 99 else: 100 raise Exception("Invalid enum type")
Converts a data type to a string that can be sent to the robot.
103class RoboMasterClient: 104 line: Feed[Line] 105 """ 106 A feed of line recognition data. 107 108 Line recognition has to be enabled first with `RoboMasterClient.set_line_recognition_enabled` and 109 the line colour has to be set with `RoboMasterClient.set_line_recognition_colour`. 110 """ 111 112 rotation: Feed[ChassisRotation] 113 """ 114 A feed of the robot's rotation. 115 116 This has to be enabled first with `RoboMasterClient.set_rotation_push_rate`. 117 """ 118 119 attitude: Feed[ChassisAttitude] 120 """ 121 A feed of the robot's attitude. 122 123 This has to be enabled first with `RoboMasterClient.set_attitude_push_rate`. 124 """ 125 126 status: Feed[ChassisStatus] 127 """ 128 A feed of the robot's status. 129 130 This has to be enabled first with `RoboMasterClient.set_status_push_rate`. 131 """ 132 133 _line_enabled: bool 134 _rotation_frequency: Frequency 135 _attitude_frequency: Frequency 136 _status_frequency: Frequency 137 138 _conn: tuple[asyncio.StreamReader, asyncio.StreamWriter] 139 _command_lock: asyncio.Lock 140 _push_receiver: PushReceiver 141 _handle_push_task: asyncio.Task[None] 142 _exiting: bool 143 144 def __init__(self, command_conn: tuple[asyncio.StreamReader, asyncio.StreamWriter]) -> None: 145 """ 146 This constructor shouldn't be used directly. Use `connect_to_robomaster` instead. 147 """ 148 149 self.line = Feed() 150 self.rotation = Feed() 151 self.attitude = Feed() 152 self.status = Feed() 153 154 self._line_enabled = False 155 self._rotation_frequency = Frequency.Off 156 self._attitude_frequency = Frequency.Off 157 self._status_frequency = Frequency.Off 158 159 self._conn = command_conn 160 self._command_lock = asyncio.Lock() 161 self._push_receiver = PushReceiver(_PUSH_PORT) 162 self._handle_push_task = asyncio.create_task(self._handle_push(self._push_receiver.feed)) 163 self._exiting = False 164 165 async def async_init(self): 166 """ 167 Performs any asynchronous initialisation required. Should be called 168 immediately after the class is constructed. `connect_to_robomaster` 169 handles this automatically. 170 """ 171 await self.do("command") 172 173 await self.set_line_recognition_enabled(False) 174 await self.set_rotation_push_rate(Frequency.Off) 175 await self.set_attitude_push_rate(Frequency.Off) 176 await self.set_status_push_rate(Frequency.Off) 177 178 async def __aenter__(self): 179 return self 180 181 async def __aexit__(self, exc_type: Optional[Type[BaseException]], exc: Optional[BaseException], tb: Optional[TracebackType]): 182 await self.do("quit") 183 184 self._exiting = True 185 186 self._handle_push_task.cancel() 187 self._conn[1].close() 188 189 async def _handle_push(self, feed: Feed[Response]): 190 while True: 191 response = await feed.get() 192 193 topic = response.data[0] 194 subject = response.data[2] 195 parseData = Response(response.data[3:]) 196 197 if topic == "chassis": 198 if subject == "position": self.rotation.feed(ChassisRotation.parse(parseData)) 199 elif subject == "attitude": self.attitude.feed(ChassisAttitude.parse(parseData)) 200 elif subject == "status": self.status.feed(ChassisStatus.parse(parseData)) 201 else: print(f"Unknown chassis subject: {subject}") 202 elif topic == "AI": 203 if subject == "line": self.line.feed(Line.parse(parseData)) 204 else: print(f"Unknown AI subject: {subject}") 205 else: 206 print(f"Unknown topic: {topic}") 207 208 async def _do(self, command: str): 209 async with self._command_lock: 210 if self._exiting: 211 raise Exception("Client is exiting.") 212 213 self._conn[1].write(command.encode()) 214 await self._conn[1].drain() 215 216 data = await self._conn[0].readuntil(b";") 217 218 return Response(data.decode()[:-1].split(" ")) 219 220 async def do(self, *command_data: str | int | float | bool | Enum) -> Response: 221 """ 222 Low level command sending. Every function that influences the robot's behaviour 223 uses this internally. 224 225 It's better to use the higher level functions instead of this. 226 227 Arguments: 228 - *command_data: The parts of the command to send to the robot. These can be strings, numbers, booleans or enums. 229 All of these will be converted to strings using `command_to_str`. 230 """ 231 232 command = ' '.join(map(command_to_str, command_data)) + ';' 233 return await self._do(command) 234 235 async def get_version(self) -> str: 236 return (await self.do("version")).get_str(0) 237 238 async def set_speed(self, forwards: float, right: float, clockwise: float = 0) -> None: 239 """ 240 Causes the robot to move with the given speeds indefinitely. 241 242 Arguments: 243 - forwards: The forwards speed in m/s 244 - right: The right speed in m/s 245 - clockwise: The rotational speed in degrees/s clockwise 246 """ 247 248 assert -3.5 <= forwards <= 3.5, "Movement speed must be between -3.5 and 3.5 m/s" 249 assert -3.5 <= right <= 3.5, "Movement speed must be between -3.5 and 3.5 m/s" 250 assert -600 <= clockwise <= 600, "Rotation speed must be between -600 and 600 degrees/s" 251 252 await self.do("chassis", "speed", "x", forwards, "y", right, "z", clockwise) 253 254 async def set_wheel_speed(self, front_right: float, front_left: float, back_left: float, back_right: float) -> None: 255 """ 256 All speeds are in rpm. 257 """ 258 259 assert -1000 <= front_right <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 260 assert -1000 <= front_left <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 261 assert -1000 <= back_left <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 262 assert -1000 <= back_right <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 263 264 await self.do( 265 "chassis", "wheel", 266 "w1", front_right, 267 "w2", front_left, 268 "w3", back_left, 269 "w4", back_right 270 ) 271 272 async def set_left_right_wheel_speeds(self, left: float, right: float) -> None: 273 """ 274 All speeds are in rpm. 275 """ 276 await self.set_wheel_speed(right, left, left, right) 277 278 async def set_all_wheel_speeds(self, speed: float) -> None: 279 """ 280 All speeds are in rpm. 281 """ 282 await self.set_wheel_speed(speed, speed, speed, speed) 283 284 async def get_speed(self) -> ChassisSpeed: 285 return ChassisSpeed.parse(await self.do("chassis", "speed", "?")) 286 287 async def rotate(self, clockwise: float, rotation_speed: float | None = None, timeout: float = 10) -> None: 288 """ 289 **NOTE:** This function constantly polls the robot for its rotation and 290 may starve out other areas in the program trying to send messages to 291 the robot. In general, due to the serial nature of the communication 292 protocol, any communication with the robot should not be largely 293 parallel. 294 295 Arguments: 296 - clockwise: The degrees clockwise to rotate. 297 - rotation_speed: The speed to rotate, in degrees/s. (Must be positive.) 298 - timeout: How long to wait for the turn to finish (in seconds). 299 """ 300 301 assert -1800 <= clockwise <= 1800, "Rotation must be between -1800 and 1800 degrees" 302 303 if rotation_speed is not None: 304 assert 0 <= rotation_speed <= 600, "Rotation speed must be positive and less than 600 degrees/s" 305 306 args = [ 307 "chassis", "move", 308 "x", 0, 309 "y", 0, 310 "z", clockwise 311 ] 312 313 if rotation_speed is not None: 314 args.append("vz") 315 args.append(rotation_speed) 316 317 await self.do(*args) 318 319 start_time = time() 320 while time() - start_time <= timeout: 321 status = await self.get_status() 322 323 if status.static: 324 return 325 326 await asyncio.sleep(0.02) # 50Hz 327 328 raise TimeoutError() 329 330 async def get_rotation(self) -> ChassisRotation: 331 return ChassisRotation.parse(await self.do("chassis", "position", "?")) 332 333 async def get_attitude(self) -> ChassisAttitude: 334 return ChassisAttitude.parse(await self.do("chassis", "attitude", "?")) 335 336 async def get_status(self) -> ChassisStatus: 337 return ChassisStatus.parse(await self.do("chassis", "status", "?")) 338 339 async def set_rotation_push_rate(self, freq: Frequency) -> None: 340 """ 341 Sets the push rate of the chassis rotation in Hz. Use `Frequency.Off` to disable. 342 343 Access the rotation data through the `RoboMasterClient.rotation` feed. 344 """ 345 346 # This sets the "position" push frequency because 347 # that is where the rotation is stored. 348 if freq == Frequency.Off: 349 await self.do("chassis", "push", "position", False) 350 else: 351 await self.do("chassis", "push", "position", True, "pfreq", freq) 352 353 self._rotation_frequency = freq 354 355 def get_rotation_push_rate(self) -> Frequency: 356 """ 357 Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled. 358 """ 359 return self._rotation_frequency 360 361 async def set_attitude_push_rate(self, freq: Frequency) -> None: 362 """ 363 Sets the push rate of the chassis attitude in Hz. Use `Frequency.Off` to disable. 364 365 Access the attitude data through the `RoboMasterClient.attitude` feed. 366 """ 367 368 if freq == Frequency.Off: 369 await self.do("chassis", "push", "attitude", False) 370 else: 371 await self.do("chassis", "push", "attitude", True, "afreq", freq) 372 373 self._attitude_frequency = freq 374 375 def get_attitude_push_rate(self) -> Frequency: 376 """ 377 Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled. 378 """ 379 return self._attitude_frequency 380 381 async def set_status_push_rate(self, freq: Frequency) -> None: 382 """ 383 Sets the push rate of the chassis status in Hz. Use `Frequency.Off` to disable. 384 385 Access the status data through the `RoboMasterClient.status` feed. 386 """ 387 388 if freq == Frequency.Off: 389 await self.do("chassis", "push", "status", False) 390 else: 391 await self.do("chassis", "push", "status", True, "sfreq", freq) 392 393 self._status_frequency = freq 394 395 def get_status_push_rate(self) -> Frequency: 396 """ 397 Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled. 398 """ 399 return self._status_frequency 400 401 async def set_ir_enabled(self, enabled: bool = True) -> None: 402 await self.do("ir_distance_sensor", "measure", enabled) 403 404 async def get_ir_distance(self, ir_id: int) -> float: 405 """ 406 The IR sensor has to be enabled first with `set_ir_enabled`. 407 408 Arguments: 409 - ir_id: The ID of the IR sensor. There appears to be only one IR sensor, so this should always be `1`. 410 411 Returns: 412 - The distance in centimeters. 413 """ 414 return (await self.do("ir_distance_sensor", "distance", ir_id, "?")).get_float(0) 415 416 async def move_arm(self, x_dist: float, y_dist: float) -> None: 417 """ 418 Moves the robotic arm by the given distance relative to its current position. 419 To move to a specific position, use `set_arm_position` instead. 420 421 The units are unknown. Physically move the arm around and record the results from 422 `get_arm_position` to determine the desired inputs for this function. 423 """ 424 await self.do("robotic_arm", "move", "x", x_dist, "y", y_dist) 425 426 async def set_arm_position(self, x: float, y: float) -> None: 427 """ 428 Moves the robotic arm to the given position. To move relative to the current position, 429 use `move_arm` instead. 430 431 The units are unknown. Physically move the arm to the desired position and then use 432 `get_arm_position` to find the desired inputs for this function. The origin for the 433 x and y values appears to be different for every robot, so values for one robot will 434 not carry over to another. 435 """ 436 await self.do("robotic_arm", "moveto", "x", x, "y", y) 437 438 async def get_arm_position(self) -> tuple[float, float]: 439 """ 440 Returns: 441 - The current position of the robotic arm in unknown units. The format is `(x, y)`. 442 """ 443 444 response = await self.do("robotic_arm", "position", "?") 445 return (response.get_float(0), response.get_float(1)) 446 447 # TODO Allow different levels of force when opening? 448 # https://robomaster-dev.readthedocs.io/en/latest/text_sdk/protocol_api.html#mechanical-gripper-opening-control 449 async def open_gripper(self) -> None: 450 await self.do("robotic_gripper", "open", 1) 451 452 # TODO Allow different levels of force when closing? 453 # https://robomaster-dev.readthedocs.io/en/latest/text_sdk/protocol_api.html#mechanical-gripper-opening-control 454 async def close_gripper(self) -> None: 455 """ 456 This closes the gripper until it encounters any resistance. If there is an object in 457 the way, the gripper will close until it is tightly gripping the object, but it will 458 not damage itself. 459 """ 460 await self.do("robotic_gripper", "close", 1) 461 462 async def get_gripper_status(self) -> GripperStatus: 463 """ 464 Gets whether the gripper is Open, Closed, or Partially Open. 465 """ 466 return (await self.do("robotic_gripper", "status", "?")).get_enum(0, GripperStatus) 467 468 async def set_line_recognition_colour(self, colour: LineColour) -> None: 469 await self.do("AI", "attribute", "line_color", colour) 470 471 async def set_line_recognition_enabled(self, enabled: bool = True) -> None: 472 """ 473 Access the line data through the `RoboMasterClient.line` feed. 474 """ 475 await self.do("AI", "push", "line", enabled) 476 477 self._line_enabled = enabled 478 479 def get_line_recognition_enabled(self) -> bool: 480 return self._line_enabled 481 482 async def set_leds(self, leds: LedPosition, colour: Colour, effect: LedEffect = LedEffect.Solid) -> None: 483 """ 484 Sets the given LEDs to display a colour with a certain effect. Use 485 `.data.LedEffect.Off` to turn the leds off. 486 487 See `.data.LedPosition` to see how to pass in multiple leds at once. 488 489 See `.data.LedEffect` to see the available effects. 490 """ 491 492 async def set_led(led: str): 493 await self.do("led", "control", "comp", led, "r", colour.r, "g", colour.g, "b", colour.b, "effect", effect) 494 495 if leds == LedPosition.All: 496 await set_led("bottom_all") 497 else: 498 tasks: list[Coroutine[Any, Any, None]] = [] 499 500 if LedPosition.Front in leds: 501 tasks.append(set_led("bottom_front")) 502 503 if LedPosition.Back in leds: 504 tasks.append(set_led("bottom_back")) 505 506 if LedPosition.Left in leds: 507 tasks.append(set_led("bottom_left")) 508 509 if LedPosition.Right in leds: 510 tasks.append(set_led("bottom_right")) 511 512 await asyncio.gather(*tasks)
144 def __init__(self, command_conn: tuple[asyncio.StreamReader, asyncio.StreamWriter]) -> None: 145 """ 146 This constructor shouldn't be used directly. Use `connect_to_robomaster` instead. 147 """ 148 149 self.line = Feed() 150 self.rotation = Feed() 151 self.attitude = Feed() 152 self.status = Feed() 153 154 self._line_enabled = False 155 self._rotation_frequency = Frequency.Off 156 self._attitude_frequency = Frequency.Off 157 self._status_frequency = Frequency.Off 158 159 self._conn = command_conn 160 self._command_lock = asyncio.Lock() 161 self._push_receiver = PushReceiver(_PUSH_PORT) 162 self._handle_push_task = asyncio.create_task(self._handle_push(self._push_receiver.feed)) 163 self._exiting = False
This constructor shouldn't be used directly. Use connect_to_robomaster instead.
A feed of line recognition data.
Line recognition has to be enabled first with RoboMasterClient.set_line_recognition_enabled and
the line colour has to be set with RoboMasterClient.set_line_recognition_colour.
A feed of the robot's rotation.
This has to be enabled first with RoboMasterClient.set_rotation_push_rate.
A feed of the robot's attitude.
This has to be enabled first with RoboMasterClient.set_attitude_push_rate.
A feed of the robot's status.
This has to be enabled first with RoboMasterClient.set_status_push_rate.
165 async def async_init(self): 166 """ 167 Performs any asynchronous initialisation required. Should be called 168 immediately after the class is constructed. `connect_to_robomaster` 169 handles this automatically. 170 """ 171 await self.do("command") 172 173 await self.set_line_recognition_enabled(False) 174 await self.set_rotation_push_rate(Frequency.Off) 175 await self.set_attitude_push_rate(Frequency.Off) 176 await self.set_status_push_rate(Frequency.Off)
Performs any asynchronous initialisation required. Should be called
immediately after the class is constructed. connect_to_robomaster
handles this automatically.
220 async def do(self, *command_data: str | int | float | bool | Enum) -> Response: 221 """ 222 Low level command sending. Every function that influences the robot's behaviour 223 uses this internally. 224 225 It's better to use the higher level functions instead of this. 226 227 Arguments: 228 - *command_data: The parts of the command to send to the robot. These can be strings, numbers, booleans or enums. 229 All of these will be converted to strings using `command_to_str`. 230 """ 231 232 command = ' '.join(map(command_to_str, command_data)) + ';' 233 return await self._do(command)
Low level command sending. Every function that influences the robot's behaviour uses this internally.
It's better to use the higher level functions instead of this.
Arguments:
- *command_data: The parts of the command to send to the robot. These can be strings, numbers, booleans or enums.
All of these will be converted to strings using
command_to_str.
238 async def set_speed(self, forwards: float, right: float, clockwise: float = 0) -> None: 239 """ 240 Causes the robot to move with the given speeds indefinitely. 241 242 Arguments: 243 - forwards: The forwards speed in m/s 244 - right: The right speed in m/s 245 - clockwise: The rotational speed in degrees/s clockwise 246 """ 247 248 assert -3.5 <= forwards <= 3.5, "Movement speed must be between -3.5 and 3.5 m/s" 249 assert -3.5 <= right <= 3.5, "Movement speed must be between -3.5 and 3.5 m/s" 250 assert -600 <= clockwise <= 600, "Rotation speed must be between -600 and 600 degrees/s" 251 252 await self.do("chassis", "speed", "x", forwards, "y", right, "z", clockwise)
Causes the robot to move with the given speeds indefinitely.
Arguments:
- forwards: The forwards speed in m/s
- right: The right speed in m/s
- clockwise: The rotational speed in degrees/s clockwise
254 async def set_wheel_speed(self, front_right: float, front_left: float, back_left: float, back_right: float) -> None: 255 """ 256 All speeds are in rpm. 257 """ 258 259 assert -1000 <= front_right <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 260 assert -1000 <= front_left <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 261 assert -1000 <= back_left <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 262 assert -1000 <= back_right <= 1000, "Wheel speeds must be between -1000 and 1000 rpm" 263 264 await self.do( 265 "chassis", "wheel", 266 "w1", front_right, 267 "w2", front_left, 268 "w3", back_left, 269 "w4", back_right 270 )
All speeds are in rpm.
272 async def set_left_right_wheel_speeds(self, left: float, right: float) -> None: 273 """ 274 All speeds are in rpm. 275 """ 276 await self.set_wheel_speed(right, left, left, right)
All speeds are in rpm.
278 async def set_all_wheel_speeds(self, speed: float) -> None: 279 """ 280 All speeds are in rpm. 281 """ 282 await self.set_wheel_speed(speed, speed, speed, speed)
All speeds are in rpm.
287 async def rotate(self, clockwise: float, rotation_speed: float | None = None, timeout: float = 10) -> None: 288 """ 289 **NOTE:** This function constantly polls the robot for its rotation and 290 may starve out other areas in the program trying to send messages to 291 the robot. In general, due to the serial nature of the communication 292 protocol, any communication with the robot should not be largely 293 parallel. 294 295 Arguments: 296 - clockwise: The degrees clockwise to rotate. 297 - rotation_speed: The speed to rotate, in degrees/s. (Must be positive.) 298 - timeout: How long to wait for the turn to finish (in seconds). 299 """ 300 301 assert -1800 <= clockwise <= 1800, "Rotation must be between -1800 and 1800 degrees" 302 303 if rotation_speed is not None: 304 assert 0 <= rotation_speed <= 600, "Rotation speed must be positive and less than 600 degrees/s" 305 306 args = [ 307 "chassis", "move", 308 "x", 0, 309 "y", 0, 310 "z", clockwise 311 ] 312 313 if rotation_speed is not None: 314 args.append("vz") 315 args.append(rotation_speed) 316 317 await self.do(*args) 318 319 start_time = time() 320 while time() - start_time <= timeout: 321 status = await self.get_status() 322 323 if status.static: 324 return 325 326 await asyncio.sleep(0.02) # 50Hz 327 328 raise TimeoutError()
NOTE: This function constantly polls the robot for its rotation and may starve out other areas in the program trying to send messages to the robot. In general, due to the serial nature of the communication protocol, any communication with the robot should not be largely parallel.
Arguments:
- clockwise: The degrees clockwise to rotate.
- rotation_speed: The speed to rotate, in degrees/s. (Must be positive.)
- timeout: How long to wait for the turn to finish (in seconds).
339 async def set_rotation_push_rate(self, freq: Frequency) -> None: 340 """ 341 Sets the push rate of the chassis rotation in Hz. Use `Frequency.Off` to disable. 342 343 Access the rotation data through the `RoboMasterClient.rotation` feed. 344 """ 345 346 # This sets the "position" push frequency because 347 # that is where the rotation is stored. 348 if freq == Frequency.Off: 349 await self.do("chassis", "push", "position", False) 350 else: 351 await self.do("chassis", "push", "position", True, "pfreq", freq) 352 353 self._rotation_frequency = freq
Sets the push rate of the chassis rotation in Hz. Use Frequency.Off to disable.
Access the rotation data through the RoboMasterClient.rotation feed.
355 def get_rotation_push_rate(self) -> Frequency: 356 """ 357 Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled. 358 """ 359 return self._rotation_frequency
Gets the push rate of the chassis rotation in Hz. Frequency.Off means it is disabled.
361 async def set_attitude_push_rate(self, freq: Frequency) -> None: 362 """ 363 Sets the push rate of the chassis attitude in Hz. Use `Frequency.Off` to disable. 364 365 Access the attitude data through the `RoboMasterClient.attitude` feed. 366 """ 367 368 if freq == Frequency.Off: 369 await self.do("chassis", "push", "attitude", False) 370 else: 371 await self.do("chassis", "push", "attitude", True, "afreq", freq) 372 373 self._attitude_frequency = freq
Sets the push rate of the chassis attitude in Hz. Use Frequency.Off to disable.
Access the attitude data through the RoboMasterClient.attitude feed.
375 def get_attitude_push_rate(self) -> Frequency: 376 """ 377 Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled. 378 """ 379 return self._attitude_frequency
Gets the push rate of the chassis rotation in Hz. Frequency.Off means it is disabled.
381 async def set_status_push_rate(self, freq: Frequency) -> None: 382 """ 383 Sets the push rate of the chassis status in Hz. Use `Frequency.Off` to disable. 384 385 Access the status data through the `RoboMasterClient.status` feed. 386 """ 387 388 if freq == Frequency.Off: 389 await self.do("chassis", "push", "status", False) 390 else: 391 await self.do("chassis", "push", "status", True, "sfreq", freq) 392 393 self._status_frequency = freq
Sets the push rate of the chassis status in Hz. Use Frequency.Off to disable.
Access the status data through the RoboMasterClient.status feed.
395 def get_status_push_rate(self) -> Frequency: 396 """ 397 Gets the push rate of the chassis rotation in Hz. `Frequency.Off` means it is disabled. 398 """ 399 return self._status_frequency
Gets the push rate of the chassis rotation in Hz. Frequency.Off means it is disabled.
404 async def get_ir_distance(self, ir_id: int) -> float: 405 """ 406 The IR sensor has to be enabled first with `set_ir_enabled`. 407 408 Arguments: 409 - ir_id: The ID of the IR sensor. There appears to be only one IR sensor, so this should always be `1`. 410 411 Returns: 412 - The distance in centimeters. 413 """ 414 return (await self.do("ir_distance_sensor", "distance", ir_id, "?")).get_float(0)
The IR sensor has to be enabled first with set_ir_enabled.
Arguments:
- ir_id: The ID of the IR sensor. There appears to be only one IR sensor, so this should always be
1.
Returns:
- The distance in centimeters.
416 async def move_arm(self, x_dist: float, y_dist: float) -> None: 417 """ 418 Moves the robotic arm by the given distance relative to its current position. 419 To move to a specific position, use `set_arm_position` instead. 420 421 The units are unknown. Physically move the arm around and record the results from 422 `get_arm_position` to determine the desired inputs for this function. 423 """ 424 await self.do("robotic_arm", "move", "x", x_dist, "y", y_dist)
Moves the robotic arm by the given distance relative to its current position.
To move to a specific position, use set_arm_position instead.
The units are unknown. Physically move the arm around and record the results from
get_arm_position to determine the desired inputs for this function.
426 async def set_arm_position(self, x: float, y: float) -> None: 427 """ 428 Moves the robotic arm to the given position. To move relative to the current position, 429 use `move_arm` instead. 430 431 The units are unknown. Physically move the arm to the desired position and then use 432 `get_arm_position` to find the desired inputs for this function. The origin for the 433 x and y values appears to be different for every robot, so values for one robot will 434 not carry over to another. 435 """ 436 await self.do("robotic_arm", "moveto", "x", x, "y", y)
Moves the robotic arm to the given position. To move relative to the current position,
use move_arm instead.
The units are unknown. Physically move the arm to the desired position and then use
get_arm_position to find the desired inputs for this function. The origin for the
x and y values appears to be different for every robot, so values for one robot will
not carry over to another.
438 async def get_arm_position(self) -> tuple[float, float]: 439 """ 440 Returns: 441 - The current position of the robotic arm in unknown units. The format is `(x, y)`. 442 """ 443 444 response = await self.do("robotic_arm", "position", "?") 445 return (response.get_float(0), response.get_float(1))
Returns:
- The current position of the robotic arm in unknown units. The format is
(x, y).
454 async def close_gripper(self) -> None: 455 """ 456 This closes the gripper until it encounters any resistance. If there is an object in 457 the way, the gripper will close until it is tightly gripping the object, but it will 458 not damage itself. 459 """ 460 await self.do("robotic_gripper", "close", 1)
This closes the gripper until it encounters any resistance. If there is an object in the way, the gripper will close until it is tightly gripping the object, but it will not damage itself.
462 async def get_gripper_status(self) -> GripperStatus: 463 """ 464 Gets whether the gripper is Open, Closed, or Partially Open. 465 """ 466 return (await self.do("robotic_gripper", "status", "?")).get_enum(0, GripperStatus)
Gets whether the gripper is Open, Closed, or Partially Open.
471 async def set_line_recognition_enabled(self, enabled: bool = True) -> None: 472 """ 473 Access the line data through the `RoboMasterClient.line` feed. 474 """ 475 await self.do("AI", "push", "line", enabled) 476 477 self._line_enabled = enabled
Access the line data through the RoboMasterClient.line feed.
482 async def set_leds(self, leds: LedPosition, colour: Colour, effect: LedEffect = LedEffect.Solid) -> None: 483 """ 484 Sets the given LEDs to display a colour with a certain effect. Use 485 `.data.LedEffect.Off` to turn the leds off. 486 487 See `.data.LedPosition` to see how to pass in multiple leds at once. 488 489 See `.data.LedEffect` to see the available effects. 490 """ 491 492 async def set_led(led: str): 493 await self.do("led", "control", "comp", led, "r", colour.r, "g", colour.g, "b", colour.b, "effect", effect) 494 495 if leds == LedPosition.All: 496 await set_led("bottom_all") 497 else: 498 tasks: list[Coroutine[Any, Any, None]] = [] 499 500 if LedPosition.Front in leds: 501 tasks.append(set_led("bottom_front")) 502 503 if LedPosition.Back in leds: 504 tasks.append(set_led("bottom_back")) 505 506 if LedPosition.Left in leds: 507 tasks.append(set_led("bottom_left")) 508 509 if LedPosition.Right in leds: 510 tasks.append(set_led("bottom_right")) 511 512 await asyncio.gather(*tasks)
Sets the given LEDs to display a colour with a certain effect. Use
sbhs_robomaster.data.LedEffect.Off to turn the leds off.
See sbhs_robomaster.data.LedPosition to see how to pass in multiple leds at once.
See sbhs_robomaster.data.LedEffect to see the available effects.