/*
* Copyright (c) 2008-2014 Intel Corporation. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the
* OpenIB.org BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*/
#if HAVE_CONFIG_H
# include <config.h>
#endif /* HAVE_CONFIG_H */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <stdarg.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stddef.h>
#include <string.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <search.h>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <rdma/rsocket.h>
#include "cma.h"
#include "indexer.h"
/*
 * Compile-time sizes and limits for the rsocket implementation. The code
 * that consumes them is outside this part of the file.
 * NOTE(review): meanings below are inferred from the names only; confirm
 * against the users before relying on them.
 */
#define RS_OLAP_START_SIZE 2048
#define RS_MAX_TRANSFER 65536
#define RS_SNDLOWAT 2048
/* presumably lower/upper bounds on queue pair depth */
#define RS_QP_MIN_SIZE 16
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4 /* must be power of 2 */
#define RS_CONN_RETRIES 6
#define RS_SGL_SIZE 2
/* File-scope index map; presumably maps rsocket indices to rsocket objects (confirm against users). */
static struct index_map idm;
/* File-scope, statically initialized mutex; what it protects is not visible in this part of the file. */
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
/* Forward declaration so the service structures below can hold rsocket pointers. */
struct rsocket;
/*
 * Request codes for the background service threads; presumably carried in
 * rs_svc_msg.cmd (the handlers are outside this part of the file).
 */
enum {
	RS_SVC_NOOP          = 0,
	RS_SVC_ADD_DGRAM     = 1,
	RS_SVC_REM_DGRAM     = 2,
	RS_SVC_ADD_KEEPALIVE = 3,
	RS_SVC_REM_KEEPALIVE = 4,
	RS_SVC_MOD_KEEPALIVE = 5
};
/*
 * Request/response record for a service thread. Presumably exchanged over
 * the rs_svc.sock descriptor pair; the send/receive code is outside this
 * part of the file.
 */
struct rs_svc_msg {
uint32_t cmd; /* presumably one of the RS_SVC_* codes */
uint32_t status; /* presumably the outcome of the request; encoding not visible here */
struct rsocket *rs; /* rsocket the message refers to */
};
/*
 * Descriptor of a background service. The two instances visible in this
 * file (udp_svc, tcp_svc) statically initialize only context_size and run;
 * all other members start out zeroed.
 */
struct rs_svc {
pthread_t id; /* thread identifier; presumably the thread executing run() */
int sock[2]; /* descriptor pair; presumably a socketpair for rs_svc_msg traffic (confirm) */
int cnt; /* presumably number of entries in use in rss (confirm) */
int size; /* presumably allocated capacity of rss/contexts (confirm) */
int context_size; /* size in bytes of one element of contexts */
void *(*run)(void *svc); /* service routine; has the pthread start-routine signature */
struct rsocket **rss; /* array of rsocket pointers handled by the service */
void *contexts; /* per-service array; element size is context_size */
};
/* pollfd array for the UDP service; its element size is udp_svc's context_size. */
static struct pollfd *udp_svc_fds;
/* Service routine for udp_svc; defined outside this part of the file. */
static void *udp_svc_run(void *arg);
/*
 * Service descriptor whose per-entry context is one struct pollfd and whose
 * routine is udp_svc_run. Presumably serves the RS_SVC_*_DGRAM requests.
 */
static struct rs_svc udp_svc = {
.context_size = sizeof(*udp_svc_fds),
.run = udp_svc_run
};
/* uint32_t array for the TCP service; its element size is tcp_svc's context_size. */
static uint32_t *tcp_svc_timeouts;
/* Service routine for tcp_svc; defined outside this part of the file. */
static void *tcp_svc_run(void *arg);
/*
 * Service descriptor whose per-entry context is one uint32_t and whose
 * routine is tcp_svc_run. Presumably serves the RS_SVC_*_KEEPALIVE requests.
 */
