<img src="logo/logo.jpg" alt="drawing" width="200"/>
# aioreactive - ReactiveX for asyncio using async and await
[![PyPI](https://img.shields.io/pypi/v/aioreactive.svg)](https://pypi.python.org/pypi/aioreactive)
![Python package](https://github.com/dbrattli/aioreactive/workflows/Python%20package/badge.svg)
![Upload Python Package](https://github.com/dbrattli/aioreactive/workflows/Upload%20Python%20Package/badge.svg)
[![codecov](https://codecov.io/gh/dbrattli/aioreactive/branch/master/graph/badge.svg)](https://codecov.io/gh/dbrattli/aioreactive)
> *NEWS: Project rebooted Nov. 2020. Rebuilt using [Expression](https://github.com/dbrattli/Expression).*

Aioreactive is [RxPY](https://github.com/ReactiveX/RxPY) for asyncio.
It's an asynchronous and reactive Python library for asyncio using async
and await. Aioreactive is built on the
[Expression](https://github.com/dbrattli/Expression) functional library
and integrates naturally with the Python language.

> aioreactive is the unification of RxPY and reactive programming with
> asyncio using async and await.
## The design goals for aioreactive:
* Python 3.9+ only. We have a hard dependency on [PEP
585](https://www.python.org/dev/peps/pep-0585/) (Type Hinting Generics
In Standard Collections), data classes and type variables.
* All operators and tools are implemented as plain old functions.
* Everything is `async`. Sending values is async, subscribing to
observables is async. Disposing subscriptions is async.
* One scheduler to rule them all. Everything runs on the asyncio base
event-loop.
* No multi-threading. Only async and await with concurrency using
asyncio. Threads are hard, and in many cases it doesn’t make sense to
use multi-threading in Python applications. If you need to use threads
you may wrap them with
[`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)
and compose them into the chain with `flat_map()` or similar. See
[`parallel.py`](https://github.com/dbrattli/aioreactive/blob/master/examples/parallel/parallel.py)
for an example.
* Simple and clean, with few abstractions. Try to align with the
itertools package, and reuse as much from the Python standard library
as possible.
* Support type hints and static type checking using [Pylance](https://devblogs.microsoft.com/python/announcing-pylance-fast-feature-rich-language-support-for-python-in-visual-studio-code/).
* Implicit synchronous back-pressure ™. Producers of events will
simply be awaited until the event can be processed by the down-stream
consumers.
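
The bullet about threads can be illustrated with the standard library alone. The following is a minimal sketch, not taken from aioreactive or its `parallel.py` example: `blocking_square` is a hypothetical stand-in for blocking work, and `loop.run_in_executor` hands it to a `concurrent.futures.ThreadPoolExecutor` so that the event loop only awaits the results.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_square(x: int) -> int:
    """Hypothetical blocking call (stands in for file or network I/O)."""
    time.sleep(0.01)
    return x * x


async def square_all(values: list[int]) -> list[int]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Each call runs in a worker thread; the event loop stays free.
        futures = [loop.run_in_executor(pool, blocking_square, v) for v in values]
        # gather() returns the results in the order of the inputs.
        return await asyncio.gather(*futures)
```

An awaitable like this could then be composed into a chain with an operator such as `flat_map()`, as the bullet suggests.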
## AsyncObservable and AsyncObserver
With aioreactive you subscribe observers to observables, and the key
abstractions of aioreactive can be seen in this single line of code:
```python
subscription = await observable.subscribe_async(observer)
```
The difference from RxPY can be seen with the `await` expression.
Aioreactive is built around the asynchronous duals, or opposites of the
AsyncIterable and AsyncIterator abstract base classes. These async
classes are called AsyncObservable and AsyncObserver.
AsyncObservable is a producer of events. It may be seen as the dual or
opposite of AsyncIterable and provides a single setter method called
`subscribe_async()` that is the dual of the `__aiter__()` getter method:
```python
from abc import ABC, abstractmethod


class AsyncObservable(ABC):
    @abstractmethod
    async def subscribe_async(self, observer):
        return NotImplemented
```
AsyncObserver is a consumer of events and is modeled after the
so-called [consumer interface](http://effbot.org/zone/consumer.htm), the
enhanced generator interface in
[PEP-342](https://www.python.org/dev/peps/pep-0342/) and async
generators in [PEP-525](https://www.python.org/dev/peps/pep-0525/). It
is the dual of the AsyncIterator `__anext__()` method, and expands to
three async methods: `asend()`, the opposite of `__anext__()`;
`athrow()`, the opposite of `raise Exception()`; and `aclose()`,
the opposite of `raise StopAsyncIteration`:
```python
from abc import ABC, abstractmethod


class AsyncObserver(ABC):
    @abstractmethod
    async def asend(self, value):
        return NotImplemented

    @abstractmethod
    async def athrow(self, error):
        return NotImplemented

    @abstractmethod
    async def aclose(self):
        return NotImplemented
```
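
To show how the two interfaces fit together, here is a minimal sketch that runs without aioreactive installed. It redefines simplified versions of the two base classes locally, and `ListObserver` and `FromIterable` are hypothetical names introduced only for this illustration. Unlike a real observable, the sketch pushes all items during `subscribe_async()` and returns no disposable.

```python
import asyncio
from abc import ABC, abstractmethod


class AsyncObserver(ABC):
    @abstractmethod
    async def asend(self, value): ...

    @abstractmethod
    async def athrow(self, error): ...

    @abstractmethod
    async def aclose(self): ...


class AsyncObservable(ABC):
    @abstractmethod
    async def subscribe_async(self, observer): ...


class ListObserver(AsyncObserver):
    """Hypothetical observer that records every notification it receives."""

    def __init__(self):
        self.values, self.error, self.closed = [], None, False

    async def asend(self, value):
        self.values.append(value)

    async def athrow(self, error):
        self.error = error

    async def aclose(self):
        self.closed = True


class FromIterable(AsyncObservable):
    """Hypothetical observable that pushes the items of an iterable."""

    def __init__(self, items):
        self._items = items

    async def subscribe_async(self, observer):
        try:
            for item in self._items:
                await observer.asend(item)  # dual of __anext__()
        except Exception as exc:
            await observer.athrow(exc)      # dual of raising an exception
        else:
            await observer.aclose()         # dual of StopAsyncIteration


async def demo():
    observer = ListObserver()
    await FromIterable([1, 2, 3]).subscribe_async(observer)
    return observer
```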
## Subscribing to observables
An observable becomes hot and starts streaming items when you call its
`subscribe_async()` method. The method takes an observer, attaches it to
the observable, and returns a disposable subscription.
```python
async def asend(value):
    print(value)


# `source` is any AsyncObservable
subscription = await source.subscribe_async(AsyncAnonymousObserver(asend))
```
`AsyncAnonymousObserver` is an anonymous observer that constructs an
`AsyncObserver` out of plain async functions, so you don't have to
implement a new named observer every time you need one.
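
The idea behind an anonymous observer can be sketched in a few lines. This is an illustration of the pattern only, not aioreactive's implementation: the class name `AnonymousObserver` and the `_noop` default handler are assumptions made for this example.

```python
import asyncio


async def _noop(*args):
    """Default handler that ignores its arguments."""


class AnonymousObserver:
    """Hypothetical stand-in for an observer assembled from async functions."""

    def __init__(self, asend=_noop, athrow=_noop, aclose=_noop):
        self._asend, self._athrow, self._aclose = asend, athrow, aclose

    async def asend(self, value):
        await self._asend(value)

    async def athrow(self, error):
        await self._athrow(error)

    async def aclose(self):
        await self._aclose()


async def demo():
    seen = []

    async def asend(value):
        seen.append(value)

    observer = AnonymousObserver(asend)
    await observer.asend(1)
    await observer.asend(2)
    await observer.aclose()  # falls back to the no-op handler
    return seen
```

Only the handlers you care about need to be supplied; the rest default to doing nothing.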
The subscription returned by `subscribe_async()` is disposable, so to
unsubscribe you need to await the `dispose_async()` method on the
subscription.
```python
await subscription.dispose_async()
```
## Asynchronous iteration
Even more interesting, with `to_async_iterable` you can flip around from
`AsyncObservable` to an `AsyncIterable` and use `async-for` to consume
the stream of events.
```python
obv = AsyncIteratorObserver()
subscription = await source.subscribe_async(obv)
async for x in obv:
    print(x)
```
This effectively moves us from an async push model to an async pull
model, and lets us use the awesome new language features such as `async
for` and `async with`. We do this without any queueing, as a push by the
`AsyncObservable` will await the pull by the `AsyncIterator`. This
effectively applies so-called "back-pressure" up the subscription, as the
producer will await the iterator picking up the item sent.
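
The queue-free handoff described above can be sketched with two `asyncio.Event` objects. This is not how aioreactive implements `AsyncIteratorObserver`; `HandoffObserver` is a hypothetical class that assumes a single producer and a single consumer, and it exists only to show a push awaiting the matching pull.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the stream


class HandoffObserver:
    """Hypothetical observer/iterator that hands over one item at a time."""

    def __init__(self):
        self._ready = asyncio.Event()  # a pushed item is waiting
        self._taken = asyncio.Event()  # the waiting item was pulled
        self._item = None

    async def asend(self, value):
        self._item = value
        self._ready.set()
        await self._taken.wait()  # back-pressure: block until pulled
        self._taken.clear()

    async def aclose(self):
        await self.asend(_DONE)

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self._ready.wait()
        self._ready.clear()
        item = self._item
        self._taken.set()  # release the producer
        if item is _DONE:
            raise StopAsyncIteration
        return item


async def demo():
    obv = HandoffObserver()
    pulled, lead = [], []

    async def producer():
        for n in (1, 2, 3):
            lead.append(n - len(pulled))  # items pushed minus items pulled
            await obv.asend(n)
        await obv.aclose()

    task = asyncio.create_task(producer())
    async for x in obv:
        pulled.append(x)
    await task
    return pulled, lead
```

The `lead` list records how far the producer is ahead of the consumer at each push. Because every `asend()` waits for the pull, the producer is never more than one item ahead.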
The for-loop may be wrapped with async-with to control the lifetime of
the subscription:
```python
import aioreactive as rx

xs = rx.from_iterable([1, 2, 3])
result = []

obv = rx.AsyncIteratorObserver()
async with await xs.subscribe_async(obv) as subscription:
    async for x in obv:
        result.append(x)

assert result == [1, 2, 3]
```
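
For readers wondering how a subscription can serve both `dispose_async()` and `async with`, here is a minimal sketch under stated assumptions. The `Subscription` class below is hypothetical and is not aioreactive's type; it only shows that leaving the `async with` block can trigger the same disposal as an explicit call.

```python
import asyncio


class Subscription:
    """Hypothetical disposable that also works as an async context manager."""

    def __init__(self, on_dispose):
        self._on_dispose = on_dispose
        self.disposed = False

    async def dispose_async(self):
        if not self.disposed:  # disposing twice is a no-op
            self.disposed = True
            await self._on_dispose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.dispose_async()
        return False  # do not swallow exceptions


async def demo():
    log = []

    async def unsubscribe():
        log.append("disposed")

    async with Subscription(unsubscribe) as subscription:
        log.append("active")
    await subscription.dispose_async()  # already disposed: nothing happens
    return log
```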
## Async streams
An async stream is both an async observer and an async observable.
Aioreactive lets you create streams explicitly.
```python
import aioreactive as rx

stream = rx.AsyncSubject()  # Alias for AsyncMultiStream
sink = rx.AsyncAnonymousObserver()
await stream.subscribe_async(sink)
await stream.asend(42)
```
You can create streams directly from `AsyncMultiStream` or
`AsyncSingleStream`. `AsyncMultiStream` supports multiple observers, and
is hot in the sense that it will drop any event that is sent if there
are currently no observers attached. `AsyncSingleStream` on the other
hand supports a single observer, and is cold in the sense that it will
await any producer until there is an observer attached.
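
The difference between the two behaviours can be sketched in plain asyncio. `HotStream` and `ColdStream` are hypothetical classes written for this illustration, not the library's `AsyncMultiStream` and `AsyncSingleStream`, and observers are simplified to bare async callables.

```python
import asyncio


class HotStream:
    """Hypothetical multi-observer stream: events with no listener are dropped."""

    def __init__(self):
        self._observers = []

    async def subscribe_async(self, asend):
        self._observers.append(asend)

    async def asend(self, value):
        # With no observers the loop body never runs, so the value is dropped.
        for observer in list(self._observers):
            await observer(value)


class ColdStream:
    """Hypothetical single-observer stream: producers wait for the observer."""

    def __init__(self):
        self._observer = asyncio.get_running_loop().create_future()

    async def subscribe_async(self, asend):
        self._observer.set_result(asend)

    async def asend(self, value):
        observer = await self._observer  # waits until an observer is attached
        await observer(value)


async def demo_hot():
    stream, first, second = HotStream(), [], []

    async def to_first(value):
        first.append(value)

    async def to_second(value):
        second.append(value)

    await stream.asend(1)  # dropped: nobody is subscribed yet
    await stream.subscribe_async(to_first)
    await stream.asend(2)
    await stream.subscribe_async(to_second)
    await stream.asend(3)
    return first, second


async def demo_cold():
    stream, received = ColdStream(), []

    async def sink(value):
        received.append(value)

    task = asyncio.create_task(stream.asend(1))
    await asyncio.sleep(0)  # let the producer start; it blocks inside asend()
    blocked = not task.done()
    await stream.subscribe_async(sink)
    await task
    return blocked, received
```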
## Operators
The Rx operators in aioreactive are all plain old functions. You can
apply them to an observable and compose it into a transformed, filtered,
aggregated or combined observable. This transformed observable can be
streamed into an observer.
Observable -> Operator -> Operator -> Operator -> Observer
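
To make the "plain old functions" idea concrete, the following sketch composes operators without any classes. It does not use aioreactive's own operators: `from_iterable`, `filter_`, `map_` and `pipe` are hypothetical helpers defined here, and an observable is simplified to a function that takes an async `asend` callback.

```python
import asyncio
from functools import reduce


def from_iterable(items):
    """Observable (here: a subscribe function) that pushes each item."""
    async def subscribe(asend):
        for item in items:
            await asend(item)
    return subscribe


def map_(fn):
    """Operator: transform each value before passing it downstream."""
    def operator(source):
        async def subscribe(asend):
            async def inner(value):
                await asend(fn(value))
            await source(inner)
        return subscribe
    return operator


def filter_(predicate):
    """Operator: pass on only the values that satisfy the predicate."""
    def operator(source):
        async def subscribe(asend):
            async def inner(value):
                if predicate(value):
                    await asend(value)
            await source(inner)
        return subscribe
    return operator


def pipe(source, *operators):
    """Apply the operators left to right: Observable -> Operator -> ..."""
    return reduce(lambda acc, op: op(acc), operators, source)


async def demo():
    out = []

    async def sink(value):
        out.append(value)

    xs = pipe(
        from_iterable(range(6)),
        filter_(lambda x: x % 2 == 0),
        map_(lambda x: x * 10),
    )
    await xs(sink)
    return out
```

Because each operator is just a function from observable to observable, chaining them is ordinary function composition.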
Aioreactive contains many of the same operators as you know from RxPY.
Our goal is not to implement them all, but to provide the most essential
ones.
* **concat** -- Concatenates two or more observables.
* **choose** -- Filters and/or transforms the observable.
* **choose_async** -- Asynchronously filters and/or transforms the observable.
* **debounce** -- Throttles an observable.
* **delay** -- Delays the items within an observable.
* **distinct_until_changed** -- an o