# Watermill: a data stream organizing framework
Assume you have numerous stream data producers and processors.
When data streams are logically linked, you may want to organize the processing of the joined streams.
Watermill helps you join streams in a clear, lightweight way.
![ ](https://gitlab.com/kraevs/watermill/-/raw/d6665483c9bc44d39f3584a9f8082fb6bd39f4b6/img/fig-1.png)
## User guide
### Briefly
The input data stream is configured through the processing function's signature.
```python
def calculate_robot_speed(
        prev_location: RobotLocation,
        next_locations: Mapping[RobotLocation, List[RobotLocationNextStep]]
) -> RobotSpeed:
    pass
```
The first argument can have the type of a stream element or a list of stream elements.
When stream joins are used, subsequent arguments are mappings from the master element type to lists of joined element types. In the example above
we want every robot location joined with the locations of the next time step.
If there is no result data stream, simply omit the return value or explicitly specify the `NoReturn` type.
If you want to return several results per processing function call, return a generator.
Note that a generator must yield at least one valid element.
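For example, a processing function that fans one element out into several results could be sketched like this (the element types here are hypothetical, not part of Watermill):

```python
from dataclasses import dataclass
from typing import Iterator

@dataclass
class RobotLocation:   # hypothetical stream element
    x: float
    y: float

@dataclass
class AxisValue:       # hypothetical result element
    value: float

# One input element yields two result elements; the generator
# always yields at least one valid element, as required.
def split_axes(location: RobotLocation) -> Iterator[AxisValue]:
    yield AxisValue(location.x)
    yield AxisValue(location.y)
```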
The data itself can be stored in any shared storage such as Kafka, a file system, or a database.
![ ](https://gitlab.com/kraevs/watermill/-/raw/837cdb32affb3ab8b715bbffcb0dd6e8daab00b9/img/fig2.png)
Watermill doesn't introduce any dependency or coupling between services built on it.
Every service or plain process can independently decide which data streams it consumes and
how to join them. The only thing streams share is a key: the file name in a file system, the topic name in Kafka, etc.
The smallest piece of data a stream consists of is a dictionary; Python dataclasses are used to describe stream elements in code.
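For instance, a stream element carrying a robot location could be described like this (field names are illustrative):

```python
from dataclasses import dataclass, asdict

@dataclass
class RobotLocation:
    time_step: int
    x: float
    y: float

# Each stream element maps to a plain dictionary:
# asdict(location) == {'time_step': 0, 'x': 1.5, 'y': 2.5}
location = RobotLocation(time_step=0, x=1.5, y=2.5)
```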
Very often a processing function needs references to external objects.
Dependency injection is used for that: it is enough to declare an argument with the type of the required object
in the processing function, and DI will supply a reference to that object.
```python
def store_robot_path(location: RobotLocation, db_connection: Connection) -> NoReturn:
    pass
```
Message brokers are used to access the stream data in storages.
The Watermill package includes two commonly used brokers: a JSON file broker and a Kafka message broker.
Both can use default serializers and deserializers.
Moreover, Watermill users can implement deserializers that read only the required fields.
![ ](https://gitlab.com/kraevs/watermill/-/raw/837cdb32affb3ab8b715bbffcb0dd6e8daab00b9/img/fig3.png)
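The field-limited deserializer idea can be sketched as follows; this is not the broker's actual interface, just an illustration of reading only the fields the processing function needs:

```python
import json
from dataclasses import dataclass

@dataclass
class RobotLocation:
    x: float
    y: float

# Deserialize only the required fields; any extra fields
# in the payload are simply ignored.
def deserialize_location(raw: bytes) -> RobotLocation:
    payload = json.loads(raw)
    return RobotLocation(x=payload["x"], y=payload["y"])
```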
### Stream joins
For the simplest case, when there is only one data stream, the processing function looks like this:
```python
def is_inside_area(location: RobotLocation) -> InsideArea:
    return InsideArea(
        x=location.x,
        y=location.y,
        inside=check_inside_area(location.x, location.y)
    )
```
But usually there are several input data streams that should be combined by some condition.
Here two data streams are joined and arrive at the processing function as combined elements.
For example, the join condition can be `audio_stream.frame_index = video_stream.frame_index`.
In code it looks like this:
```python
def merge_audio_video(
        audio_frame: AudioFrame,
        video_frames: Mapping[AudioFrame, List[VideoFrame]]
) -> AudioVideoFrame:
    pass
```
The first argument describes the left data stream of the join and is called the master stream.
Iterating over the master stream returns every element exactly once.
Elements from secondary streams can be skipped if the join condition is not met.
Unlike SQL, Watermill supports only inner joins: if there is no matching data in the secondary streams,
there will be no data for processing at all. Furthermore, if the secondary streams are empty,
the processing function will never be executed.
It may be easier to understand stream joins if you think of a join tree with the master stream as the root node.
To get the next data for processing, Watermill walks the join tree and returns the linked stream elements
only when at least one element is found for every join tree node.
As can be seen from the processing function signature, secondary streams can return more than one element.
A join tree cannot contain loops or duplicate nodes.
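The inner-join semantics can be shown with a toy sketch (this is not Watermill's implementation, just the idea): a master element is emitted only when at least one secondary element shares its key.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

def inner_join(master: Iterable[int],
               secondary: Dict[int, List[str]]) -> Iterator[Tuple[int, List[str]]]:
    for key in master:
        matches = secondary.get(key, [])
        if matches:  # no matches -> master element is skipped entirely
            yield key, matches
```

Here the master element with key `2` produces nothing if the secondary stream has no element for that key, mirroring the "no matching data, no processing" rule above.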
### End of stream
Stream elements are processed while every input stream still has elements.
It is simple to see where a data stream of files in a file system ends, but inherently _indefinite_
data streams like Kafka or RabbitMQ must be given an explicit **End Of Stream** mark.
`KafkaMessageBroker` supports a special element type, `EndOfStream`, defined in the `message_brokers.message_broker` module.
Data stream processing stops immediately as soon as an `EndOfStream` message appears in any of the joined data streams.
The `EndOfStream` message is then automatically put into the output stream to state that there will be no more data.
You can also manually return an instance of the `EndOfStream` dataclass from the processing function
to indicate that the processing loop should be stopped. `EndOfStream` is allowed to be the last element yielded by
a processing function generator.
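A minimal sketch of stopping the loop from a processing function, using a stand-in for the `EndOfStream` marker (in the real package it lives in the `message_brokers.message_broker` module; the cutoff constant is illustrative):

```python
from dataclasses import dataclass

# Stand-in for watermill's EndOfStream marker.
@dataclass
class EndOfStream:
    pass

MAX_TIME_SECONDS = 60  # illustrative cutoff

# A generator may yield ordinary results and finish the whole
# pipeline by yielding EndOfStream as its last element.
def clip_stream(time_seconds: int):
    if time_seconds >= MAX_TIME_SECONDS:
        yield EndOfStream()
        return
    yield time_seconds
```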
### Join conditions
When joining streams, the join condition is evaluated for the left and right elements.
Every element must have a key, which can be an ordinary field or an expression based on one or more fields.
Join conditions are passed as a parameter to the `WaterMill` constructor.
The following examples demonstrate how to join streams based on conditions with expressions.
```python
@dataclass
class AudioFrame:
    time_seconds: int
    data: float

@dataclass
class VideoFrame:
    time_ms: int
    data: float
    frame_index: int
    first_frame_ts_seconds: int

...

mill = WaterMill(
    ...
    process_func=merge_audio_video,
    join_tree=join_streams(
        AudioFrame,
        JoinWith(
            with_type=VideoFrame,
            left_expression=get_field('time_seconds'),
            right_expression=get_field('time_ms') // 1000
        )
    )
)

mill = WaterMill(
    ...
    process_func=merge_audio_video,
    join_tree=join_streams(
        AudioFrame,
        JoinWith(
            with_type=VideoFrame,
            left_expression=get_field('time_seconds'),
            right_expression=
                get_field('frame_index') * FRAME_DURATION_SECONDS +
                get_field('first_frame_ts_seconds')
        )
    )
)
```
The expression evaluation implementation is intentionally trivial and supports neither operator precedence nor parentheses.
The computed key value of every stream must increase monotonically; based on this rule Watermill can detect stream
divergence. For example, if at the start of processing the right (secondary) stream key is less than the left (master) stream key,
Watermill will skip right stream elements until the streams synchronize.
When designing data streams, keep these key comparison rules in mind so you can correctly identify what the key will be
for every stream.
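The skipping behaviour can be illustrated with a toy sketch (not Watermill's actual code), assuming both streams expose monotonically increasing integer keys:

```python
from typing import Iterable, Iterator, Tuple

def synchronize(left_keys: Iterable[int],
                right_keys: Iterable[int]) -> Iterator[Tuple[int, int]]:
    right_iter = iter(right_keys)
    right = next(right_iter, None)
    for left in left_keys:
        # Skip stale right-stream elements until keys line up.
        while right is not None and right < left:
            right = next(right_iter, None)
        if right == left:
            yield left, right
```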
A special case of stream joining is when the same stream appears in the join tree several times, e.g. a self-join.
In this case a separate class has to be defined for every join-tree node of this stream type.
Refer to the [kafka_join_streams](https://gitlab.com/kraevs/watermill/-/blob/master/examples/kafka_join_streams.py) example.
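Schematically (class and field names are illustrative), a self-join declares one element class per join-tree node even though both describe the same underlying stream:

```python
from dataclasses import dataclass

# Both classes describe elements of the same underlying stream;
# a distinct class is needed for each join-tree node.
@dataclass
class RobotLocation:
    time_step: int
    x: float

@dataclass
class RobotLocationNextStep:
    time_step: int
    x: float
```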
### Windows
Sometimes you may want master stream elements combined into lists by some condition before processing,
for example to collect one minute of statistics and then process the whole batch at once.
The processing function then receives a list of elements on every call. Below is an illustration of the corresponding stream configuration.
```python
def calculate_robot_speed(prev_locations: List[RobotLocation]) -> RobotSpeed:
    pass

...

mill = WaterMill(
    ...
    process_func=calculate_robot_speed,
    ...
)
```