/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 only,
* as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License version 2 for more details. A copy is
* included in the COPYING file that accompanied this code.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* GPL HEADER END
*/
/*
* lustre/ptlrpc/nrs.c
*
* Network Request Scheduler (NRS)
*
* Allows the handling of RPCs at servers to be reordered.
*
* Author: Liang Zhen <liang@whamcloud.com>
* Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
*/
/**
* \addtogroup nrs
* @{
*/
#define DEBUG_SUBSYSTEM S_RPC
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_net.h>
#include <lprocfs_status.h>
#include <linux/libcfs/libcfs.h>
#include "ptlrpc_internal.h"
/* XXX: This is just for liblustre. Remove the #if defined directive when the
* "cfs_" prefix is dropped from cfs_list_head.
*
* NOTE(review): no #if directive is visible around this declaration and the
* type used below is already plain "struct list_head", so the XXX note above
* looks stale -- confirm and drop it if so.
*
* The object itself is only declared here (extern); its definition lives in
* another translation unit. */
extern struct list_head ptlrpc_all_services;
/**
* NRS core object.
*
* Defined at file scope without "static", so it has external linkage and can
* be referenced from other translation units. It has no explicit initializer,
* so as an object with static storage duration it starts out zero-initialized.
*/
struct nrs_core nrs_core;
/**
 * Runs the optional op_policy_init() hook of \a policy.
 *
 * \param[in] policy the policy to initialize
 *
 * \retval 0	 the policy has no init hook, or the hook returned 0
 * \retval other whatever value the policy's op_policy_init() hook returned
 */
static int nrs_policy_init(struct ptlrpc_nrs_policy *policy)
{
	/* The init hook is optional; a policy without one needs no setup. */
	if (policy->pol_desc->pd_ops->op_policy_init == NULL)
		return 0;

	return policy->pol_desc->pd_ops->op_policy_init(policy);
}
/**
 * Runs the optional op_policy_fini() hook of \a policy.
 *
 * The policy must no longer be referenced and must have no queued requests;
 * both conditions are asserted before the hook is considered.
 *
 * \param[in] policy the policy to finalize
 */
static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
{
	LASSERT(policy->pol_ref == 0);
	LASSERT(policy->pol_req_queued == 0);

	/* The fini hook is optional; nothing more to do without one. */
	if (policy->pol_desc->pd_ops->op_policy_fini == NULL)
		return;

	policy->pol_desc->pd_ops->op_policy_fini(policy);
}
/**
 * Forwards a control operation to the op_policy_ctl() hook of \a policy.
 *
 * \param[in]	  policy the policy the operation is addressed to
 * \param[in]	  opc	 the opcode of the operation
 * \param[in,out] arg	 opaque argument handed through to the hook
 *
 * \retval -ENODEV the policy is in the NRS_POL_STATE_STOPPED state
 * \retval -ENOSYS the policy does not implement op_policy_ctl()
 * \retval other   whatever value the policy's op_policy_ctl() hook returned
 */
static int nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy,
				 enum ptlrpc_nrs_ctl opc, void *arg)
{
	/**
	 * A stopped policy keeps its lprocfs files and its ptlrpc_nrs_policy
	 * instance until it is unregistered, so a ctl request can still reach
	 * it. Its policy->pol_private is NULL by then, so refuse the operation
	 * instead of passing it on to the hook.
	 */
	if (policy->pol_state == NRS_POL_STATE_STOPPED)
		return -ENODEV;

	/* The ctl hook is optional. */
	if (policy->pol_desc->pd_ops->op_policy_ctl == NULL)
		return -ENOSYS;

	return policy->pol_desc->pd_ops->op_policy_ctl(policy, opc, arg);
}
/**
* Completes the stop of \a policy: runs the policy's optional
* op_policy_stop() hook, clears the policy's private data, moves the policy to
* NRS_POL_STATE_STOPPED and drops one reference on the policy descriptor.
*
* nrs->nrs_lock is released right before the hook is called and re-taken right
* after it returns, so this function expects to be entered with that lock held
* and returns with it held.
* NOTE(review): anything protected by nrs_lock may change while the hook runs
* unlocked -- callers should not cache such state across this call.
*
* \param[in] policy the policy to stop; it must have no queued and no started
*		     requests once the hook has returned (asserted)
*/
static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
{
struct ptlrpc_nrs *nrs = policy->pol_nrs;
/* The stop hook is optional; when present it is called without nrs_lock. */
if (policy->pol_desc->pd_ops->op_policy_stop != NULL) {
spin_unlock(&nrs->nrs_lock);
policy->pol_desc->pd_ops->op_policy_stop(policy);
spin_lock(&nrs->nrs_lock);
}
/* Nothing may remain queued on, or started by, a policy being stopped. */
LASSERT(list_empty(&policy->pol_list_queued));
LASSERT(policy->pol_req_queued == 0 &&
policy->pol_req_started == 0);
/*
* nrs_policy_ctl_locked() checks for the STOPPED state precisely because
* pol_private is NULL from here on.
*/
policy->pol_private = NULL;
policy->pol_state = NRS_POL_STATE_STOPPED;
/*
* Drop the descriptor reference; when the count reaches zero, also release
* the reference on the module that owns the policy descriptor.
*/
if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
module_put(policy->pol_desc->pd_owner);
}
/**
 * Starts stopping \a policy on the NRS head it belongs to.
 *
 * A policy in the NRS_POL_STATE_STARTED state is moved to
 * NRS_POL_STATE_STOPPING and detached from its head's primary or fallback
 * slot. If the policy's reference count is exactly one, the stop is completed
 * right away through nrs_policy_stop0().
 *
 * \param[in] policy the policy to stop
 *
 * \retval -EPERM  \a policy is the head's fallback policy and the head is not
 *		   being stopped
 * \retval -EAGAIN \a policy is in the NRS_POL_STATE_STARTING state
 * \retval 0	   the stop was initiated, was already in progress, or the
 *		   policy was already stopped
 */