static struct rs_svc tcp_svc = {
.context_size = sizeof(*tcp_svc_timeouts),
.run = tcp_svc_run
};
/*
 * File-scope tunables with their built-in initial values. The code that
 * reads or overrides them is outside this part of the file.
 * NOTE(review): units are not stated here; presumably bytes for the
 * inline/mem values and entries for the queue sizes (confirm).
 */
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
static uint16_t def_rqsize = 384;
static uint32_t def_mem = (1 << 17); /* 131072 */
static uint32_t def_wmem = (1 << 17); /* 131072 */
static uint32_t polling_time = 10;
/*
* Immediate data format is determined by the upper bits
* bit 31: message type, 0 - data, 1 - control
* bit 30: buffers updated, 0 - target, 1 - direct-receive
* bit 29: more data, 0 - end of transfer, 1 - more data available
*
* for data transfers:
* bits [28:0]: bytes transferred
* for control messages:
* SGL, CTRL
* bits [28:0]: receive credits granted
* IOMAP_SGL
* bits [28:16]: reserved, bits [15:0]: index
*/
/*
 * Message opcodes. The numeric value is the 3-bit field that rs_msg_set()
 * shifts into bits [31:29] of the immediate data, so:
 *   value bit 2 (immediate bit 31) - control message
 *   value bit 1 (immediate bit 30) - buffers updated / direct-receive
 *   value bit 0 (immediate bit 29) - more data
 */
enum {
	RS_OP_DATA           = 0,
	RS_OP_RSVD_DATA_MORE = 1,
	RS_OP_WRITE          = 2, /* opcode is not transmitted over the network */
	RS_OP_RSVD_DRA_MORE  = 3,
	RS_OP_SGL            = 4,
	RS_OP_RSVD           = 5,
	RS_OP_IOMAP_SGL      = 6,
	RS_OP_CTRL           = 7
};
/*
 * Pack and unpack the 32-bit immediate data word: the 3-bit opcode lives in
 * bits [31:29] and the payload in bits [28:0].
 *
 * Every macro argument is parenthesized so that expression arguments such
 * as (a | b) expand as intended. The opcode is converted to uint32_t before
 * the shift: opcodes of 4 and above shifted left by 29 do not fit in a
 * signed int, which is undefined behavior.
 *
 * rs_msg_set() does not mask data; callers must keep it within 29 bits or
 * it will overwrite the opcode field.
 */
#define rs_msg_set(op, data) (((uint32_t) (op) << 29) | (uint32_t) (data))
#define rs_msg_op(imm_data) ((imm_data) >> 29)
#define rs_msg_data(imm_data) ((imm_data) & 0x1FFFFFFF)
/* Size in bytes of one packed message word. */
#define RS_MSG_SIZE (sizeof(uint32_t))
/*
 * Work-request id encoding. The 64-bit wr_id carries a 32-bit payload in
 * its low half plus flag bits at the top:
 *   bit 63 - the request is a receive
 *   bit 62 - the request is a message send
 *
 * Every macro argument is parenthesized so that expression arguments expand
 * as intended; casts and masks bind tighter than most operators and would
 * otherwise apply to only part of the argument.
 *
 * rs_wr_is_recv() and rs_wr_is_msg_send() yield the masked 64-bit value,
 * not 0/1. Test them in a boolean context; do not store the result in a
 * narrower integer.
 */
#define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
#define RS_WR_ID_FLAG_MSG_SEND (((uint64_t) 1) << 62) /* See RS_OPT_MSG_SEND */
#define rs_send_wr_id(data) ((uint64_t) (data))
#define rs_recv_wr_id(data) (RS_WR_ID_FLAG_RECV | (uint64_t) (data))
#define rs_wr_is_recv(wr_id) ((wr_id) & RS_WR_ID_FLAG_RECV)
#define rs_wr_is_msg_send(wr_id) ((wr_id) & RS_WR_ID_FLAG_MSG_SEND)
#define rs_wr_data(wr_id) ((uint32_t) (wr_id))
/*
 * Control message codes. Presumably the data value sent with an RS_OP_CTRL
 * message; the senders are outside this part of the file.
 */
enum {
	RS_CTRL_DISCONNECT = 0,
	RS_CTRL_KEEPALIVE  = 1,
	RS_CTRL_SHUTDOWN   = 2
};
/*
 * A message held as separate opcode and data fields. Presumably the
 * unpacked counterpart of the immediate-data word built by rs_msg_set().
 */
