# Copyright Kevin Deldycke <[email protected]> and contributors.
# All Rights Reserved.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
from collections import Counter, OrderedDict
from difflib import unified_diff
from itertools import combinations
from operator import attrgetter
from pathlib import Path
import textwrap
import click
from boltons.cacheutils import cachedproperty
from tabulate import tabulate
from . import ContentDiffAboveThreshold, SizeDiffAboveThreshold, TooFewHeaders, logger
from .colorize import choice_style, subtitle_style
from .mailbox import open_box
from .strategy import apply_strategy
# Reference all tracked statistics and their definition.
# Maps each counter name (as used in `Counter` instances throughout the
# module) to the human-readable description shown in reports.
STATS_DEF = OrderedDict(
    [
        ("mail_found", "Total number of mails encountered from all mail sources."),
        (
            "mail_rejected",
            "Number of mails individually rejected because they were unparseable or "
            "did not have enough metadata to compute hashes.",
        ),
        (
            "mail_retained",
            "Number of valid mails parsed and retained for deduplication.",
        ),
        ("mail_hashes", "Number of unique hashes."),
        (
            "mail_unique",
            "Number of unique mails (which were automatically added to selection).",
        ),
        (
            "mail_duplicates",
            "Number of duplicate mails (sum of mails in all duplicate sets with at "
            "least 2 mails).",
        ),
        (
            "mail_skipped",
            "Number of mails ignored in the selection phase because the whole set "
            "they belong to was skipped.",
        ),
        ("mail_discarded", "Number of mails discarded from the final selection."),
        (
            "mail_selected",
            "Number of mails kept in the final selection on which the "
            "action will be performed.",
        ),
        (
            "mail_copied",
            "Number of mails copied from their original mailbox to another.",
        ),
        ("mail_moved", "Number of mails moved from their original mailbox to another."),
        ("mail_deleted", "Number of mails deleted from their mailbox in-place."),
        ("set_total", "Total number of duplicate sets."),
        (
            "set_single",
            "Total number of sets containing a single mail, which did not have to "
            "have a strategy applied. They were automatically kept in the final "
            "selection.",
        ),
        (
            "set_skipped_encoding",
            "Number of sets skipped from the selection process because they had "
            "encoding issues.",
        ),
        (
            "set_skipped_size",
            "Number of sets skipped from the selection process because they were "
            "too dissimilar in size.",
        ),
        (
            # NOTE(review): key is misspelled ("skipeed") but kept as-is — other
            # modules may reference this exact key; renaming is a wider refactor.
            "set_skipeed_content",
            "Number of sets skipped from the selection process because they were "
            "too dissimilar in content.",
        ),
        (
            "set_skipped_strategy",
            "Number of sets skipped from the selection process because the strategy "
            "could not be applied.",
        ),
        (
            "set_deduplicated",
            "Number of valid sets on which the selection strategy was successfully "
            "applied.",
        ),
    ]
)
class DuplicateSet:
"""A duplicate set of mails sharing the same hash.
Implements all the safety checks required before we can apply any selection
strategy.
"""
def __init__(self, hash_key, mail_set, conf):
"""Load-up the duplicate set of mail and freeze pool.
Once loaded-up, the pool of parsed mails is considered frozen for the
rest of the duplicate set's life. This allow aggressive caching of lazy
instance attributes depending on the pool content.
"""
self.hash_key = hash_key
# Global config.
self.conf = conf
# Pool referencing all duplicated mails and their attributes.
self.pool = frozenset(mail_set)
# There is no point creating a duplicate set with a single mail.
assert self.size > 1
# Set metrics.
self.stats = Counter()
self.stats["mail_duplicates"] += self.size
logger.debug(f"{self!r} created.")
def __repr__(self):
""" Print internal raw states for debugging. """
return f"<{self.__class__.__name__} hash={self.hash_key} size={self.size}>"
@cachedproperty
def size(self):
""" Return the size of the duplicate set. """
return len(self.pool)
@cachedproperty
def newest_timestamp(self):
return max(map(attrgetter("timestamp"), self.pool))
@cachedproperty
def oldest_timestamp(self):
return min(map(attrgetter("timestamp"), self.pool))
@cachedproperty
def biggest_size(self):
return max(map(attrgetter("size"), self.pool))
@cachedproperty
def smallest_size(self):
return min(map(attrgetter("size"), self.pool))
def check_differences(self):
"""Ensures all mail differs in the limits imposed by size and content
thresholds.
Compare all mails of the duplicate set with each other, both in size
and content. Raise an error if we're not within the limits imposed by
the threshold settings.
"""
logger.info("Check mail differences are below the thresholds.")
if self.conf.size_threshold < 0:
logger.info("Skip checking for size differences.")
if self.conf.content_threshold < 0:
logger.info("Skip checking for content differences.")
if self.conf.size_threshold < 0 and self.conf.content_threshold < 0:
return
# Compute differences of mail against one another.
for mail_a, mail_b in combinations(self.pool, 2):
# Compare mails on size.
if self.conf.size_threshold > -1:
size_difference = abs(mail_a.size - mail_b.size)
logger.debug(
f"{mail_a!r} and {mail_b!r} differs by {size_difference} bytes "
"in size."
)
if size_difference > self.conf.size_threshold:
raise SizeDiffAboveThreshold
# Compare mails on content.
if self.conf.content_threshold > -1:
content_difference = self.diff(mail_a, mail_b)
logger.debug(
f"{mail_a!r} and {mail_b!r} differs by {content_difference} bytes "
"in content."
)
if content_difference > self.conf.content_threshold:
if self.conf.show_diff:
logger.info(self.pretty_diff(mail_a, mail_b))
raise ContentDiffAboveThreshold
def diff(self, mail_a, mail_b):
"""Return difference in bytes between two mails' normalized body.
TODO: rewrite the diff algorithm to not rely on naive unified diff
result parsing.
"""
return len(
"".join(
unified_diff(
mail_a.body_lines,
mail_b.body_lines,
# Ignore difference in filename lengths and timestamps.
fromfile="a",
PyPI 官网下载 | mail-deduplicate-6.0.2.tar.gz
版权申诉
84 浏览量
2022-01-28
09:42:40
上传
评论
收藏 27KB GZ 举报
挣扎的蓝藻
- 粉丝: 13w+
- 资源: 15万+
最新资源
- HTML+CSS制作的个人博客网页.zip
- IMG_20240521_094903.jpg
- 基于htmlde 爱心代码,但HTML网页源码.zip
- 51单片机实现LED流水灯
- 基于Python的obEspoir分布式游戏框架设计源码
- 基于python的机械设计实用计算器,可计算电动机,传动装置,V带轮,齿轮,轴,轴承的几何或者力,运动学参数数值+源码+开发文档
- 基于HTML +JavaScript的元旦倒计时代码.docx
- 【Unity资源免费分享】孩子益智小游戏unity 5x系列Baby Doll House Cleaning
- 【资源免费分享】集市游戏(uniyt案例)
- 数据整理结果 2023-12-7 192544 6.dta
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