sbhs_robomaster.feed
from typing import Generic, TypeVar
import asyncio


FeedT = TypeVar("FeedT")


class Feed(Generic[FeedT]):
    """
    Used for expressing a constant stream of data.

    To get the next piece of data, call `get`.

    Example:
    ```py
    import asyncio
    from sbhs_robomaster import Feed

    feed = Feed()

    async def send_data():
        for i in range(5):
            feed.feed(i)
            await asyncio.sleep(1)

    async def receive_data():
        while True:
            print(await feed.get())

    async def main():
        # Run both coroutines concurrently. The receiver is listed first so
        # that it is already waiting when the first value is sent.
        await asyncio.gather(receive_data(), send_data())

    asyncio.run(main())

    # Prints:
    # 0
    # 1
    # 2
    # 3
    # 4
    ```

    Feeds can be used to implement push-based data streams. For example, the RoboMaster robot
    sends data over UDP to the computer. This data can be received using the `Feed` class:

    ```py
    import asyncio
    from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP, LineColour

    async def main():
        async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
            await robot.set_line_recognition_enabled()
            await robot.set_line_recognition_color(LineColour.Red)

            while True:
                print(await robot.line.get())

    asyncio.run(main())
    ```

    **NOTE:** Data sent to a feed *before* `get` is called won't be returned. This means that
    if a feed is receiving data faster than a consumer is processing it, some of the data will
    not be delivered to the consumer.
    """

    # Futures handed out by `get` that have not yet been resolved by `feed`.
    _return_futures: set[asyncio.Future[FeedT]]

    def __init__(self) -> None:
        self._return_futures = set()

    def feed(self, data: FeedT) -> None:
        """
        Send data to all consumers of this feed.

        Every future handed out by `get` since the previous call is resolved
        with `data`, then the set of waiting futures is emptied.
        """
        for future in self._return_futures:
            # A consumer may have cancelled its future (for example through a
            # timeout). Calling `set_result` on a finished future raises
            # `asyncio.InvalidStateError`, which would also leave the set
            # uncleared, so finished futures are skipped.
            if not future.done():
                future.set_result(data)

        self._return_futures.clear()

    def get(self) -> asyncio.Future[FeedT]:
        """
        Wait for the next piece of data from this feed.

        Returns a new future, created on the loop returned by
        `asyncio.get_event_loop()`, that the next call to `feed` resolves.
        """
        future = asyncio.get_event_loop().create_future()
        self._return_futures.add(future)
        return future
class
Feed(typing.Generic[~FeedT]):
class Feed(Generic[FeedT]):
    """
    Used for expressing a constant stream of data.

    To get the next piece of data, call `get`.

    Example:
    ```py
    import asyncio
    from sbhs_robomaster import Feed

    feed = Feed()

    async def send_data():
        for i in range(5):
            feed.feed(i)
            await asyncio.sleep(1)

    async def receive_data():
        while True:
            print(await feed.get())

    async def main():
        # Run both coroutines concurrently. The receiver is listed first so
        # that it is already waiting when the first value is sent.
        await asyncio.gather(receive_data(), send_data())

    asyncio.run(main())

    # Prints:
    # 0
    # 1
    # 2
    # 3
    # 4
    ```

    Feeds can be used to implement push-based data streams. For example, the RoboMaster robot
    sends data over UDP to the computer. This data can be received using the `Feed` class:

    ```py
    import asyncio
    from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP, LineColour

    async def main():
        async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
            await robot.set_line_recognition_enabled()
            await robot.set_line_recognition_color(LineColour.Red)

            while True:
                print(await robot.line.get())

    asyncio.run(main())
    ```

    **NOTE:** Data sent to a feed *before* `get` is called won't be returned. This means that
    if a feed is receiving data faster than a consumer is processing it, some of the data will
    not be delivered to the consumer.
    """

    # Futures handed out by `get` that have not yet been resolved by `feed`.
    _return_futures: set[asyncio.Future[FeedT]]

    def __init__(self) -> None:
        self._return_futures = set()

    def feed(self, data: FeedT) -> None:
        """
        Send data to all consumers of this feed.

        Every future handed out by `get` since the previous call is resolved
        with `data`, then the set of waiting futures is emptied.
        """
        for future in self._return_futures:
            # A consumer may have cancelled its future (for example through a
            # timeout). Calling `set_result` on a finished future raises
            # `asyncio.InvalidStateError`, which would also leave the set
            # uncleared, so finished futures are skipped.
            if not future.done():
                future.set_result(data)

        self._return_futures.clear()

    def get(self) -> asyncio.Future[FeedT]:
        """
        Wait for the next piece of data from this feed.

        Returns a new future, created on the loop returned by
        `asyncio.get_event_loop()`, that the next call to `feed` resolves.
        """
        future = asyncio.get_event_loop().create_future()
        self._return_futures.add(future)
        return future
Used for expressing a constant stream of data.
To get the next piece of data, call get.
Example:
import asyncio
from sbhs_robomaster import Feed
feed = Feed()
async def send_data():
for i in range(5):
feed.feed(i)
await asyncio.sleep(1)
async def receive_data():
while True:
print(await feed.get())
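# Run both coroutines concurrently. The receiver is listed first so that it is
# already waiting when the first value is sent.
async def main():
    await asyncio.gather(receive_data(), send_data())
asyncio.run(main())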
# Prints:
# 0
# 1
# 2
# 3
# 4
Feeds can be used to implement push-based data streams. For example, the RoboMaster robot
sends data over UDP to the computer. This data can be received using the Feed class:
import asyncio
from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP, LineColour
async def main():
async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
await robot.set_line_recognition_enabled()
await robot.set_line_recognition_color(LineColour.Red)
while True:
print(await robot.line.get())
asyncio.run(main())
NOTE: Data sent to a feed before `get` is called won't be returned, because the feed does not
buffer values. This means that if a feed is receiving data faster than a consumer is processing
it, some of the data will not be delivered to the consumer.
def
feed(self, data: ~FeedT) -> None:
def feed(self, data: FeedT) -> None:
    """
    Send data to all consumers of this feed.

    Every future currently stored in `_return_futures` is resolved with
    `data`, then the set is emptied.
    """
    for future in self._return_futures:
        # A consumer may have cancelled its future (for example through a
        # timeout). Calling `set_result` on a finished future raises
        # `asyncio.InvalidStateError`, which would also leave the set
        # uncleared, so finished futures are skipped.
        if not future.done():
            future.set_result(data)

    self._return_futures.clear()
Send data to all consumers of this feed.
def
get(self) -> _asyncio.Future[~FeedT]:
def get(self) -> asyncio.Future[FeedT]:
    """
    Wait for the next piece of data from this feed.

    A new future is created on the loop returned by
    `asyncio.get_event_loop()`, added to `_return_futures`, and returned to
    the caller to await.
    """
    loop = asyncio.get_event_loop()
    pending: asyncio.Future[FeedT] = loop.create_future()
    self._return_futures.add(pending)
    return pending
Wait for the next piece of data from this feed.