sbhs_robomaster.feed

 1from typing import Generic, TypeVar
 2import asyncio
 3
 4
 5FeedT = TypeVar("FeedT")
 6class Feed(Generic[FeedT]):
 7    """
 8    Used for expressing a constant stream of data.
 9
10    To get the next piece of data, call `get`.
11
12    Example:
13    ```py
14    import asyncio
15    from sbhs_robomaster import Feed
16
17    feed = Feed()
18
19    async def send_data():
20        for i in range(5):
21            feed.feed(i)
22            await asyncio.sleep(1)
23
24    async def receive_data():
25        while True:
26            print(await feed.get())
27    
28    # Run both coroutines concurrently
29    asyncio.run(asyncio.gather(send_data(), receive_data()))
30
31    # Prints:
32    # 0
33    # 1
34    # 2
35    # 3
36    # 4
37    ```
38
39    Feeds can be used to implement push-based data streams. For example, the RoboMaster robot
40    sends data over UDP to the computer. This data can be received using the `Feed` class:
41
42    ```py
43    import asyncio
44    from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP, LineColour
45
46    async def main():
47        async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
48            await robot.set_line_recognition_enabled()
49            await robot.set_line_recognition_color(LineColour.Red)
50
51            while True:
52                print(await robot.line.get())
53
54    asyncio.run(main())
55    ```
56
57    **NOTE:** Data sent to a feed *before* `get` is called won't be returned. This means that
58    if a feed is receiving data faster than a consumer is processing it, some of the data will
59    not be delivered to the consumer.
60    """
61
62    _return_futures: set[asyncio.Future[FeedT]]
63
64    def __init__(self):
65        self._return_futures = set()
66
67    def feed(self, data: FeedT) -> None:
68        """
69        Send data to all consumers of this feed.
70        """
71        for future in self._return_futures:
72            future.set_result(data)
73
74        self._return_futures.clear()
75
76    def get(self) -> asyncio.Future[FeedT]:
77        """
78        Wait for the next piece of data from this feed.
79        """
80        future = asyncio.get_event_loop().create_future()
81        self._return_futures.add(future)
82        return future
class Feed(typing.Generic[~FeedT]):
 7class Feed(Generic[FeedT]):
 8    """
 9    Used for expressing a constant stream of data.
10
11    To get the next piece of data, call `get`.
12
13    Example:
14    ```py
15    import asyncio
16    from sbhs_robomaster import Feed
17
18    feed = Feed()
19
20    async def send_data():
21        for i in range(5):
22            feed.feed(i)
23            await asyncio.sleep(1)
24
25    async def receive_data():
26        while True:
27            print(await feed.get())
28    
29    # Run both coroutines concurrently
30    asyncio.run(asyncio.gather(send_data(), receive_data()))
31
32    # Prints:
33    # 0
34    # 1
35    # 2
36    # 3
37    # 4
38    ```
39
40    Feeds can be used to implement push-based data streams. For example, the RoboMaster robot
41    sends data over UDP to the computer. This data can be received using the `Feed` class:
42
43    ```py
44    import asyncio
45    from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP, LineColour
46
47    async def main():
48        async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
49            await robot.set_line_recognition_enabled()
50            await robot.set_line_recognition_color(LineColour.Red)
51
52            while True:
53                print(await robot.line.get())
54
55    asyncio.run(main())
56    ```
57
58    **NOTE:** Data sent to a feed *before* `get` is called won't be returned. This means that
59    if a feed is receiving data faster than a consumer is processing it, some of the data will
60    not be delivered to the consumer.
61    """
62
63    _return_futures: set[asyncio.Future[FeedT]]
64
65    def __init__(self):
66        self._return_futures = set()
67
68    def feed(self, data: FeedT) -> None:
69        """
70        Send data to all consumers of this feed.
71        """
72        for future in self._return_futures:
73            future.set_result(data)
74
75        self._return_futures.clear()
76
77    def get(self) -> asyncio.Future[FeedT]:
78        """
79        Wait for the next piece of data from this feed.
80        """
81        future = asyncio.get_event_loop().create_future()
82        self._return_futures.add(future)
83        return future

Used for expressing a constant stream of data.

To get the next piece of data, call get.

Example:

import asyncio
from sbhs_robomaster import Feed

feed = Feed()

async def send_data():
    for i in range(5):
        feed.feed(i)
        await asyncio.sleep(1)

async def receive_data():
    while True:
        print(await feed.get())

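async def main():
    # Start the receiver first so that it is already waiting in get
    # when the first value is sent; otherwise 0 would be dropped.
    receiver = asyncio.create_task(receive_data())
    await asyncio.sleep(0)
    await send_data()
    receiver.cancel()

asyncio.run(main())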

# Prints:
# 0
# 1
# 2
# 3
# 4

Feeds can be used to implement push-based data streams. For example, the RoboMaster robot sends data over UDP to the computer. This data can be received using the Feed class:

import asyncio
from sbhs_robomaster import connect_to_robomaster, DIRECT_CONNECT_IP, LineColour

async def main():
    async with await connect_to_robomaster(DIRECT_CONNECT_IP) as robot:
        await robot.set_line_recognition_enabled()
        await robot.set_line_recognition_color(LineColour.Red)

        while True:
            print(await robot.line.get())

asyncio.run(main())

NOTE: Data sent to a feed before get is called won't be returned. This means that if a feed is receiving data faster than a consumer is processing it, some of the data will not be delivered to the consumer.

def feed(self, data: ~FeedT) -> None:
68    def feed(self, data: FeedT) -> None:
69        """
70        Send data to all consumers of this feed.
71        """
72        for future in self._return_futures:
73            future.set_result(data)
74
75        self._return_futures.clear()

Send data to all consumers of this feed.

def get(self) -> _asyncio.Future[~FeedT]:
77    def get(self) -> asyncio.Future[FeedT]:
78        """
79        Wait for the next piece of data from this feed.
80        """
81        future = asyncio.get_event_loop().create_future()
82        self._return_futures.add(future)
83        return future

Wait for the next piece of data from this feed.