#!/usr/bin/python
"""ノード定義
* UserとGroupのモデル定義を書きます。
* 関係テーブルのモデル実装は別モジュールにしようかと思ってる
* sqlalchemyのベースクラスを拡張したNodeクラスに共通のプロパティを載せて、そいつらをUserとGroupに継承させてます。
Todo:
* sqlalchemy用にUser型とGroup型を作って、↓のクラスをそのまま使ってDB呼び出しできるようにしたい
"""
import base64
import hashlib
import random
import secrets
import smtplib
import bcrypt
import jwt
import magic
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy import event
from mitama.db import BaseDatabase, func, ForeignKey, relationship, Table, backref
from mitama.db.types import Column, Group, Integer, LargeBinary
from mitama.db.types import Node as NodeType
from mitama.db.types import String
from mitama.db.model import UUID
from mitama.noimage import load_noimage_group, load_noimage_user
from mitama.conf import get_from_project_dir
from mitama._extra import _classproperty
class Database(BaseDatabase):
pass
db = Database(prefix='mitama')
secret = secrets.token_hex(32)
class AuthorizationError(Exception):
INVALID_TOKEN = 0
WRONG_PASSWORD = 1
USER_NOT_FOUND= 2
def __init__(self, code):
self.code = code
@property
def message(self):
return [
"トークンが不正です",
"パスワードが間違っています",
"ユーザーが見つかりません"
][self.code]
pass
user_group = Table(
"mitama_user_group",
db.metadata,
Column("_id", String(64), default=UUID(), primary_key=True),
Column("group_id", String(64), ForeignKey("mitama_group._id", ondelete="CASCADE")),
Column("user_id", String(64), ForeignKey("mitama_user._id", ondelete="CASCADE")),
)
class UserGroup(db.Model):
__table__ = user_group
_id = user_group.c._id
group_id= user_group.c.group_id
user_id = user_group.c.user_id
user = relationship("User")
group = relationship("Group")
class Node(object):
_icon = Column(LargeBinary)
_name = Column("name", String(255))
_screen_name = Column("screen_name", String(255))
_name_proxy = list()
_screen_name_proxy = list()
_icon_proxy = list()
@property
def id(self):
return self._id
@property
def name(self):
name = self._name
for fn in self._name_proxy:
name = fn(name)
return name
@property
def screen_name(self):
screen_name = self._screen_name
for fn in self._screen_name_proxy:
screen_name = fn(screen_name)
return screen_name
def to_dict(self):
return {
"_id": self._id,
"name": self.name,
"screen_name": self.screen_name,
}
@property
def icon(self):
if self._icon != None:
icon = self._icon
else:
icon = self.load_noimage()
for fn in self._icon_proxy:
icon = fn(icon)
return icon
@name.setter
def name(self, value):
self._name = value
@screen_name.setter
def screen_name(self, value):
self._screen_name = value
@icon.setter
def icon(self, value):
self._icon = value
def icon_to_dataurl(self):
f = magic.Magic(mime=True, uncompress=True)
mime = f.from_buffer(self.icon)
return "data:" + mime + ";base64," + base64.b64encode(self.icon).decode()
@classmethod
def add_name_proxy(cls, fn):
cls._name_proxy.append(fn)
@classmethod
def add_screen_name_proxy(cls, fn):
cls._screen_name_proxy.append(fn)
@classmethod
def add_icon_proxy(cls, fn):
cls._icon_proxy.append(fn)
@classmethod
def retrieve(cls, _id=None, screen_name=None, **kwargs):
if _id is not None:
return super().retrieve(_id = _id)
elif screen_name is not None:
return super().retrieve(_screen_name = screen_name)
else:
return super().retrieve(**kwargs)
def __eq__(self, other):
return self._id == other._id
class User(Node, db.Model):
"""ユーザーのモデルクラスです
:param _id: 固有のID
:param screen_name: ログイン名
:param name: 名前
:param password: パスワード
:param icon: アイコン
"""
__tablename__ = "mitama_user"
_id = Column(String(64), default=UUID("user"), primary_key = True, nullable=False)
_project = None
_token = Column(String(64))
email = Column(String(64), nullable=False)
password = Column(String(255))
groups = relationship(
"Group",
secondary=user_group,
)
def to_dict(self, only_profile=False):
profile = super().to_dict()
if not only_profile:
profile["groups"] = [p.to_dict(True) for p in self.groups()]
return profile
def load_noimage(self):
return load_noimage_user()
def delete(self):
"""ユーザーを削除します"""
from mitama.app.hook import HookRegistry
hook_registry = HookRegistry()
hook_registry.delete_user(self)
super().delete()
def update(self):
"""ユーザー情報を更新します"""
super().update()
from mitama.app.hook import HookRegistry
hook_registry = HookRegistry()
hook_registry.update_user(self)
def create(self):
"""ユーザーを作成します"""
super().create()
from mitama.app.hook import HookRegistry
hook_registry = HookRegistry()
hook_registry.create_user(self)
def password_check(self, password):
password = base64.b64encode(hashlib.sha256(password.encode() * 10).digest())
return bcrypt.checkpw(password, self.password)
@classmethod
def password_auth(cls, screen_name, password):
"""ログイン名とパスワードで認証します
:param screen_name: ログイン名
:param password: パスワード
:return: Userインスタンス
"""
try:
user = cls.retrieve(_screen_name=screen_name)
if user is None:
raise AuthorizationError(AuthorizationError.USER_NOT_FOUND)
except:
raise AuthorizationError(AuthorizationError.USER_NOT_FOUND)
password = base64.b64encode(hashlib.sha256(password.encode() * 10).digest())
if bcrypt.checkpw(password, user.password):
return user
else:
raise AuthorizationError(AuthorizationError.WRONG_PASSWORD)
def valid_password(self, password):
"""パスワードが安全か検証します
:param password: パスワードのプレーンテキスト
:return: 検証済みパスワード
"""
if self._project is None:
return password
config = self._project.config
MIN_PASSWORD_LEN = config.password_validation.get('MIN_PASSWORD_LEN', None)
COMPLICATED_PASSWORD = config.password_validation.get('COMPLICATED_PASSWORD', False)
if MIN_PASSWORD_LEN and len(password) < MIN_PASSWORD_LEN:
raise ValueError('パスワードは{}文字以上である必要があります'.format(MIN_PASSWORD_LEN))
if COMPLICATED_PASSWORD and (not any(c.isdigit() for c in password)) or (not any(c.isalpha() for c in password)):
raise ValueError('パスワードは数字とアルファベットの両方を含む必要があります')
return password
def set_password(self, password):
"""パスワードをハッシュ化します
:param password: パスワードのプレーンテキスト
:return: パスワードハッシュ
"""
password = self.valid_password(password