"""Select axis labels (columns or index) of a data frame."""
import operator
from typing import Any, Callable, List, Optional, Sequence
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from warnings import warn
import numpy as np
import pandas as pd
Indices = List[int]
class Selection:
"""Container for selection along a data frame axis with combination logic. """
def __init__(self, included:Optional[Indices]=None, excluded:Optional[Indices]=None, *, mask:Optional[Sequence[int]]=None):
"""
If ``mask`` is passed, ``included`` and ``excluded`` must be ``None``!
Parameters
----------
included:
List of indices included in the selection.
excluded:
List of indices excluded from the selection.
mask
Boolean array that will be converted to list of included
indices: All indices with corresponding truthy/non-zero value
will be included in the selection.
"""
if mask is not None:
if included is not None:
raise ValueError("included indices and mask cannot be passed together")
if excluded is not None:
raise ValueError("excluded indices and mask cannot be passed together")
included = np.nonzero(mask)[0].tolist()
self.included: Optional[Indices] = included
self.excluded: Optional[Indices] = excluded
def apply(self, axis:Literal["columns", "index"], df: pd.DataFrame):
labels = getattr(df, axis)
included = self.included
if included is None:
included = range(len(labels))
if self.excluded is not None:
excluded = set(self.excluded)
else:
excluded = set()
return labels[[i for i in included if not i in excluded]]
def __and__(self, other: "Selection") -> "Selection":
included=_combine_nones(self.included, other.included, intersect_indices)
excluded=_combine_nones(self.excluded, other.excluded, union_indices)
if included is not None and excluded is not None:
included = [i for i in included if i not in excluded]
return Selection(included, excluded)
def __or__(self, other: "Selection") -> "Selection":
included = _combine_nones(self.included, other.included, union_indices)
excluded = _combine_nones(self.excluded, other.excluded, intersect_indices)
if included is not None and excluded is not None:
excluded = [i for i in excluded if i not in included]
return Selection(included, excluded)
def __invert__(self) -> "Selection":
return Selection(self.excluded, self.included)
# Utilities to collect and combine column selections
def _combine_nones(a: Optional[Indices], b: Optional[Indices], fn_both:Callable[[Indices, Indices], Indices]) -> Optional[Indices]:
if a is None and b is None:
return None
if a is not None and b is None:
return a
if a is None and b is not None:
return b
return fn_both(a, b)
def intersect_indices(left: Indices, right: Indices) -> Indices:
r = []
for i in right:
if i in left:
r.append(i)
return r
def union_indices(left: Indices, right: Indices) -> Indices:
return left + [i for i in right if i not in left]
# Column selection operator closures
class BaseOp:
"""API definition of the closure object."""
def __call__(self, axis: Literal["columns", "index"], df: pd.DataFrame) -> Selection:
"""Evaluate operator on data frame from context."""
raise NotImplementedError("Must be implemented in sub-class.")
class LabelSelectionOp(BaseOp):
"""Explicitely select labels."""
def __init__(self, labels, level=None):
if isinstance(labels, list):
labels = tuple(labels)
elif not isinstance(labels, (slice, tuple)):
# Convert "scalar" values to some iterable
labels = (labels,)
self.labels = labels
self.level = level
def __call__(self, axis, df):
labels = getattr(df, axis)
idx = np.arange(len(labels))
if self.level is None:
cands = labels
else:
cands = labels.get_level_values(self.level)
indices = []
if isinstance(self.labels, tuple):
for lbl in self.labels:
indices.extend(idx[cands == lbl])
elif isinstance(self.labels, slice):
# NOTE: We need to make this more complex because we also need
# to treat situation with multiple repetitions of the same
# value, e.g., cases of multi-index levels.
in_slice = self.labels.start is None
reached_slice_stop = False
for i, lbl in enumerate(cands):
if not in_slice and lbl == self.labels.start:
in_slice = True
if reached_slice_stop and lbl != self.labels.stop:
# We stepped over the end of the slice.
break
if in_slice:
indices.append(i)
if self.labels.stop is not None and lbl == self.labels.stop:
reached_slice_stop = True
else:
# This should never be reached becaus of the argument processing
# in __init__.
raise ValueError(f"Unexpected type for self.labels: {type(self.labels)}: {self.labels!r}")
return Selection(indices)
def __str__(self):
if isinstance(self.labels, slice):
fmt = lambda o, default: repr(o) if o else default
items = [fmt(self.labels.start, ''), fmt(self.labels.stop, '')]
if self.labels.step:
items.append(repr(self.labels.step))
pp_labels = ':'.join(items)
else:
pp_labels = ', '.join(str(l) for l in self.labels)
if self.level:
return f'(level={self.level})[{pp_labels}]'
return f'[{pp_labels}]'
class LabelPredicateOp(BaseOp):
"""Select labels by a predicate, e.g. ``startswith``."""
def __init__(self, meth, args, kwargs, level=None):
self.meth = meth
self.args = args
self.kwargs = kwargs
self.level = level
def __str__(self):
def pp(a):
if isinstance(a, tuple):
return [repr(i) for i in a]
elif isinstance(a, dict):
return [f'{k}={v!r}' for k, v in a.items()]
return [repr(a)]
pp_args = ', '.join(pp(self.args) + pp(self.kwargs))
if self.level:
return f'(level={self.level}).{self.meth}({pp_args})'
return f'.{self.meth}({pp_args})'
def __call__(self, axis, df: pd.DataFrame) -> Selection:
labels = getattr(df, axis)
if self.level is None:
str_accessor = labels.str
else:
str_accessor = labels.get_level_values(self.level).str
meth = getattr(str_accessor, self.meth)
mask = meth(*self.args, **self.kwargs)
return Selection(mask=mask)
class EllipsisOp(BaseOp):
"""Select all columns."""
def __call__(self, axis, df: pd.DataFrame) -> Selection:
labels = getattr(df, axis)
return Selection(mask=np.ones(len(labels), dtype=bool))
def __str__(self):
return '...'
class BinaryOp(BaseOp):
"""Combine two operators."""
def __init__(self, left: BaseOp, right: BaseOp, op: Callable[[Any, Any], Any]):
self.left = left
self.right = right
self.op = op
def __str__(self):
op_name = getattr(self.op, '__name__', str(self.op))
return f'({self.left}) {op_name} ({self.right})'
def __call__(self, axis, df: pd.DataFrame) -> Selection:
sel_left = self.left(axis, df)
sel_right = self.right(axis, df)
return self.op(sel_left, sel_right)
class
没有合适的资源?快使用搜索试试~ 我知道了~
pandas_paddles-1.4.0.tar.gz
需积分: 1 0 下载量 78 浏览量
2024-03-15
23:32:39
上传
评论
收藏 18KB GZ 举报
温馨提示
Python库是一组预先编写的代码模块,旨在帮助开发者实现特定的编程任务,无需从零开始编写代码。这些库可以包括各种功能,如数学运算、文件操作、数据分析和网络编程等。Python社区提供了大量的第三方库,如NumPy、Pandas和Requests,极大地丰富了Python的应用领域,从数据科学到Web开发。Python库的丰富性是Python成为最受欢迎的编程语言之一的关键原因之一。这些库不仅为初学者提供了快速入门的途径,而且为经验丰富的开发者提供了强大的工具,以高效率、高质量地完成复杂任务。例如,Matplotlib和Seaborn库在数据可视化领域内非常受欢迎,它们提供了广泛的工具和技术,可以创建高度定制化的图表和图形,帮助数据科学家和分析师在数据探索和结果展示中更有效地传达信息。
资源推荐
资源详情
资源评论
收起资源包目录
pandas_paddles-1.4.0.tar.gz (10个子文件)
pandas_paddles-1.4.0
README.rst 6KB
LICENSE 1KB
PKG-INFO 7KB
pandas_paddles
__init__.py 6KB
contexts.py 10KB
util.py 3KB
closures.py 6KB
axis.py 18KB
pipe.py 3KB
pyproject.toml 1KB
共 10 条
- 1
资源评论
程序员Chino的日记
- 粉丝: 3667
- 资源: 5万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 柯尼卡美能达Bizhub C364e打印机驱动下载
- CMake 入门实战的源代码
- c7383c5d0009dfc59e9edf595bb0bcd0.zip
- 柯尼卡美能达Bizhub C266打印机驱动下载
- java游戏之我当皇帝那些年.zip开发资料
- 基于Matlab的汉明码(Hamming Code)纠错传输以及交织编码(Interleaved coding)仿真.zip
- 中国省级新质生产力发展指数数据(任宇新版本)2010-2023年.txt
- 基于Matlab的2Q-FSK移频键控通信系统仿真.zip
- 使用C++实现的常见算法
- travel-web-springboot【程序员VIP专用】.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功