/*
Copyright (c) 2008, 2010 QUE Hongyu
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
*/
/* Changelog:
* 2008-08-20, coding started
* 2008-09-04, v0.1 finished
* 2008-09-07, v0.2 finished, code cleanup, drive_get_server function
* 2008-09-09, support get/gets' multi keys(can > 7 keys)
* 2008-09-10, ketama allocation
* 2008-09-12, backup server added
* 2008-09-12, try backup server for get/gets command
* 2008-09-16, v0.3 finished
* 2008-09-20, support unix domain socket
* 2008-09-23, write "END\r\n" with the last packet of GET/GETS response
* 2008-09-23, combine drive_get_server with drive_server functions -> drive_memcached_server function
* 2008-10-05, fix header file include under BSD systems
*/
#define _GNU_SOURCE
#include <sys/types.h>
#if defined(__FreeBSD__)
#include <sys/uio.h>
#include <limits.h>
#else
#include <getopt.h>
#endif
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <net/if.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <fcntl.h>
#include <time.h>
#include <sys/ioctl.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <event.h>
#include "ketama.h"
#define VERSION "0.6"
#define OUTOFCONN "SERVER_ERROR OUT OF CONNECTION"
#define BUFFERLEN 2048
#define MAX_TOKENS 8
#define COMMAND_TOKEN 0
#define KEY_TOKEN 1
#define BYTES_TOKEN 4
#define KEY_MAX_LENGTH 250
#define BUFFER_PIECE_SIZE 16
#define UNUSED(x) ( (void)(x) )
#define STEP 5
/* structure definitions */
typedef struct conn conn;
typedef struct matrix matrix;
typedef struct list list;
typedef struct buffer buffer;
typedef struct server server;
typedef enum {
CLIENT_COMMAND,
CLIENT_NREAD, /* MORE CLIENT DATA */
CLIENT_TRANSCATION
} client_state_t;
typedef enum {
SERVER_INIT,
SERVER_CONNECTING,
SERVER_CONNECTED,
SERVER_ERROR
} server_state_t;
struct buffer {
char *ptr;
size_t used;
size_t size;
size_t len; /* ptr length */
struct buffer *next;
};
/* list to buffers */
struct list {
buffer *first;
buffer *last;
};
/* connection to memcached server */
struct server {
int sfd;
server_state_t state;
struct event ev;
int ev_flags;
matrix *owner;
/* first response line
* NOT_FOUND\r\n
* STORED\r\n
*/
char line[BUFFERLEN];
int pos;
/* get/gets key ....
* VALUE <key> <flags> <bytes> [<cas unique>]\r\n
*/
int valuebytes;
int has_response_header:1;
int remove_trail:1;
/* input buffer */
list *request;
/* output buffer */
list *response;
int pool_idx;
};
struct conn {
/* client part */
int cfd;
client_state_t state;
struct event ev;
int ev_flags;
/* command buffer */
char line[BUFFERLEN+1];
int pos;
int storebytes; /* bytes stored by CAS/SET/ADD/... command */
struct flag {
unsigned int is_get_cmd:1;
unsigned int is_gets_cmd:1;
unsigned int is_set_cmd:1;
unsigned int is_incr_decr_cmd:1;
unsigned int no_reply:1;
unsigned int is_update_cmd:1;
unsigned int is_backup:1;
unsigned int is_last_key:1;
} flag;
int keycount; /* GET/GETS multi keys */
int keyidx;
char **keys;
/* input buffer */
list *request;
/* output buffer */
list *response;
struct server *srv;
};
/* memcached server structure */
struct matrix {
char *ip;
int port;
struct sockaddr_in dstaddr;
int size;
int used;
struct server **pool;
};
typedef struct token_s {
char *value;
size_t length;
} token_t;
/* static variables */
static int port = 11211, maxconns = 4096, curconns = 0, sockfd = -1, verbose_mode = 0, use_ketama = 0;
static struct event ev_master;
static struct matrix *matrixs = NULL; /* memcached server list */
static int matrixcnt = 0;
static struct ketama *ketama = NULL;
static struct matrix *backups= NULL; /* backup memcached server list */
static int backupcnt = 0;
static struct ketama *backupkt = NULL;
static char *socketpath = NULL;
static int unixfd = -1;
static struct event ev_unix;
static int maxidle = 20; /* max keep alive connections for one memcached server */
static struct event ev_timer;
time_t cur_ts;
char cur_ts_str[128];
static void drive_client(const int, const short, void *);
static void drive_backup_server(const int, const short, void *);
static void drive_memcached_server(const int, const short, void *);
static void finish_transcation(conn *);
static void do_transcation(conn *);
static void start_magent_transcation(conn *);
static void out_string(conn *, const char *);
static void process_update_response(conn *);
static void process_get_response(conn *, int);
static void append_buffer_to_list(list *, buffer *);
static void try_backup_server(conn *);
static void
show_help(void)
{
char *b = "memcached agent v" VERSION " Build-Date: " __DATE__ " " __TIME__ "\n"
"Usage:\n -h this message\n"
" -u uid\n"
" -g gid\n"
" -p port, default is 11211. (0 to disable tcp support)\n"
" -s ip:port, set memcached server ip and port\n"
" -b ip:port, set backup memcached server ip and port\n"
" -l ip, local bind ip address, default is 0.0.0.0\n"
" -n number, set max connections, default is 4096\n"
" -D don't go to background\n"
" -k use ketama key allocation algorithm\n"
" -f file, unix socket path to listen on. default is off\n"
" -i number, set max keep alive connections for one memcached server, default is 20\n"
" -v verbose\n"
"\n";
fprintf(stderr, b, strlen(b));
}
/* the famous DJB hash function for strings from stat_cache.c*/
static int
hashme(char *str)
{
unsigned int hash = 5381;
const char *s;
if (str == NULL) return 0;
for (s = str; *s; s++) {
hash = ((hash << 5) + hash) + *s;
}
hash &= 0x7FFFFFFF; /* strip the highest bit */
return hash;
}
static buffer *
buffer_init_size(int size)
{
buffer *b;
if (size <= 0) return NULL;
b = (struct buffer *) calloc(sizeof(struct buffer), 1);
if (b == NULL) return NULL;
size += BUFFER_PIECE_SIZE - (size % BUFFER_PIECE_SIZE);
b->ptr = (char *) calloc(1, size);
if (b->ptr == NULL) {
free(b);
return NULL;
}
b->len = size;
return b;
}
static void
buffer_free(buffer *b)
{
if (!b) return;
free(b->ptr);
free(b);
}
static list *
list_init(void)
{
list *l;
l = (struct list *) calloc(sizeof(struct list), 1);
return l;
}
static void
list_free(list *l, int keep_list)
{
buffer *b, *n;
if (l == NULL) return;
b = l->first;
while(b) {
n = b->next;
buffer_free(b);
b = n;
}
if (keep_list)
l->first = l->last = NULL;
else
free(l);
}
static void
remove_finished_buffers(list *l)
{
buffer *n, *b;
if (l == NULL) return;
b = l->first;
while(b) {
if (b->used < b->size) /* incompleted buffer */
break;
n = b->next;
buffer_free(b);
b = n;
}
if (b == NULL) {