/*
* (C) 2007-2010 Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
*
* Version: $Id: dataservice.cpp 5 2010-09-29 07:44:56Z duanfei@taobao.com $
*
* Authors:
* duolong <duolong@taobao.com>
* - initial release
* qushan<qushan@taobao.com>
* - modify 2009-03-27
*
*/
#include "dataservice.h"
#include "common/func.h"
#include <Memory.hpp>
namespace tfs
{
namespace dataserver
{
using namespace common;
using namespace message;
/**
 * Build a DataService in its idle state: statistics zeroed, no nameserver
 * address known, no pooled clients held and no worker objects created.
 * Real configuration happens later in init() and start().
 */
DataService::DataService()
{
  // Server statistics start out fully zeroed.
  memset(&data_server_info_, 0, sizeof(DataServerStatInfo));

  // Scalar state.
  stop_ = 0;
  server_local_port_ = 0;
  ns_ip_port_ = 0; // nameserver ip:port, filled in by set_ns_ip()
  sync_mirror_status_ = 0;

  // Per-slot state (slots 0 and 1): block info is flagged as pending for
  // both, while addresses, flags and heartbeat clients are cleared.
  for (int slot = 0; slot < 2; ++slot)
  {
    need_send_blockinfo_[slot] = 1;
    set_flag_[slot] = false;
    hb_ip_port_[slot] = 0;
    hb_client_[slot] = NULL;
  }

  // Nothing is allocated or borrowed until start() runs.
  client_ = NULL;
  compact_client_ = NULL;
  repl_block_ = NULL;
  compact_block_ = NULL;
  sync_mirror_ = NULL;
  thread_pids_ = NULL;

  // CPU usage ceiling is taken from the dataserver configuration.
  max_cpu_usage_ = SYSPARAM_DATASERVER.max_cpu_usage_;
}
/**
 * Hand every client held by this service back to CLIENT_POOL and clear the
 * corresponding pointers.
 *
 * NOTE(review): repl_block_, compact_block_ and sync_mirror_ (created with
 * new in start()) are not released here; presumably they are freed on
 * another shutdown path -- confirm.
 */
DataService::~DataService()
{
  // Nameserver client first, then both heartbeat slots, then the compact
  // client -- the same release order as before.
  CLIENT_POOL.release_client(client_);
  for (int slot = 0; slot < 2; ++slot)
  {
    CLIENT_POOL.release_client(hb_client_[slot]);
  }
  CLIENT_POOL.release_client(compact_client_);

  // Drop the now-stale pointers.
  client_ = NULL;
  for (int slot = 0; slot < 2; ++slot)
  {
    hb_client_[slot] = NULL;
  }
  compact_client_ = NULL;
}
int DataService::init(const std::string& server_index)
{
server_index_ = server_index;
//set name server ip
int ret = set_ns_ip();
if (TFS_SUCCESS != ret)
{
return ret;
}
data_server_info_.startup_time_ = time(NULL);
IpAddr* adr = reinterpret_cast<IpAddr*>(&data_server_info_.id_);
adr->ip_ = Func::get_local_addr(SYSPARAM_DATASERVER.dev_name_);
adr->port_ = SYSPARAM_DATASERVER.local_ds_port_;
TBSYS_LOG(INFO, "dataserver listen port: %d", adr->port_);
server_local_port_ = adr->port_;
//init file number to management
uint64_t file_number = ((adr->ip_ & 0xFFFFFF00) | (adr->port_ & 0xFF));
file_number = file_number << 32;
data_management_.set_file_number(file_number);
ds_requester_.init(data_server_info_.id_, ns_ip_port_, &data_management_);
return TFS_SUCCESS;
}
/**
 * Resolve the nameserver addresses from the dataserver configuration.
 *
 * Two things are filled in:
 *  - ns_ip_port_: local_ns_ip_ / local_ns_port_. If the ip does not resolve
 *    an error is logged and ns_ip_port_ stays 0 (start() rejects that value).
 *  - hb_ip_port_[0..1]: the first two entries of the '|'-separated
 *    ns_addr_list_, each paired with the port stored in ns_ip_port_.
 *
 * Only the first 255 bytes of ns_addr_list_ are parsed; anything beyond
 * that is ignored.
 *
 * @return TFS_ERROR when the list yields no entry or its first entry does
 *         not resolve; TFS_SUCCESS otherwise (including when the list or
 *         the nameserver ip is missing, which is only logged).
 */
int DataService::set_ns_ip()
{
  ns_ip_port_ = 0;
  IpAddr* adr = reinterpret_cast<IpAddr*> (&ns_ip_port_);
  uint32_t ip = Func::get_addr(SYSPARAM_DATASERVER.local_ns_ip_);
  if (0 == ip)
  {
    TBSYS_LOG(ERROR, "nameserver ip is error.");
  }
  else
  {
    adr->ip_ = ip;
    adr->port_ = SYSPARAM_DATASERVER.local_ns_port_;
  }

  char* ip_list = SYSPARAM_DATASERVER.ns_addr_list_;
  if (NULL == ip_list)
  {
    TBSYS_LOG(ERROR, "nameserver real ip list is error");
  }
  else
  {
    std::vector <uint64_t> ns_ip_list;
    // Constant length: makes buffer an ordinary array rather than a
    // (non-standard in C++) variable-length array.
    const int32_t buffer_len = 256;
    char buffer[buffer_len];
    memset(buffer, 0, sizeof(buffer));
    // Copy at most buffer_len - 1 bytes so the zeroed last byte always
    // terminates the string; strncpy() does not terminate on truncation and
    // strsep() below would otherwise run past the end of the array.
    strncpy(buffer, ip_list, buffer_len - 1);

    // Split the working copy on '|' (strsep modifies the buffer in place).
    char* t = NULL;
    char* s = buffer;
    while (NULL != (t = strsep(&s, "|")))
    {
      ns_ip_list.push_back(Func::get_addr(t));
    }

    if (ns_ip_list.size() < 1)
    {
      TBSYS_LOG(WARN, "must have one ns, check your ns' list");
      return TFS_ERROR;
    }

    if (ns_ip_list.size() != 2)
    {
      // Without exactly two entries, slot 1 is not sent block info.
      TBSYS_LOG(DEBUG, "must have two ns, check your ns' list");
      need_send_blockinfo_[1] = 0;
    }

    for (uint32_t i = 0; i < ns_ip_list.size(); ++i)
    {
      adr = reinterpret_cast<IpAddr*>(&hb_ip_port_[i]);
      adr->ip_ = ns_ip_list[i];
      if (0 == adr->ip_)
      {
        TBSYS_LOG(ERROR, "nameserver real ip: %s list is error", ip_list);
        // An unresolvable first entry is fatal; a bad second one is only logged.
        if (0 == i)
        {
          return TFS_ERROR;
        }
      }
      else
      {
        // Heartbeat addresses reuse the nameserver port.
        adr->port_ = (reinterpret_cast<IpAddr*>(&ns_ip_port_))->port_;
      }
      set_flag_[i] = true;

      // Only two heartbeat slots exist; ignore any further entries.
      if (1 == i)
      {
        break;
      }
    }
  }

  return TFS_SUCCESS;
}
int DataService::start(VINT* pids)
{
client_ = CLIENT_POOL.get_client(ns_ip_port_);
hb_client_[0] = CLIENT_POOL.get_client(hb_ip_port_[0]);
hb_client_[1] = CLIENT_POOL.get_client(hb_ip_port_[1]);
compact_client_ = CLIENT_POOL.get_client(ns_ip_port_);
thread_pids_ = pids;
repl_block_ = new ReplicateBlock(&client_mutex_, client_);
compact_block_ = new CompactBlock(&compact_mutext_, compact_client_, data_server_info_.id_);
//backup type:1.tfs 2.nfs
int backup_type = SYSPARAM_DATASERVER.tfs_backup_type_;
TBSYS_LOG(INFO, "backup type: %d\n", SYSPARAM_DATASERVER.tfs_backup_type_);
sync_mirror_ = new SyncBase(backup_type);
int ret = data_management_.init_block_files(SysParam::instance().filesystem_param());
if (TFS_SUCCESS != ret)
{
TBSYS_LOG(ERROR, "dataservice::start, init block files fail! ret: %d\n", ret);
return ret;
}
data_management_.get_ds_filesystem_info(data_server_info_.block_count_, data_server_info_.use_capacity_,
data_server_info_.total_capacity_);
data_server_info_.current_load_ = Func::get_load_avg();
TBSYS_LOG(INFO,
"block file status, block count: %d, use capacity: %" PRI64_PREFIX "d, total capacity: %" PRI64_PREFIX "d",
data_server_info_.block_count_, data_server_info_.use_capacity_, data_server_info_.total_capacity_);
block_checker_.init(data_server_info_.id_, &ds_requester_);
//start connect namesever
if (0 == ns_ip_port_)
{
TBSYS_LOG(ERROR, "nameserver ip not set.");
return TFS_ERROR;
}
//retry
while (0 == stop_ && TFS_SUCCESS != client_->connect())
{
TBSYS_LOG(ERROR, "connect to nameserver fail, sleep 5s, retry\n");
Func::sleep(5, &stop_);
}
if (stop_)
return TFS_ERROR;
// heartbeat
while (0 == stop_ && (TFS_SUCCESS != hb_client_[0]->connect() && TFS_SUCCESS != hb_client_[1]->connect()))
{
TBSYS_LOG(ERROR, "hb connect to nameserver fail, sleep 5s, retry\n");
Func::sleep(5, &stop_);
}
if (stop_)
return TFS_ERROR;
// compact
while (0 == stop_ && TFS_SUCCESS != compact_client_->connect())
{
TBSYS_LOG(ERROR, "compact connect to nameserver fail, sleep 5s, retry\n");
Func::sleep(5, &stop_);
}
if (stop_)
return TFS_ERROR;
pthread_t tid;
//start heartbeat thread
pthread_create(&tid, NULL, DataService::do_heart, this);
thread_pids_->push_back(tid);
//start check expire data thread
pthread_create(&tid, NULL, DataService::do_check, this);
thread_pids_->push_back(tid);
//start replicate thread
int32_t replicate_thread_count = SYSPARAM_DATASERVER.replicate_thread_count_;
for (int32_t i = 0; i < replicate_thread_count; ++i)
{
pthread_create(&tid, NULL, ReplicateBlock::do_replicate_block, repl_block_);
thread_pids_->push_back(tid);
}
//start compact thread
pthread_create(&tid, NULL, CompactBlock::do_compact_block, compact_block_);
thread_pids_->push_back(tid);
//start sync thread
pthread_create(&tid, NULL, SyncBase::do_sync_mirror, sync_mirror_);
thread_pids_->push_back(tid);
//set process thread num for client
int32_t thread_count = SYSPARAM_DATASERVER.client_thread_client_;
task_queue_thread_.setThreadParameter(thr
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
tfs-1.3 淘宝内部使用的分布式文件系统 代码 (271个子文件)
configure.ac 4KB
Makefile.am 3KB
Makefile.am 2KB
Makefile.am 1KB
Makefile.am 1KB
Makefile.am 1KB
Makefile.am 1KB
Makefile.am 1KB
Makefile.am 730B
Makefile.am 595B
Makefile.am 582B
Makefile.am 545B
Makefile.am 445B
Makefile.am 412B
Makefile.am 398B
Makefile.am 324B
Makefile.am 106B
Makefile.am 71B
Makefile.am 58B
Makefile.am 58B
ChangeLog 0B
tfs.conf 2KB
configure 749KB
COPYING 18KB
dataservice.cpp 62KB
tfstool.cpp 57KB
nameserver.cpp 51KB
blockfile_manager.cpp 42KB
replicate.cpp 36KB
meta_manager.cpp 35KB
tfs_file.cpp 32KB
test_logic_block_and_compact.cpp 32KB
showssm.cpp 31KB
ds_lib.cpp 31KB
test_logic_block.cpp 29KB
logic_block.cpp 28KB
heart_manager.cpp 28KB
block_info_message.cpp 27KB
data_management.cpp 26KB
server_status_message.cpp 25KB
test_blockfile_manager.cpp 22KB
sync_backup.cpp 21KB
layout_manager.cpp 21KB
index_handle.cpp 20KB
compact.cpp 18KB
test_index_handle.cpp 16KB
oplog_sync_manager.cpp 16KB
test_batch_mix.cpp 16KB
ds_client.cpp 15KB
compact_block.cpp 14KB
file_queue.cpp 14KB
util.cpp 14KB
adminserver.cpp 14KB
replicate_block.cpp 14KB
strategy.cpp 13KB
func.cpp 12KB
read_data_message.cpp 12KB
parameter.cpp 12KB
file_system_image.cpp 10KB
lease_clerk.cpp 9KB
test_batch_write.cpp 9KB
tfs_ioapi_cases.cpp 9KB
async_client.cpp 9KB
write_data_message.cpp 9KB
block_checker.cpp 9KB
visit_stat.cpp 8KB
config.cpp 8KB
sync_base.cpp 7KB
client.cpp 7KB
file_repair.cpp 7KB
message_factory.cpp 7KB
test_mmap_file.cpp 7KB
test_batch_read.cpp 7KB
test_physical_block.cpp 7KB
directory_op.cpp 7KB
heart_message.cpp 7KB
dataserver.cpp 6KB
test_mmap_file_op.cpp 6KB
test_data_handle.cpp 6KB
tfs_session.cpp 6KB
service.cpp 6KB
test_superblock_impl.cpp 6KB
file_op.cpp 6KB
cpu_metrics.cpp 6KB
fsname.cpp 5KB
tfs_client_api.cpp 5KB
showsyncoplog.cpp 5KB
ds_task.cpp 5KB
oplog.cpp 5KB
requester.cpp 5KB
tfs_packet_streamer.cpp 5KB
redundant.cpp 5KB
mmap_file.cpp 5KB
test_file_op.cpp 5KB
block_chunk.cpp 5KB
compact_block_message.cpp 5KB
data_file.cpp 4KB
dataserver_message.cpp 4KB
bit_map.cpp 4KB
physical_block.cpp 4KB
共 271 条
- 1
- 2
- 3
资源评论
- linzhixia232014-07-22有点帮助,不过好像有更新的版本了、
- dukefred2011-11-23tfs开源到1.4了,想要下的去开源首页吧
superficiak
- 粉丝: 0
- 资源: 6
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功