"""Select axis labels (columns or index) of a data frame."""
import operator
from typing import Any, Callable, List, Optional, Sequence
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from warnings import warn
import numpy as np
import pandas as pd
Indices = List[int]
class Selection:
    """Container for a selection along a data frame axis with combination logic.

    A selection is described by two optional lists of integer positions:
    ``included`` (``None`` stands for "every position of the axis") and
    ``excluded`` (``None`` stands for "no position").  :meth:`apply`
    resolves both lists against the labels of a data frame axis.
    Selections can be combined with ``&``, ``|`` and ``~``.
    """

    def __init__(
        self,
        included: Optional[Indices] = None,
        excluded: Optional[Indices] = None,
        *,
        mask: Optional[Sequence[int]] = None,
    ):
        """
        If ``mask`` is passed, ``included`` and ``excluded`` must be ``None``!

        Parameters
        ----------
        included:
            List of indices included in the selection.
        excluded:
            List of indices excluded from the selection.
        mask
            Boolean array that will be converted to list of included
            indices: All indices with corresponding truthy/non-zero value
            will be included in the selection.

        Raises
        ------
        ValueError
            If ``mask`` is combined with ``included`` or ``excluded``.
        """
        if mask is not None:
            if included is not None:
                raise ValueError("included indices and mask cannot be passed together")
            if excluded is not None:
                raise ValueError("excluded indices and mask cannot be passed together")
            # Positions of all truthy mask entries, as plain Python ints.
            included = np.nonzero(mask)[0].tolist()
        self.included: Optional[Indices] = included
        self.excluded: Optional[Indices] = excluded

    def apply(self, axis: Literal["columns", "index"], df: pd.DataFrame) -> pd.Index:
        """Return the labels of ``df`` along ``axis`` that are selected.

        Parameters
        ----------
        axis
            Name of the data frame attribute holding the labels.
        df
            Data frame whose labels are selected.

        Returns
        -------
        The labels at the included positions (all positions if
        ``included`` is ``None``), in the order of ``included``, without
        the positions listed in ``excluded``.
        """
        labels = getattr(df, axis)
        included = range(len(labels)) if self.included is None else self.included
        excluded = set() if self.excluded is None else set(self.excluded)
        return labels[[i for i in included if i not in excluded]]

    def __and__(self, other: "Selection") -> "Selection":
        """Combine with ``other``: intersect the included, unite the excluded."""
        included = _combine_nones(self.included, other.included, intersect_indices)
        excluded = _combine_nones(self.excluded, other.excluded, union_indices)
        if included is not None and excluded is not None:
            # Drop excluded positions right away; the set keeps this linear.
            dropped = set(excluded)
            included = [i for i in included if i not in dropped]
        return Selection(included, excluded)

    def __or__(self, other: "Selection") -> "Selection":
        """Combine with ``other``: unite the included, intersect the excluded."""
        included = _combine_nones(self.included, other.included, union_indices)
        excluded = _combine_nones(self.excluded, other.excluded, intersect_indices)
        if included is not None and excluded is not None:
            # A position that is explicitly included must not stay excluded.
            kept = set(included)
            excluded = [i for i in excluded if i not in kept]
        return Selection(included, excluded)

    def __invert__(self) -> "Selection":
        """Return a selection with the included and excluded lists swapped.

        NOTE(review): if both lists are ``None`` the result is again a
        selection with both lists ``None``, i.e. it still selects every
        label - confirm that this is intended.
        """
        return Selection(self.excluded, self.included)
# Utilities to collect and combine column selections
def _combine_nones(
    a: Optional[Indices],
    b: Optional[Indices],
    fn_both: Callable[[Indices, Indices], Indices],
) -> Optional[Indices]:
    """Merge two optional index lists.

    Parameters
    ----------
    a, b
        Index lists; either one may be ``None``.
    fn_both
        Function used to merge ``a`` and ``b`` when neither is ``None``.

    Returns
    -------
    ``None`` if both inputs are ``None``, the only non-``None`` input if
    exactly one is given, otherwise ``fn_both(a, b)``.
    """
    if a is None:
        # Covers both "only b given" and "nothing given" (b is None then).
        return b
    if b is None:
        return a
    return fn_both(a, b)
def intersect_indices(left: Indices, right: Indices) -> Indices:
    """Return the entries of ``right`` that also occur in ``left``.

    The result follows the order of ``right``; an entry repeated in
    ``right`` is repeated in the result.
    """
    # Build the lookup once so that every membership test is O(1).
    known = set(left)
    return [i for i in right if i in known]
