import re
from functools import lru_cache
from validate_email import validate_email
import ipaddress
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
import uuid
import struct
from jinja2 import Template
import time
import sys
# Running text of every generated validate function; _validate_inner appends
# to it and print_validate reads it back.
printer = ""

# Patterns for the well-known regex names, keyed by the enum value name.
# HEADER_STRING is the relaxed pattern used in place of the two HTTP_HEADER_*
# patterns when strict matching is turned off.
regex_map = {
    "UNKNOWN": "",
    "HTTP_HEADER_NAME": r'^:?[0-9a-zA-Z!#$%&\'*+-.^_|~\x60]+$',
    "HTTP_HEADER_VALUE": r'^[^\u0000-\u0008\u000A-\u001F\u007F]*$',
    "HEADER_STRING": r'^[^\u0000\u000A\u000D]*$',
}
class ValidatingMessage(object):
    """Hashable cache key for a proto message, based on its message class name.

    protoc-gen-validate defines one validate function per message class, so
    a function generated once can be reused for every message of that class.
    Two wrappers are equal, and hash alike, exactly when their descriptors
    report the same ``full_name``.
    """

    def __init__(self, proto_message):
        # Only the descriptor is retained; the message payload is not needed.
        self.DESCRIPTOR = proto_message.DESCRIPTOR

    def __eq__(self, other):
        # Anything that is not a ValidatingMessage never compares equal.
        return (
            isinstance(other, ValidatingMessage)
            and self.DESCRIPTOR.full_name == other.DESCRIPTOR.full_name
        )

    def __hash__(self):
        # Must agree with __eq__: both are derived from the full name only.
        return hash(self.DESCRIPTOR.full_name)
def validate(proto_message):
    """Return the generated validate function for ``proto_message``'s class.

    The message is wrapped in a ValidatingMessage so that the cached
    ``_validate_inner`` lookup is keyed by message class name rather than by
    the individual message object.
    """
    cache_key = ValidatingMessage(proto_message)
    return _validate_inner(cache_key)
# Cache generated functions to avoid the performance issue caused by repeated proto messages,
# which generate the same functions repeatedly.
@lru_cache()
def _validate_inner(proto_message):
    """Generate, record and compile the validate function for a message class.

    Args:
        proto_message: a hashable wrapper (see ``validate``) that
            ``file_template`` turns into Python source text.

    Returns:
        The ``generate_validate`` function defined by the generated source.

    Raises:
        KeyError: if the generated source does not define
            ``generate_validate``.

    Side effects:
        Appends the generated source to the module-level ``printer`` string.
    """
    global printer
    func = file_template(proto_message)
    printer += func + "\n"
    # Execute into an explicit namespace. A bare ``exec(func)`` inside a
    # function writes to a throw-away snapshot of the locals on Python 3.13+
    # (PEP 667), so the definition could not be read back via ``locals()``.
    # Module globals are still passed so the generated code resolves the
    # helpers defined in this file.
    namespace = {}
    exec(func, globals(), namespace)
    return namespace['generate_validate']
def print_validate(proto_message):
    """Return all generated validate source recorded so far, minus blank lines.

    ``proto_message`` is accepted but not consulted; the text comes from the
    module-level ``printer`` accumulator. Lines that are empty or contain
    only whitespace are dropped; the rest keep their line endings.
    """
    kept_lines = []
    for line in printer.splitlines(True):
        if line.strip():
            kept_lines.append(line)
    return "".join(kept_lines)
def has_validate(field):
    """Tell whether ``field`` carries a ``validate.rules`` option.

    Returns False when the field reports no options object, or when none of
    the options that are set is named ``validate.rules``.
    """
    options = field.GetOptions()
    if options is None:
        return False
    return any(
        descriptor.full_name == "validate.rules"
        for descriptor, _ in options.ListFields()
    )
def byte_len(s):
    """Return the length of ``s`` measured in UTF-8 encoded bytes.

    Text is encoded as UTF-8 and the size of the encoded form is returned.
    Values that cannot be encoded that way -- objects without an ``encode``
    method such as ``bytes``, or text containing code points UTF-8 rejects --
    fall back to plain ``len(s)``.
    """
    try:
        encoded = s.encode('utf-8')
    except (AttributeError, UnicodeError):
        # Only the expected encoding failures trigger the fallback; a bare
        # ``except`` would also swallow KeyboardInterrupt and SystemExit.
        return len(s)
    return len(encoded)
def _validateHostName(host):
if not host:
return False
if len(host) > 253:
return False
if host[-1] == '.':
host = host[:-1]
for part in host.split("."):
if len(part) == 0 or len(part) > 63:
return False
# Host names cannot begin or end with hyphens
if part[0] == "-" or part[-1] == '-':
return False
for r in part:
if (r < 'A' or r > 'Z') and (r < 'a' or r > 'z') and (r < '0' or r > '9') and r != '-':
return False
return True
def _validateEmail(addr):
    """Check that ``addr`` is an acceptable email address.

    When the text contains both ``<`` and ``>``, only the part after the
    first ``<`` -- cut at the next ``<`` or ``>`` -- is examined. That
    address must pass ``validate_email``, be at most 254 characters long,
    have at most 64 characters before the first ``@``, and have a host part
    accepted by ``_validateHostName``.

    Returns:
        True if every check passes, False otherwise.
    """
    if '<' in addr and '>' in addr:
        after_open = addr.split("<")[1]
        addr = after_open.split(">")[0]

    if not validate_email(addr) or len(addr) > 254:
        return False

    pieces = addr.split("@")
    if len(pieces[0]) > 64:
        return False
    return _validateHostName(pieces[1])
def _has_field(message_pb, property_name):
# NOTE: As of proto3, HasField() only works for message fields, not for
# singular (non-message) fields. First try to use HasField and
# if it fails (with a ValueError) we manually consult the fields.
try:
return message_pb.HasField(property_name)
except:
all_fields = set([field.name for field in message_pb.DESCRIPTOR.fields])
return property_name in all_fields
def const_template(option_value, name):
    """Render the ``const`` equality check for one field as Python source text.

    The Jinja2 template walks an if/elif chain over the ``string``, ``bool``,
    ``enum`` and ``bytes`` rule sets of ``option_value`` and, for the first
    branch whose condition holds, emits an ``if <name> != <const>:`` test
    followed by ``raise ValidationFailed(...)``. When no branch applies no
    check is emitted.

    Args:
        option_value: rules object exposed to the template as ``o``; the
            template reads ``o.string``, ``o.bool``, ``o.enum`` and
            ``o.bytes``.
        name: text spliced into the generated code as the expression under
            test, and into the error message.

    Returns:
        The rendered template as a string.
    """
    # NOTE(review): the literal below renders Python statements, so the
    # whitespace inside it is part of the generated code. Keep it exactly
    # as-is when editing and confirm the layout the generator expects.
    # The bytes branch has two forms selected by ``sys.version_info[0]``:
    # on 3+ the const is rendered directly, otherwise it is rendered via
    # ``encode('string_escape')`` inside a ``b"..."`` literal.
    const_tmpl = """{%- if str(o.string) and o.string.HasField('const') -%}
if {{ name }} != \"{{ o.string['const'] }}\":
raise ValidationFailed(\"{{ name }} not equal to {{ o.string['const'] }}\")
{%- elif str(o.bool) and o.bool['const'] != "" -%}
if {{ name }} != {{ o.bool['const'] }}:
raise ValidationFailed(\"{{ name }} not equal to {{ o.bool['const'] }}\")
{%- elif str(o.enum) and o.enum['const'] -%}
if {{ name }} != {{ o.enum['const'] }}:
raise ValidationFailed(\"{{ name }} not equal to {{ o.enum['const'] }}\")
{%- elif str(o.bytes) and o.bytes.HasField('const') -%}
{% if sys.version_info[0] >= 3 %}
if {{ name }} != {{ o.bytes['const'] }}:
raise ValidationFailed(\"{{ name }} not equal to {{ o.bytes['const'] }}\")
{% else %}
if {{ name }} != b\"{{ o.bytes['const'].encode('string_escape') }}\":
raise ValidationFailed(\"{{ name }} not equal to {{ o.bytes['const'].encode('string_escape') }}\")
{% endif %}
{%- endif -%}
"""
    # ``sys`` and ``str`` are passed in because the template calls them.
    return Template(const_tmpl).render(sys=sys, o=option_value, name=name, str=str)
def in_template(value, name):
    """Render the ``in`` / ``not_in`` membership checks as Python source text.

    When ``value['in']`` is truthy the template emits
    ``if <name> not in <in>:`` followed by ``raise ValidationFailed(...)``;
    when ``value['not_in']`` is truthy it emits the opposite test. The two
    conditions are independent, so both, one or neither check may be
    produced.

    Args:
        value: rules object supporting ``['in']`` and ``['not_in']`` lookups;
            the looked-up values are rendered into the code with the
            template's default string conversion.
        name: text spliced into the generated code as the expression under
            test, and into the error message.

    Returns:
        The rendered template as a string.
    """
    # NOTE(review): the literal below renders Python statements, so the
    # whitespace inside it is part of the generated code. Keep it exactly
    # as-is when editing and confirm the layout the generator expects.
    in_tmpl = """
{%- if value['in'] %}
if {{ name }} not in {{ value['in'] }}:
raise ValidationFailed(\"{{ name }} not in {{ value['in'] }}\")
{%- endif -%}
{%- if value['not_in'] %}
if {{ name }} in {{ value['not_in'] }}:
raise ValidationFailed(\"{{ name }} in {{ value['not_in'] }}\")
{%- endif -%}
"""
    return Template(in_tmpl).render(value=value, name=name)
def string_template(option_value, name):
if option_value.string.well_known_regex:
known_regex_type = option_value.string.DESCRIPTOR.fields_by_name['well_known_regex'].enum_type
regex_value = option_value.string.well_known_regex
regex_name = known_regex_type.values_by_number[regex_value].name
if regex_name in ["HTTP_HEADER_NAME", "HTTP_HEADER_VALUE"] and not option_value.string.strict:
option_value.string.pattern = regex_map["HEADER_STRING"]
else:
option_value.string.pattern = regex_map[regex_name]
str_templ = """
{{ const_template(o, name) -}}
{{ in_template(o.string, name) -}}
{%- set s = o.string -%}
{%- if s['len'] %}
if len({{ name }}) != {{ s['len'] }}:
raise ValidationFailed(\"{{ name }} length does not equal {{ s['len'] }}\")
{%- endif -%}
{%- if s['min_len'] %}
if len({{ name }}) < {{ s['min_len'] }}:
raise ValidationFailed(\"{{ name }} length is less than {{ s['min_len'] }}\")
{%- endif -%}
{%- if s['max_len'] %}
if len({{ name }}) > {{ s['max_len'] }}:
raise ValidationFailed(\"{{ name }} length is more than {{ s['max_len'] }}\")
{%- endif -%}
{%- if s['len_bytes'] %}
if byte_len({{ name }}) != {{ s['len_bytes'] }}:
raise ValidationFailed(\"{{ name }} length does not equal {{ s['len_bytes'] }}\")
{%- endif -%}
{%- if s['min_bytes'] %}
if byte_len({{ name }}) < {{ s['min_bytes'] }}:
raise ValidationFailed(\"{{ name }} length is less than {{ s['min_bytes'] }}\")
{%- endif -%}
{%- if s['max_bytes'] %}
if byte_len({{ name }}) > {{ s['max_bytes'] }}:
raise ValidationFailed(\"{{ name }} length is greater than {{ s['max_bytes'] }}\")
{%- endif -%}
{%- if s['pattern'] %}
if re.search(r\'{{ s['pattern'] }}\', {{ name }}) is None:
raise ValidationFailed(\"{{ name }} pattern does not match {{ s['pattern'] }}\")
{%- endif -%}
{%- if s['prefix'] %}
if not {{ name }}.startswith(\"{{ s['prefix'] }}\"):
raise ValidationFailed(\"{{ name }} does not start with prefix {{ s['prefix'] }}\")
{%- endif -%}
{%- if s['suffix'] %}
if not {{ name }}.endswith(\"{{ s['suffix'] }}\"):
raise ValidationFailed(\"{{ name }} does not end with suffix {{ s['suffix'] }}\")
{%- endif -%}
{%- if s['contains'] %}
if not \"{{ s['contains'] }}\" in {{ name }}:
raise ValidationFailed(\"{{ name }} does not contain {{ s['contains'] }}\")
{%- endif -%}
{%- if s['not_contains'] %}
if \"{{ s['not_contains'] }}\" in {{ name }}:
raise ValidationFailed(\"{{ name }} contains {{ s['not_contains'] }}\")
{%- endif -%}
{%- if s['email'] %}
if not _validateEmail({{ name }}):
raise ValidationFailed(\"{{ name }} is not a valid email\")
{%- endif -%}
{%- if s['hostname'] %}
if not _validateHostName({{ name }}):
raise ValidationFailed(\"{{ name }} is not a valid email\")
{%- endif -%}
{%- if s['address'] %}
try:
ipaddress.ip_address({{ name }})
except ValueError:
if not _validateHostName({{ name }}):