from dataclasses import (
dataclass,
)
from fa_purity.cmd import (
Cmd,
)
from fa_purity.frozen import (
FrozenDict,
)
from fa_purity.json.primitive.factory import (
to_primitive,
)
from fa_purity.utils import (
raise_exception,
)
from redshift_client.id_objs import (
SchemaId,
TableId,
)
from redshift_client.sql_client.core import (
InvalidType,
SqlClient,
)
from redshift_client.sql_client.query import (
dynamic_query,
new_query,
)
from typing import (
FrozenSet,
TypeVar,
)
_T = TypeVar("_T")
def _assert_bool(raw: _T) -> bool:
if isinstance(raw, bool):
return raw
raise InvalidType("Expected bool")
@dataclass(frozen=True)
class SchemaClient:
    """Schema-level operations issued through a wrapped ``SqlClient``.

    Every method returns a ``Cmd``. The read methods combine the client's
    ``execute`` command with a fetch command using ``+`` (presumably
    sequencing the two -- confirm against the ``Cmd`` implementation) and
    decode the fetched rows with ``map``.
    """

    # Client used to execute statements and fetch their results.
    _db_client: SqlClient

    def all_schemas(self) -> Cmd[FrozenSet[SchemaId]]:
        """Build a command producing the ids of the schemas in ``pg_namespace``.

        The first column of each fetched row is passed through
        ``to_primitive(..., str).unwrap()`` before being wrapped in a
        ``SchemaId``.
        """
        # NOTE(review): the ORDER BY has no visible effect on the result,
        # since the rows end up in a frozenset.
        sql = " ".join(
            [
                "SELECT s.nspname AS table_schema",
                "FROM pg_catalog.pg_namespace s",
                "JOIN pg_catalog.pg_user u ON u.usesysid = s.nspowner",
                "ORDER BY table_schema",
            ]
        )
        run_query = self._db_client.execute(new_query(sql), None)
        read_schemas = self._db_client.fetch_all().map(
            lambda rows: frozenset(
                SchemaId(to_primitive(row[0], str).unwrap()) for row in rows
            )
        )
        return run_query + read_schemas

    def table_ids(self, schema: SchemaId) -> Cmd[FrozenSet[TableId]]:
        """Build a command producing the ids of the tables listed for ``schema``.

        The schema name is supplied as the ``schema_name`` query argument and
        each returned table name is paired with ``schema`` in a ``TableId``.
        """
        sql = " ".join(
            [
                "SELECT tables.table_name FROM information_schema.tables",
                "WHERE table_schema = %(schema_name)s",
            ]
        )
        query = new_query(sql)
        params = FrozenDict({"schema_name": schema.name})
        run_query = self._db_client.execute(query, params)
        read_tables = self._db_client.fetch_all().map(
            lambda rows: frozenset(
                TableId(schema, to_primitive(row[0], str).unwrap())
                for row in rows
            )
        )
        return run_query + read_tables

    def exist(self, schema: SchemaId) -> Cmd[bool]:
        """Build a command telling whether ``schema`` appears in ``pg_namespace``.

        The fetched row must be present and its first column must be a
        ``bool``; otherwise ``InvalidType`` is raised while decoding.
        """
        sql = " ".join(
            [
                "SELECT EXISTS",
                "(SELECT 1 FROM pg_namespace",
                "WHERE nspname = %(schema_name)s)",
            ]
        )
        query = new_query(sql)
        params = FrozenDict({"schema_name": schema.name})
        run_query = self._db_client.execute(query, params)
        read_flag = self._db_client.fetch_one().map(
            lambda row: raise_exception(InvalidType("Expected not None"))
            if row is None
            else _assert_bool(row[0])
        )
        return run_query + read_flag

    def delete(self, schema: SchemaId, cascade: bool) -> Cmd[None]:
        """Build a command that drops ``schema``.

        When ``cascade`` is true the statement ends with `` CASCADE``. The
        schema name fills the ``{schema_name}`` placeholder via
        ``dynamic_query``.
        """
        if cascade:
            suffix = " CASCADE"
        else:
            suffix = ""
        template: str = "DROP SCHEMA {schema_name}" + suffix
        identifiers = FrozenDict({"schema_name": schema.name})
        return self._db_client.execute(
            dynamic_query(template, identifiers), None
        )

    def rename(self, old: SchemaId, new: SchemaId) -> Cmd[None]:
        """Build a command that renames schema ``old`` to the name of ``new``.

        Both names fill their placeholders via ``dynamic_query``.
        """
        template = "ALTER SCHEMA {from_schema} RENAME TO {to_schema}"
        identifiers = FrozenDict(
            {
                "from_schema": old.name,
                "to_schema": new.name,
            }
        )
        query = dynamic_query(template, identifiers)
        return self._db_client.execute(query, None)
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
共22个文件
py:19个
pkg-info:1个
toml:1个
资源分类:Python库 所属语言:Python 资源全名:redshift_client-0.2.2.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
资源推荐
资源详情
资源评论
收起资源包目录
redshift_client-0.2.2.tar.gz (22个子文件)
redshift_client-0.2.2
PKG-INFO 619B
pyproject.toml 427B
redshift_client
table
core.py 941B
client.py 2KB
__init__.py 0B
py.typed 0B
schema
core.py 278B
client.py 3KB
__init__.py 0B
column.py 244B
data_type
alias.py 2KB
core.py 1KB
__init__.py 0B
decode.py 1KB
__init__.py 0B
id_objs.py 175B
sql_client
core.py 2KB
__init__.py 0B
connection.py 2KB
query.py 894B
_assert.py 2KB
setup.py 836B
共 22 条
- 1
资源评论
挣扎的蓝藻
- 粉丝: 13w+
- 资源: 15万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功