/* Redis Cluster implementation.
*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
#include "cluster.h"
#include "endianconv.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>
/* A global reference to myself is handy to make the code clearer.
* 'myself' always points to server.cluster->myself, that is, the clusterNode
* that represents this node. It starts out as NULL; clusterLoadConfig()
* assigns it (together with server.cluster->myself) when it parses a node
* entry carrying the "myself" flag. */
clusterNode *myself = NULL;
/* Forward declarations, so that the functions below can be referenced before
* their definitions are seen by the compiler.
* NOTE(review): the definitions are not visible in this part of the file --
* presumably they live further down or are also declared in cluster.h;
* confirm before removing or reordering any of these prototypes. */
clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter);
clusterNode *clusterLookupNode(char *name);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void);
void clusterHandleSlaveMigration(int max_slaves);
int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
void resetManualFailover(void);
void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
/* -----------------------------------------------------------------------------
* Initialization
* -------------------------------------------------------------------------- */
/* Load the cluster config from 'filename'.
*
* If the file does not exist or is zero-length (this may happen because
* when we lock the nodes.conf file, we create a zero-length one for the
* sake of locking if it does not already exist), C_ERR is returned.
* If the file exists but cannot be opened for any other reason, the error
* is logged and the process exits.
* If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {
FILE *fp = fopen(filename,"r");
struct stat sb;
char *line;
int maxline, j;
if (fp == NULL) {
if (errno == ENOENT) {
return C_ERR;
} else {
serverLog(LL_WARNING,
"Loading the cluster node config from %s: %s",
filename, strerror(errno));
exit(1);
}
}
/* Check if the file is zero-length: if so return C_ERR to signal
* we have to write the config. */
if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
fclose(fp);
return C_ERR;
}
/* Parse the file. Note that single lines of the cluster config file can
* be really long as they include all the hash slots of the node.
* This means in the worst possible case, half of the Redis slots will be
* present in a single line, possibly in importing or migrating state, so
* together with the node ID of the sender/receiver.
*
* To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
maxline = 1024+CLUSTER_SLOTS*128;
line = zmalloc(maxline);
while(fgets(line,maxline,fp) != NULL) {
int argc;
sds *argv;
clusterNode *n, *master;
char *p, *s;
/* Skip blank lines, they can be created either by users manually
* editing nodes.conf or by the config writing process if stopped
* before the truncate() call. */
if (line[0] == '\n' || line[0] == '\0') continue;
/* Split the line into arguments for processing. */
argv = sdssplitargs(line,&argc);
if (argv == NULL) goto fmterr;
/* Handle the special "vars" line. Don't pretend it is the last
* line even if it actually is when generated by Redis. */
if (strcasecmp(argv[0],"vars") == 0) {
for (j = 1; j < argc; j += 2) {
if (strcasecmp(argv[j],"currentEpoch") == 0) {
server.cluster->currentEpoch =
strtoull(argv[j+1],NULL,10);
} else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
server.cluster->lastVoteEpoch =
strtoull(argv[j+1],NULL,10);
} else {
serverLog(LL_WARNING,
"Skipping unknown cluster config variable '%s'",
argv[j]);
}
}
sdsfreesplitres(argv,argc);
continue;
}
/* Regular config lines have at least eight fields */
if (argc < 8) goto fmterr;
/* Create this node if it does not exist */
n = clusterLookupNode(argv[0]);
if (!n) {
n = createClusterNode(argv[0],0);
clusterAddNode(n);
}
/* Address and port */
if ((p = strrchr(argv[1],':')) == NULL) goto fmterr;
*p = '\0';
memcpy(n->ip,argv[1],strlen(argv[1])+1);
char *port = p+1;
char *busp = strchr(port,'@');
if (busp) {
*busp = '\0';
busp++;
}
n->port = atoi(port);
/* In older versions of nodes.conf the "@busport" part is missing.
* In this case we set it to the default offset of 10000 from the
* base port. */
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
/* Parse flags */
p = s = argv[2];
while(p) {
p = strchr(s,',');
if (p) *p = '\0';
if (!strcasecmp(s,"myself")) {
serverAssert(server.cluster->myself == NULL);
myself = server.cluster->myself = n;
n->flags |= CLUSTER_NODE_MYSELF;
} else if (!strcasecmp(s,"master")) {
n->flags |= CLUSTER_NODE_MASTER;
} else if (!strcasecmp(s,"slave")) {
n->flags |= CLUSTER_NODE_SLAVE;
} else if (!strcasecmp(s,"fail?")) {
n->flags |= C