def union_indices(left: Indices, right: Indices) -> Indices:
    """Return ``left`` followed by the entries of ``right`` missing from it.

    A new list is returned; ``left`` is not modified.  Entries of ``right``
    are only checked against ``left``, so a value that is repeated in
    ``right`` and absent from ``left`` is appended once per occurrence.
    """
    # Build the lookup once so that every membership test is O(1).
    known = set(left)
    return left + [i for i in right if i not in known]
# Column selection operator closures
class BaseOp:
    """Interface of the selection operator closures.

    An operator is called with an axis name and a data frame and answers
    with a ``Selection``.
    """

    def __call__(self, axis: Literal["columns", "index"], df: pd.DataFrame) -> Selection:
        """Evaluate the operator on ``df`` along ``axis``.

        Raises
        ------
        NotImplementedError
            Always; this base implementation only defines the signature.
        """
        message = "Must be implemented in sub-class."
        raise NotImplementedError(message)
class LabelSelectionOp(BaseOp):
    """Explicitly select labels.

    The labels are given as a single value, a list/tuple of values or a
    ``slice`` of labels.  If ``level`` is given, the values of that index
    level are matched instead of the full labels.
    """

    def __init__(self, labels, level=None):
        """
        Parameters
        ----------
        labels
            A ``slice``, a tuple or a list of labels; any other value is
            treated as one single label.
        level
            Optional level handed to ``get_level_values`` of the axis
            labels.
        """
        if isinstance(labels, list):
            labels = tuple(labels)
        elif not isinstance(labels, (slice, tuple)):
            # Convert "scalar" values to some iterable
            labels = (labels,)
        self.labels = labels
        self.level = level

    def __call__(self, axis, df):
        """Return the ``Selection`` of the positions matching the labels.

        For a tuple, the positions of each label are collected in the
        order of the tuple.  For a slice, all positions from the first
        occurrence of ``start`` up to and including the run of ``stop``
        labels are collected; the slice ``step`` is not evaluated here.
        """
        labels = getattr(df, axis)
        idx = np.arange(len(labels))
        if self.level is None:
            cands = labels
        else:
            cands = labels.get_level_values(self.level)
        indices = []
        if isinstance(self.labels, tuple):
            for lbl in self.labels:
                # Store plain Python ints, like the mask path of Selection.
                indices.extend(idx[cands == lbl].tolist())
        elif isinstance(self.labels, slice):
            # NOTE: We need to make this more complex because we also need
            # to treat situation with multiple repetitions of the same
            # value, e.g., cases of multi-index levels.
            in_slice = self.labels.start is None
            reached_slice_stop = False
            for i, lbl in enumerate(cands):
                if not in_slice and lbl == self.labels.start:
                    in_slice = True
                if reached_slice_stop and lbl != self.labels.stop:
                    # We stepped over the end of the slice.
                    break
                if in_slice:
                    indices.append(i)
                if self.labels.stop is not None and lbl == self.labels.stop:
                    reached_slice_stop = True
        else:
            # This should never be reached because of the argument processing
            # in __init__.
            raise ValueError(f"Unexpected type for self.labels: {type(self.labels)}: {self.labels!r}")
        return Selection(indices)

    def __str__(self):
        """Return a compact, index-expression like representation."""
        if isinstance(self.labels, slice):

            def fmt(bound):
                # Only a missing bound is left blank; falsy labels such as
                # 0 or '' are real bounds and must be shown.
                return '' if bound is None else repr(bound)

            items = [fmt(self.labels.start), fmt(self.labels.stop)]
            if self.labels.step is not None:
                items.append(repr(self.labels.step))
            pp_labels = ':'.join(items)
        else:
            pp_labels = ', '.join(str(l) for l in self.labels)
        # Compare against None: level 0 is a valid level but falsy.
        if self.level is not None:
            return f'(level={self.level})[{pp_labels}]'
        return f'[{pp_labels}]'