static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
{
	struct ptlrpc_nrs *head = policy->pol_nrs;

	/* The fallback policy may only be stopped while the head is stopping. */
	if (!head->nrs_stopping && head->nrs_policy_fallback == policy)
		return -EPERM;

	switch (policy->pol_state) {
	case NRS_POL_STATE_STARTING:
		return -EAGAIN;
	case NRS_POL_STATE_STARTED:
		break;
	default:
		/* In progress or already stopped */
		return 0;
	}

	policy->pol_state = NRS_POL_STATE_STOPPING;

	/* Immediately make it invisible */
	if (head->nrs_policy_primary != policy) {
		LASSERT(head->nrs_policy_fallback == policy);
		head->nrs_policy_fallback = NULL;
	} else {
		head->nrs_policy_primary = NULL;
	}

	/* A count of one means the only reference left is the caller's own. */
	if (policy->pol_ref == 1)
		nrs_policy_stop0(policy);

	return 0;
}
/**
 * Stops the primary policy of the \a nrs NRS head, if the head has one.
 *
 * The policy is detached from the head and moved to
 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING; if it has no pending usage
 * references, it is then taken to
 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED via nrs_policy_stop0().
 *
 * \param[in] nrs the NRS head to carry out this operation on
 */
static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
{
	struct ptlrpc_nrs_policy *primary = nrs->nrs_policy_primary;

	/* A head without a primary policy needs no work. */
	if (primary != NULL) {
		nrs->nrs_policy_primary = NULL;

		LASSERT(primary->pol_state == NRS_POL_STATE_STARTED);
		primary->pol_state = NRS_POL_STATE_STOPPING;

		/* Nobody is using the policy any more; finish the stop now. */
		if (primary->pol_ref == 0)
			nrs_policy_stop0(primary);
	}
}
/**
* Transitions a policy across the ptlrpc_nrs_pol_state range of values, in
* response to an lprocfs command to start a policy.
*
* If a primary policy different to the current one is specified, this function
* will transition the new policy to the
* ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTING and then to
* ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED, and will then transition
* the old primary policy (if there is one) to
* ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
* references on the policy to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED.
*
* If the fallback policy is specified, this is taken to indicate an instruction
* to stop the current primary policy, without substituting it with another
* primary policy, so the primary policy (if any) is transitioned to
* ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
* references on the policy to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED. In
* this case, the fallback policy is only left active in the NRS head.
*/
static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
{
struct ptlrpc_nrs *nrs = policy->pol_nrs;
int rc = 0;
/**
* Don't allow multiple starting which is too complex, and has no real
* benefit.
*/
if (nrs->nrs_policy_starting)
return -EAGAIN;
LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
if (policy->pol_state == NRS_POL_STATE_STOPPING)
return -EAGAIN;
if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
/**
* This is for cases in which the user sets the policy to the
* fallback policy (currently fifo for all services); i.e. the
* user is resetting the policy to the default; so we stop the
* primary policy, if any.
*/
if (policy == nrs->nrs_policy_fallback) {
nrs_policy_stop_primary(nrs);
return 0;
}
/**
* If we reach here, we must be setting up the fallback policy
* at service startup time, and only a single policy with the
* nrs_policy_flags::PTLRPC_NRS_FL_FALLBACK flag set can
* register with NRS core.
*/
LASSERT(nrs->nrs_policy_fallback == NULL);
} else {
/**
* Shouldn't start primary policy if w/o fallback policy.
*/
if (nrs->nrs_policy_fallback == NULL)
return -EPERM;
if (policy->pol_state == NRS_POL_STATE_STARTED)
return 0;
}
/**
* Increase the module usage count for policies registering from other
* modules.
*/
if (atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
!try_module_get(policy->pol_desc->pd_owner)) {
atomic_dec(&policy->pol_desc->pd_refs);
CERROR("NRS: cannot get module for policy %s; is it alive?\n",
policy->pol_desc->pd_name);
return -ENODEV;
}
/**
* Serialize policy starting across the NRS head
*/
nrs->nrs_policy_starting = 1;
policy->pol_state = NRS_POL_STATE_STARTING;
if (policy->pol_desc->pd_ops->op_policy_start) {
spin_unlock(&nrs->nrs_lock);
rc = policy->pol_desc->pd_ops->op_policy_start(policy);
spin_lock(&nrs->nrs_lock);
if (rc != 0) {
if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
module_put(policy->pol_desc->pd_owner);
policy->pol_state = NRS_POL_STATE_STOPPED;
GOTO(out, rc);
}
}
policy->pol_state = NRS_POL_STATE_STARTED;
if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
/**
* This path is only used at PTLRPC service setup time.
*/
nrs->nrs_policy_fallback = policy;
} else {
/*
* Try to stop the current primary policy if there is one.
*/
nrs_policy_stop_primary(nrs);
/**
* And set