struct rs_msg {
uint32_t op; /* presumably one of the RS_OP_* opcodes */
uint32_t data; /* payload associated with the opcode */
};
/* Forward declaration; the full definition of struct ds_qp follows later in this file. */
struct ds_qp;
/*
 * Locates a span (offset, length) associated with a datagram queue pair.
 * Presumably describes a received datagram inside that QP's receive buffer
 * (confirm against the receive path).
 */
struct ds_rmsg {
struct ds_qp *qp; /* queue pair the span belongs to */
uint32_t offset; /* start of the span; presumably a byte offset into the QP's rbuf */
uint32_t length; /* length of the span */
};
/*
 * Link node for a singly linked list. Presumably overlaid on unused
 * datagram send buffers to chain them together (confirm).
 */
struct ds_smsg {
struct ds_smsg *next; /* next node in the list */
};
/*
 * Describes one memory region by address, key and length. Embedded in
 * rs_iomap and rs_conn_data, and its size defines RS_MAX_CTRL_MSG, so the
 * field order and widths must stay as they are.
 */
struct rs_sge {
uint64_t addr; /* start address of the region */
uint32_t key; /* presumably the memory registration key for the region (confirm) */
uint32_t length; /* length of the region */
};
/* Pairs an offset with the scatter-gather entry describing the memory mapped at that offset. */
struct rs_iomap {
uint64_t offset; /* offset identifying the mapping */
struct rs_sge sge; /* memory region backing the mapping */
};
/*
 * Bookkeeping for one I/O mapping: the offset, the memory region object,
 * a list link, a reference count and an index.
 */
struct rs_iomap_mr {
uint64_t offset; /* offset identifying the mapping */
struct ibv_mr *mr; /* verbs memory region for the mapping */
dlist_entry entry; /* list linkage; presumably into one of the rsocket's iomap lists */
atomic_t refcnt; /* reference count on this mapping */
int index; /* -1 if mapping is local and not in iomap_list */
};
/* Largest control message: one scatter-gather entry. */
#define RS_MAX_CTRL_MSG (sizeof(struct rs_sge))
/* Non-zero when host byte order already equals network byte order. */
#define rs_host_is_net() (htonl(1) == 1)
/* Flag bits; presumably stored in rs_conn_data.flags (confirm). */
#define RS_CONN_FLAG_NET   0x01
#define RS_CONN_FLAG_IOMAP 0x02
/*
 * Connection parameters block, embedded in rs_conn_private_data.
 * Presumably exchanged with the peer during connection setup, which would
 * make the field order and widths part of the wire format (confirm before
 * changing anything).
 */
struct rs_conn_data {
uint8_t version; /* version of this structure/protocol */
uint8_t flags; /* presumably RS_CONN_FLAG_* bits (confirm) */
uint16_t credits; /* presumably initial receive credits (confirm) */
uint8_t reserved[3]; /* reserved bytes */
uint8_t target_iomap_size; /* size of the target iomap; stored in 8 bits */
struct rs_sge target_sgl; /* describes the target scatter-gather list */
struct rs_sge data_buf; /* describes the data buffer */
};
/*
 * Holds rs_conn_data in one of two layouts sharing the same storage:
 * either conn_data alone, or (af_ib) conn_data preceded by an
 * ib_connect_hdr. Which layout applies is decided by code outside this
 * part of the file; presumably by the address family in use.
 */
struct rs_conn_private_data {
union {
struct rs_conn_data conn_data; /* plain layout */
struct {
struct ib_connect_hdr ib_hdr; /* header placed ahead of the connection data */
struct rs_conn_data conn_data;
} af_ib; /* layout with the ib_connect_hdr prefix */
};
};
/*
* rsocket states are ordered as passive, connecting, connected, disconnected.
*/
/*
 * rsocket state bits. Each basic state owns one bit; the composite values
 * OR several bits together:
 *   - every rs_resolving_*, rs_connecting and rs_accepting value includes
 *     rs_opening, so (state & rs_opening) matches any of them;
 *   - rs_connect_rdwr is connected + readable + writable.
 */