class LabelPredicateOp(BaseOp):
    """Select labels by a predicate, e.g. ``startswith``.

    The predicate is looked up by name on the ``.str`` accessor of the axis
    labels (or of one of their levels) and called with the stored
    arguments; its result is used as selection mask.
    """

    def __init__(self, meth, args, kwargs, level=None):
        """
        Parameters
        ----------
        meth
            Name of the method to look up on the ``.str`` accessor.
        args
            Positional arguments passed to the method.
        kwargs
            Keyword arguments passed to the method.
        level
            Optional level handed to ``get_level_values`` of the axis
            labels.
        """
        self.meth = meth
        self.args = args
        self.kwargs = kwargs
        self.level = level

    def __str__(self):
        """Return a compact, method-call like representation."""

        def pp(a):
            # Render positional arguments, keyword arguments or one value.
            if isinstance(a, tuple):
                return [repr(i) for i in a]
            elif isinstance(a, dict):
                return [f'{k}={v!r}' for k, v in a.items()]
            return [repr(a)]

        pp_args = ', '.join(pp(self.args) + pp(self.kwargs))
        # Compare against None: level 0 is a valid level but falsy.
        if self.level is not None:
            return f'(level={self.level}).{self.meth}({pp_args})'
        return f'.{self.meth}({pp_args})'

    def __call__(self, axis, df: pd.DataFrame) -> Selection:
        """Evaluate the predicate on the labels of ``df`` along ``axis``.

        Returns
        -------
        A ``Selection`` built from the predicate result used as mask.
        """
        labels = getattr(df, axis)
        if self.level is None:
            str_accessor = labels.str
        else:
            str_accessor = labels.get_level_values(self.level).str
        meth = getattr(str_accessor, self.meth)
        mask = meth(*self.args, **self.kwargs)
        return Selection(mask=mask)
class EllipsisOp(BaseOp):
    """Select every label of the axis."""

    def __call__(self, axis, df: pd.DataFrame) -> Selection:
        """Return a ``Selection`` built from an all-true mask over the axis."""
        n_labels = len(getattr(df, axis))
        all_true = np.ones(n_labels, dtype=bool)
        return Selection(mask=all_true)

    def __str__(self):
        """Return the literal ellipsis representation."""
        return '...'
class BinaryOp(BaseOp):
    """Combine the selections of two operators with a binary function."""

    def __init__(self, left: BaseOp, right: BaseOp, op: Callable[[Any, Any], Any]):
        """
        Parameters
        ----------
        left, right
            Operators whose selections are combined.
        op
            Function called with the left and the right selection.
        """
        self.left, self.right, self.op = left, right, op

    def __str__(self):
        """Return ``(<left>) <op name> (<right>)``."""
        # Functions carry a ``__name__``; anything else falls back to str().
        fallback = str(self.op)
        name = getattr(self.op, '__name__', fallback)
        return f'({self.left}) {name} ({self.right})'

    def __call__(self, axis, df: pd.DataFrame) -> Selection:
        """Evaluate both operands on ``df`` and combine their selections."""
        lhs = self.left(axis, df)
        rhs = self.right(axis, df)
        return self.op(lhs, rhs)
class

程序员Chino的日记
- 粉丝: 3816
- 资源: 5万+
最新资源
- 《基于FPGA的Verilog语言FOC控制永磁同步异步电机资料:涵盖Cordic算法与SVPWM代码实现详解》,FPGA Verilog编程与电机控制:基于FOC的永磁同步与异步电机开环闭环控制理论
- kernel-devel-5.10.0-46.uelc20.x86-64.rpm
- (源码)基于CUDA的并行计算项目 Testworld.zip
- Delphi 12 控件之DelphiDeepseek.zip
- 微信输入法,可以统一电脑整体与微信输入法一致
- B站黑马程序员千万播放C++入门视频课程代码 代码随想录官网力扣题目C++代码
- kernel-headers-5.10.0-46.uelc20.x86-64.rpm
- 基于三菱PLC与组态王技术的兰花灌溉控制系统在农业农田的应用研究,基于三菱PLC与组态王的兰花灌溉控制技术在农业农田的实践应用,93#基于三菱PLC和组态王的兰花灌溉控制系统的农业农田 ,核心关
- 区域双碳目标与路径规划研究(碳达峰).zip
- kernel-modules-5.10.0-46.uelc20.x86-64.rpm
- 三菱PLC与组态王控制下的高效污水处理系统设计与应用,基于三菱PLC与组态王组态控制的智能污水处理系统设计与应用,91#基于三菱PLC和组态王组态控制的污水处理系统 ,三菱PLC; 组态王组态控
- Delphi 12 控件之TMS VCL UI Pack v13.4.0.1 for Delphi & CB 7-12 Athens Full Source.rar
- 基于Java Web的个人财务管理系统的课程设计实现与功能解析
- 元胞自动机交通流仿真:探究公交车专用道与非专用道混合行驶情形下的速度变化及仿真结果分析,元胞自动机交通流仿真:基于公交专用道与非专用道环境下的多模态车辆行驶模拟及平均速度图像生成研究,元胞自动机交通流
- Deepseek Ollama大模型管理脚本
- deepseek 应该怎样提问.docx
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈


