from __future__ import with_statement
from itertools import chain
import datetime
import sys
import warnings
import time
import threading
import time as mod_time
import hashlib
from redis._compat import (b, basestring, bytes, imap, iteritems, iterkeys,
itervalues, izip, long, nativestr, unicode,
safe_unicode)
from redis.connection import (ConnectionPool, UnixDomainSocketConnection,
SSLConnection, Token)
from redis.lock import Lock, LuaLock
from redis.exceptions import (
ConnectionError,
DataError,
ExecAbortError,
NoScriptError,
PubSubError,
RedisError,
ResponseError,
TimeoutError,
WatchError,
)
SYM_EMPTY = b('')
def list_or_args(keys, args):
# returns a single list combining keys and args
try:
iter(keys)
# a string or bytes instance can be iterated, but indicates
# keys wasn't passed as a list
if isinstance(keys, (basestring, bytes)):
keys = [keys]
except TypeError:
keys = [keys]
if args:
keys.extend(args)
return keys
def timestamp_to_datetime(response):
"Converts a unix timestamp to a Python datetime object"
if not response:
return None
try:
response = int(response)
except ValueError:
return None
return datetime.datetime.fromtimestamp(response)
def string_keys_to_dict(key_string, callback):
return dict.fromkeys(key_string.split(), callback)
def dict_merge(*dicts):
merged = {}
for d in dicts:
merged.update(d)
return merged
def parse_debug_object(response):
"Parse the results of Redis's DEBUG OBJECT command into a Python dict"
# The 'type' of the object is the first item in the response, but isn't
# prefixed with a name
response = nativestr(response)
response = 'type:' + response
response = dict([kv.split(':') for kv in response.split()])
# parse some expected int values from the string response
# note: this cmd isn't spec'd so these may not appear in all redis versions
int_fields = ('refcount', 'serializedlength', 'lru', 'lru_seconds_idle')
for field in int_fields:
if field in response:
response[field] = int(response[field])
return response
def parse_object(response, infotype):
"Parse the results of an OBJECT command"
if infotype in ('idletime', 'refcount'):
return int_or_none(response)
return response
def parse_info(response):
"Parse the result of Redis's INFO command into a Python dict"
info = {}
response = nativestr(response)
def get_value(value):
if ',' not in value or '=' not in value:
try:
if '.' in value:
return float(value)
else:
return int(value)
except ValueError:
return value
else:
sub_dict = {}
for item in value.split(','):
k, v = item.rsplit('=', 1)
sub_dict[k] = get_value(v)
return sub_dict
for line in response.splitlines():
if line and not line.startswith('#'):
if line.find(':') != -1:
key, value = line.split(':', 1)
info[key] = get_value(value)
else:
# if the line isn't splittable, append it to the "__raw__" key
info.setdefault('__raw__', []).append(line)
return info
SENTINEL_STATE_TYPES = {
'can-failover-its-master': int,
'config-epoch': int,
'down-after-milliseconds': int,
'failover-timeout': int,
'info-refresh': int,
'last-hello-message': int,
'last-ok-ping-reply': int,
'last-ping-reply': int,
'last-ping-sent': int,
'master-link-down-time': int,
'master-port': int,
'num-other-sentinels': int,
'num-slaves': int,
'o-down-time': int,
'pending-commands': int,
'parallel-syncs': int,
'port': int,
'quorum': int,
'role-reported-time': int,
's-down-time': int,
'slave-priority': int,
'slave-repl-offset': int,
'voted-leader-epoch': int
}
def parse_sentinel_state(item):
result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES)
flags = set(result['flags'].split(','))
for name, flag in (('is_master', 'master'), ('is_slave', 'slave'),
('is_sdown', 's_down'), ('is_odown', 'o_down'),
('is_sentinel', 'sentinel'),
('is_disconnected', 'disconnected'),
('is_master_down', 'master_down')):
result[name] = flag in flags
return result
def parse_sentinel_master(response):
return parse_sentinel_state(imap(nativestr, response))
def parse_sentinel_masters(response):
result = {}
for item in response:
state = parse_sentinel_state(imap(nativestr, item))
result[state['name']] = state
return result
def parse_sentinel_slaves_and_sentinels(response):
return [parse_sentinel_state(imap(nativestr, item)) for item in response]
def parse_sentinel_get_master(response):
return response and (response[0], int(response[1])) or None
def pairs_to_dict(response):
"Create a dict given a list of key/value pairs"
it = iter(response)
return dict(izip(it, it))
def pairs_to_dict_typed(response, type_info):
it = iter(response)
result = {}
for key, value in izip(it, it):
if key in type_info:
try:
value = type_info[key](value)
except:
# if for some reason the value can't be coerced, just use
# the string value
pass
result[key] = value
return result
def zset_score_pairs(response, **options):
"""
If ``withscores`` is specified in the options, return the response as
a list of (value, score) pairs
"""
if not response or not options.get('withscores'):
return response
score_cast_func = options.get('score_cast_func', float)
it = iter(response)
return list(izip(it, imap(score_cast_func, it)))
def sort_return_tuples(response, **options):
"""
If ``groups`` is specified, return the response as a list of
n-element tuples with n being the value found in options['groups']
"""
if not response or not options['groups']:
return response
n = options['groups']
return list(izip(*[response[i::n] for i in range(n)]))
def int_or_none(response):
if response is None:
return None
return int(response)
def float_or_none(response):
if response is None:
return None
return float(response)
def bool_ok(response):
return nativestr(response) == 'OK'
def parse_client_list(response, **options):
clients = []
for c in nativestr(response).splitlines():
clients.append(dict([pair.split('=') for pair in c.split(' ')]))
return clients
def parse_config_get(response, **options):
response = [nativestr(i) if i is not None else None for i in response]
return response and pairs_to_dict(response) or {}
def parse_scan(response, **options):
cursor, r = response
return long(cursor), r
def parse_hscan(response, **options):
cursor, r = response
return long(cursor), r and pairs_to_dict(r) or {}
def parse_zscan(response, **options):
score_cast_func = options.get('score_cast_func', float)
cursor, r = response
it = iter(r)
return long(cursor), list(izip(it, imap(score_cast_func, it)))
def parse_slowlog_get(response, **options):
return [{
'id': item[0],
'start_time': int(item[1]),
'duration': int(item[2]),
'command': b(' ').join(item[3])
} for item in response]
def parse_cluster_info(response, **options):
return dict([line.split(':') for line in response.splitlines() if line])
def _parse_node_line(line):
line_items = line.split(' ')
node_id,