Glide: Easy ETL
===============
[![Generic badge](https://img.shields.io/badge/Status-Alpha-yellow.svg)](https://shields.io/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Documentation Status](https://readthedocs.org/projects/glide-etl/badge/?version=latest)](https://glide-etl.readthedocs.io/en/latest/?badge=latest)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
Introduction
------------
Glide was inspired by and uses a similar syntax to [Consecution](https://github.com/robdmc/consecution), which is an easy-to-use
pipeline abstraction tool inspired by [Apache Storm Topologies](http://storm.apache.org/releases/current/Tutorial.html).
Like those libraries, Glide is:
- A simple, reusable approach to building robust ETL pipelines
- A system for wiring together processing nodes to form a DAG
Glide also has:
- An expanding suite of built-in nodes and pipelines that extract, transform, and load data from/to any combination of:
  - SQL databases (SQLite, DBAPI, and SQLAlchemy support)
  - URLs
  - Local or remote files, including:
    - CSVs
    - Excel files (including multi-sheet support)
    - Raw/generic files
  - Emails
- Built-in nodes for Pandas DataFrame-based pipelines, including optional support for DataFrame transformation via [Dask](https://dask.org/) or [Swifter](https://github.com/jmcarpenter2/swifter)
- A variety of node and DAG parallel processing strategies via concurrent.futures Executors or optional [Dask](https://dask.org/) support
- A simple decorator to generate a command line interface from a pipeline in ~one line of code
- The ability to control node contexts via defaults and/or simple runtime overrides
Table of Contents
-----------------
- [Installation](#installation)
- [Examples](#examples)
- [Creating Nodes](#creatingnodes)
- [CLI Generation](#cligeneration)
- [Extensions](#extensions)
- [Docs](#documentation)
- [How to Contribute](#howtocontribute)
<a name="installation"></a>
Installation
------------
> ⚠️ **Warning**: This project is still in an alpha state and should probably not be used in production.
```shell
$ pip install glide
```
<a name="examples"></a>
Examples
--------
The following examples serve as a quickstart to illustrate some core features
and built-in nodes. More complete documentation is in progress and can be
viewed [here](https://glide-etl.readthedocs.io/en/latest/index.html).
`Glider` is the main pipeline class that takes a DAG of `Nodes` as input and
then accepts data to process in its `consume` method. In most cases `consume`
will iterate over its input as-is, passing each item to the DAG to be
processed.
> **Note:** Some inputs, such as Pandas objects, strings, file objects, dicts,
and callables are automatically wrapped in a list to prevent them from being
broken up, as iteration is often inefficient or nonsensical in those cases.
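For instance, a list of items is iterated, while a bare DataFrame flows
through the DAG whole. Here is a minimal sketch of the difference, using a
trivial single-node pipeline (`PrettyPrinter` simply pretty-prints whatever it
receives):
```python
import pandas as pd

from glide import Glider, PrettyPrinter

# A trivial single-node pipeline, just to illustrate consume() behavior
glider = Glider(PrettyPrinter("load"))

# A list is iterated: each string is pushed through the DAG separately
glider.consume(["item1", "item2"])

# A bare DataFrame is auto-wrapped as [df], so the whole frame is pushed
# through the DAG as a single item rather than being iterated row by row
glider.consume(pd.DataFrame({"name": ["Ada", "Grace"]}))
```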
The examples below assume you have used the following (taboo) shortcut to
import all necessary node and pipeline classes:
```python
from glide import *
```
The names of the built-in classes aim to be explicit, and can therefore end up
a bit long given the many combinations of ways to process data with Glide. As
a convention, nodes prefixed with "Row" expect to operate on plain old Python
iterables, while nodes prefixed with "DataFrame" propagate Pandas DataFrames.
Let's build some pipelines to explore Glide further...
### Example: Read a CSV
Here is a trivial example that reads a CSV and passes all rows to a `PrettyPrinter`
node in a single push to be pretty-printed:
```python
glider = Glider(
    RowCSVExtractor("extract")
    | PrettyPrinter("load")
)
glider.consume(["/path/to/file.csv"])
```
### Example: DataFrame Transformation
Here is a slightly more realistic example applying a transformation to a
DataFrame read from a CSV, in this case lowercasing all strings before loading
into an output CSV:
```python
def lower(s):
    # Lowercase strings, pass all other values through untouched
    return s.lower() if isinstance(s, str) else s

glider = Glider(
    DataFrameCSVExtractor("extract")
    | DataFrameApplyMapTransformer("transform")
    | DataFrameCSVLoader("load", index=False, mode="a")
)
glider.consume(
    ["/path/to/infile.csv"],
    extract=dict(chunksize=100),
    transform=dict(func=lower),
    load=dict(outfile="/path/to/outfile.csv"),
)
```
### Node Context
The above example also demonstrates two separate ways to pass context to nodes:
1. Passing kwargs when instantiating the node. This becomes a default context
for the node any time it is used/reused.
2. Passing kwargs to `consume` that are node_name->node_context pairs. This context
lasts only for the `consume` call (see the sketch after the notes below).
> **Note:** Further details can be found in the node creation documentation.
> **Also Note:** Many of the provided nodes pass their context to
well-documented functions, such as `DataFrame.to_csv` in the case of
`DataFrameCSVLoader`. Review the documentation/code for each node for more
detail on how args are processed and which are required.
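To make the distinction concrete, here is a minimal sketch reusing the
pipeline from the previous example (the kwarg values themselves are
illustrative):
```python
# 1. Default context: chunksize=100 applies any time this node runs
glider = Glider(
    DataFrameCSVExtractor("extract", chunksize=100)
    | DataFrameApplyMapTransformer("transform")
    | DataFrameCSVLoader("load", index=False, mode="a")
)

# 2. Runtime context: overrides the default chunksize for this call only
glider.consume(
    ["/path/to/infile.csv"],
    extract=dict(chunksize=500),
    transform=dict(func=lower),
    load=dict(outfile="/path/to/outfile.csv"),
)
```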
### Example: Parallel DataFrame Transformation
Let's do the same thing, but with the work split across parallel processes
using a `ProcessPoolExecutor` at the transformation step. Note that we instead
use a `DataFrameProcessPoolTransformer` and adjust the `func` argument to the
transformer, since it operates on a chunk of the DataFrame instead of being
fed individual elements from the DataFrame as `applymap` does under the hood
in the previous example:
```python
def lower(s):
    return s.lower() if isinstance(s, str) else s

def df_lower(df):
    # Operates on an entire DataFrame chunk rather than individual elements
    return df.applymap(lower)

glider = Glider(
    DataFrameCSVExtractor("extract")
    | DataFrameProcessPoolTransformer("transform")
    | DataFrameCSVLoader("load", index=False, mode="a")
)
glider.consume(
    ["infile.csv"],
    transform=dict(func=df_lower),
    load=dict(outfile="outfile.csv"),
)
```
> **Note:** There are transformer nodes for using Swifter and Dask as well if
you install those extensions.
### Example: Placeholder Nodes
You can also easily drop replacement nodes into a templated pipeline. In this
case we use a `PlaceholderNode` for the extract node in the pipeline
definition and then replace that with a `DataFrameCSVExtractor`. The result is
a pipeline that can extract a CSV from one file, perform some custom
transformation on the DataFrame, and then load it to another CSV.
```python
glider = Glider(
    PlaceholderNode("extract")
    | MyTransformer("transform")  # a custom transformer node defined elsewhere
    | DataFrameCSVLoader("load", index=False, mode="a")
)
glider["extract"] = DataFrameCSVExtractor("extract")
glider.consume(
    ["/path/to/infile.csv"],
    extract=dict(chunksize=100),
    load=dict(outfile="/path/to/outfile.csv"),
)
```
> **Note:** Any node can be replaced by name. `PlaceholderNode` is just a convenience.
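For example, the same indexing syntax could swap the loader from the pipeline
above for the SQL-based loader used later in this README (illustrative; the
`load` context passed to `consume` would need to change accordingly):
```python
# Replace the existing "load" node by name
glider["load"] = DataFrameSQLLoader("load", if_exists="replace", index=False)
```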
### Example: Global State
A `Glider` can also have a shared context that can be used to populate node
arguments via its optional `global_state` argument:
```python
# Assumes a helper defined elsewhere that returns a SQLAlchemy connection
conn = get_my_sqlalchemy_conn()
sql = "select * from in_table limit 10"
glider = Glider(
    DataFrameSQLExtractor("extract")
    | DataFrameSQLLoader("load", if_exists="replace", index=False),
    # conn will automagically be passed to any nodes that require it
    global_state=dict(conn=conn),
)
glider.consume(
    [sql],
    load=dict(table="out_table"),
)
```
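Without `global_state`, the same `conn` would have to be supplied through each
node's runtime context instead, e.g. (a sketch assuming both nodes take `conn`
as an argument):
```python
glider.consume(
    [sql],
    extract=dict(conn=conn),
    load=dict(conn=conn, table="out_table"),
)
```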
### Example: Parallel Pipelines via ParaGlider
Glide also has support for completely parallelizing pipelines using a
`ParaGlider` (who said ETL isn't fun?!?) instead of a `Glider`. The following
code will create a process pool and split processing of the inputs over the
pool, with each process running the entire pipeline on part of the consumed
data:
```python
glider = ProcessPoolParaGlider(
    RowCSVExtractor("extract")
    | Printer("load")
)
glider.consume(
    ["/path/to/infile1.csv", "/path/to/infile2.csv"],
    extract=dict(nrows=50),
)
```
### Example: Parallel Branching
If you