# copyright 2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb-vcreview specific hooks and operations"""
__docformat__ = "restructuredtext en"
import re
from cubicweb import RegistryException
from cubicweb.server import hook
from cubicweb.selectors import is_instance, on_fire_transition
from cubicweb.hooks import notification
IGNORE_FILES = set( ('README', 'series', 'status', '.hgignore', '.hgtags') )
def fire(patch, trname, comment):
patch.cw_adapt_to('IWorkflowable').fire_transition(trname, comment)
class LinkOrCreatePatchOp(hook.DataOperationMixIn, hook.Operation):
"""operation to track patch's state according to incoming revisions"""
containercls = list # operation order matters
def precommit_event(self):
for entity in self.get_data():
patch = None
# if the revision is a renaming another one which is linked to the
# patch (take care DeletedVersionContent has no vc_rename relation)
if getattr(entity, 'vc_rename', None) and entity.vc_rename[0].patch:
patch = entity.vc_rename[0].patch
else:
# search for previous revision of the patch file
for parent in entity.previous_versions():
if (entity.__regid__ == 'DeletedVersionContent' and
getattr(parent, 'reverse_vc_rename', None)):
return # skip file deleted because of renaming
if parent.patch:
patch = parent.patch
break
# create a new patch entity if no one found or if found a closed
# patch and this is not a file deletion
if patch is None or (not patch.is_active() and entity.__regid__ == 'VersionContent'):
revision = entity.rev
patch = self.session.create_entity(
'Patch', originator=revision.author, branch=revision.branch,
patch_repository=revision.from_repository, patch_name=entity.file.path)
patch.set_relations(patch_revision=entity)
# patch file is being removed: mark it on the patch entity if still
# active
if (entity.__regid__ == 'DeletedVersionContent'
and entity.rev.branch == patch.branch
and patch.is_active()):
MarkPatchAsDeletedOp.get_instance(self.session).add_data(patch)
class LinkOrCreatePatchHook(hook.Hook):
__regid__ = 'vcreview.create-patch'
__select__ = hook.Hook.__select__ & is_instance('VersionContent',
'DeletedVersionContent')
events = ('after_add_entity',)
def __call__(self):
entity = self.entity
# skip file not from a patch repository
if not entity.repository.patchrepo_of:
return
if entity.file.name in IGNORE_FILES:
return
LinkOrCreatePatchOp.get_instance(self._cw).add_data(entity)
class InitPatchCreatorHook(hook.Hook):
__regid__ = 'vcreview.patch.created'
__select__ = hook.Hook.__select__ & is_instance('Patch')
events = ('before_add_entity',)
category = 'metadata'
def __call__(self):
email = self.entity.originator.rsplit('<')[-1].rsplit('>')[0]
rset = self._cw.execute('Any X WHERE X is CWUser, X use_email E, '
'E address %(email)s', {'email': email})
relations = []
for ueid, in rset:
relations.append( (self.entity.eid, ueid) )
# ensure user is in the nosy-list
self._cw.execute('SET P nosy_list U WHERE P eid %(p)s, U eid %(u)s,'
'NOT P nosy_list U',
{'p': self.entity.eid, 'u': ueid})
if relations:
self._cw.add_relations([('created_by', relations),
('owned_by', relations)])
# use late operation to be executed after workflow operation
class SearchPatchStateInstructionOp(hook.DataOperationMixIn,
hook.LateOperation):
"""search magic word in revision's commit message:
<patch path> ready for review
reject <patch path>
fold <patch path>
When found, the patch will be marked as pending review. You can put multiple
instructions like this, one per line.
"""
containercls = list
def precommit_event(self):
for rev in self.get_data():
# search patches among files modified by the revision
readycandidates = {}
pendingcandidates = {}
rejectcandidates = {}
for vc in rev.reverse_from_revision:
if not vc.patch:
continue
pstate = vc.patch.cw_adapt_to('IWorkflowable').state
if vc.__regid__ == 'VersionContent':
if pstate == 'in-progress':
readycandidates[vc.file.path.lower()] = vc.patch
elif pstate in ('pending-review', 'reviewed'):
pendingcandidates[vc.file.path.lower()] = vc.patch
elif (pstate in vc.patch.non_final_states and
vc.__regid__ == 'DeletedVersionContent'):
rejectcandidates[vc.file.path.lower()] = vc.patch
interestingwords = set( ('ready', 'applied', 'reject', 'fold') )
interestingwords.update(readycandidates)
interestingwords.update(pendingcandidates)
interestingwords.update(rejectcandidates)
# search for instruction
for line in rev.description.splitlines():
words = [w.lower() for w in re.findall('[-\.\w]+', line.strip())
if w.lower() in interestingwords]
if len(words) < 2:
continue
if words[1] in ('ready', 'pending') and words[0] in readycandidates:
fire(readycandidates.pop(words[0]),
'ask review', u'review asked by %s in %s' % (
rev.author, rev.dc_title()))
elif words[0] in ('refuse', 'refused') and words[1] in pendingcandidates:
patch = pendingcandidates.pop(words[1])
fire(patch, 'ask rework', u'refused by %s in %s' % (
rev.author, rev.dc_title()))
elif words[0] in ('reject', 'rejected') and words[1] in rejectcandidates:
patch = rejectcandidates.pop(words[1])
fire(patch, 'reject', u'rejected by %s in %s' % (
rev.author, rev.dc_title()))
self.ensure_wont_be_deleted(patch)
elif words[0] in ('fold', 'folded') and words[1] in rejectcandidates:
patch = rejectcandidates.pop(words[1])
fire(patch, 'fold', u'folded by %s in %s' % (
rev.author, rev.dc_title()))
self.ensure_wont_be_deleted(patch)
elif words[0] in ('apply', 'applied') and words[1] in rejectcandidates:
patch = rejectcandidates.pop(words[1])
fire(patch, '