enum rs_state {
	rs_init            = 0,
	rs_bound           = 1 << 0,
	rs_listening       = 1 << 1,
	rs_opening         = 1 << 2,
	rs_resolving_addr  = rs_opening | (1 << 4),
	rs_resolving_route = rs_opening | (1 << 5),
	rs_connecting      = rs_opening | (1 << 6),
	rs_accepting       = rs_opening | (1 << 7),
	rs_connected       = 1 << 8,
	rs_writable        = 1 << 9,
	rs_readable        = 1 << 10,
	rs_connect_rdwr    = rs_connected | rs_readable | rs_writable,
	rs_connect_error   = 1 << 11,
	rs_disconnected    = 1 << 12,
	rs_error           = 1 << 13
};
/* Option flag bits; the field that stores them is outside this part of the file. */
#define RS_OPT_SWAP_SGL   0x1
/*
 * RDMA write with immediate data is not supported by iWarp, so on iWarp
 * rsocket messages are transferred as inline sends instead.
 */
#define RS_OPT_MSG_SEND   0x2
#define RS_OPT_SVC_ACTIVE 0x4
/*
 * One storage area viewable as a generic, IPv4 or IPv6 socket address, so a
 * single object can hold either family.
 */
union socket_addr {
struct sockaddr sa; /* generic view */
struct sockaddr_in sin; /* IPv4 view */
struct sockaddr_in6 sin6; /* IPv6 view */
};
/*
 * Datagram header carrying a version, a length, a port and either an IPv4
 * address or an IPv6 flowinfo + address. Presumably prepended to each
 * datagram to identify the sender (confirm against the send path).
 */
struct ds_header {
uint8_t version; /* header version */
uint8_t length; /* presumably DS_IPV4_HDR_LEN or DS_IPV6_HDR_LEN (confirm) */
uint16_t port; /* port number; byte order not visible here */
union {
uint32_t ipv4; /* IPv4 address */
struct {
uint32_t flowinfo; /* IPv6 flow information */
uint8_t addr[16]; /* IPv6 address bytes */
} ipv6;
} addr;
};
/* version (1) + length (1) + port (2) + ipv4 (4) */
#define DS_IPV4_HDR_LEN 8
/* version (1) + length (1) + port (2) + flowinfo (4) + addr (16) */
#define DS_IPV6_HDR_LEN 24
/*
 * A datagram destination: its socket address together with a queue pair
 * pointer, an address handle and a queue pair number.
 */
struct ds_dest {
union socket_addr addr; /* must be first */
struct ds_qp *qp; /* presumably the local queue pair used to reach this destination (confirm) */
struct ibv_ah *ah; /* verbs address handle for the destination */
uint32_t qpn; /* queue pair number; presumably the remote QP (confirm) */
};
/*
 * Per-queue-pair state for datagram rsockets: list linkage, the owning
 * rsocket, the CM id, a prebuilt header, a destination record, two memory
 * regions, a receive buffer and a CQ-armed flag.
 */
struct ds_qp {
dlist_entry list; /* list linkage; presumably chains the QPs of one rsocket */
struct rsocket *rs; /* rsocket this queue pair belongs to */
struct rdma_cm_id *cm_id; /* RDMA CM identifier associated with the QP */
struct ds_header hdr; /* header for this QP; presumably sent with outgoing datagrams */
struct ds_dest dest; /* destination record embedded in the QP */
struct ibv_mr *smr; /* memory region; presumably for send buffers */
struct ibv_mr *rmr; /* memory region; presumably for rbuf */
uint8_t *rbuf; /* receive buffer */
int cq_armed; /* presumably non-zero while completion notification is requested */
};
struct rsocket {
int type;
int index;
fastlock_t slock;
fastlock_t rlock;
fastlock_t cq_lock;
fastlock_t cq_wait_lock;
fastlock_t map_lock; /* acquire slock first if needed */
union {
/* data stream */
struct {
struct rdma_cm_id *cm_id;
uint64_t tcp_opts;
unsigned int keepalive_time;
unsigned int ctrl_seqno;
unsigned int ctrl_max_seqno;
uint16_t sseq_no;
uint16_t sseq_comp;
uint16_t rseq_no;
uint16_t rseq_comp;
int remote_sge;
struct rs_sge remote_sgl;
struct rs_sge remote_iomap;
struct ibv_mr *target_mr;
int target_sge;
int target_iomap_size;
void *target_buffer_list;
volatile struct rs_sge *target_sgl;
struct rs_iomap *target_iomap;
int rbuf_msg_index;
int rbuf_bytes_avail;
int rbuf_free_offset;
int rbuf_offset;
struct ibv_mr