package com.dm.data.util;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.FieldSortBuilder;
import com.dm.core.system.SystemParameter;
import com.dm.core.util.CollectionUtils;
import com.dm.core.util.StringUtil;
import com.dm.meta.domain.Column;
/**
* ES工具类
* @author ZRH
*
*/
public class ESUtils {
static Pattern badChars = Pattern.compile("\\s*[\\s~!\\^&\\(\\)\\-\\+:\\|\\\\\"\\\\$]+\\s*");
private static Client clients = null;
private ESUtils() {
}
/**
* 关闭对应client
* @param client
*/
public static void close(Client client) {
if (client != null) {
try {
client.close();
} catch (Exception e) {
}
client = null;
}
}
public static void flush(Client client, String indexName, String indexType) {
try{
client.admin().indices().flush(new FlushRequest(indexName.toLowerCase(), indexType)).actionGet();
}catch(Exception e){};
}
/**
* 根据默认系统默认配置初始化库,如果已经有连接则使用该连接
* @return
*/
public static Client getClient() {
if(clients!=null) {
return clients;
}
clients = newClient();
return clients;
}
/**
* 初始化并连接elasticsearch集群,返回连接后的client
* @return 返回连接的集群的client
*/
public static Client newClient() {
String clusterName = SystemParameter.get("es.clusterName", "elasticsearch");
String _clientTransportSniff = SystemParameter.get("es.clientTransportSniff", "true");
String _port = SystemParameter.get("es.port", "9300");
String hostname = SystemParameter.get("es.hostname", "127.0.0.1");
String hostnames[] = hostname.split(",");
boolean clientTransportSniff = false;
try{
if( !"false".equals(_clientTransportSniff.toLowerCase().trim())) {
clientTransportSniff = true;
}
}catch(Exception e){};
int port = 9300;
try{
port = Integer.parseInt(_port);
}catch(Exception e){};
return newClient(clusterName, clientTransportSniff, port, hostnames);
}
/**
* 初始化并连接elasticsearch集群,返回连接后的client
* @param clusterName 中心节点名称
* @param clientTransportSniff 是否自动发现新加入的节点
* @param port 节点端口
* @param hostname 集群节点所在服务器IP,支持多个
* @return 返回连接的集群的client
*/
public static Client newClient(String clusterName, boolean clientTransportSniff, int port, String... hostname) {
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName)
.put("client.transport.sniff", clientTransportSniff)
.build();
TransportClient transportClient = new TransportClient(settings);
if(hostname!=null){
for(String host: hostname) {
transportClient.addTransportAddress(new InetSocketTransportAddress(host, port));
}
}
return transportClient;
}
public static boolean indicesExists(Client client, String indexName){
IndicesExistsRequest ier = new IndicesExistsRequest();
ier.indices(new String[]{indexName.toLowerCase()});
return client.admin().indices().exists(ier).actionGet().isExists();
}
public static boolean typesExists(Client client, String indexName, String indexType){
if(indicesExists(client, indexName)) {
TypesExistsRequest ter = new TypesExistsRequest(new String[]{indexName.toLowerCase()}, indexType);
return client.admin().indices().typesExists(ter).actionGet().isExists();
}
return false;
}
/**
* 根据索引数据id删除索引
* @param indexName 索引名称
* @param indexType 索引类型
* @param id 对应数据ID
*/
public static void deleteIndex(Client client, String indexName, String indexType, String id){
try {
client.prepareDelete(indexName.toLowerCase(), indexType.toLowerCase(), id).execute().actionGet();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 根据索引名称删除索引
* @param indexName 索引名称
*/
public static void deleteIndex(String indexName){
try {
IndicesExistsRequest ier = new IndicesExistsRequest();
ier.indices(new String[]{indexName.toLowerCase()});
boolean exists = getClient().admin().indices().exists(ier).actionGet().isExists();
if(exists){
getClient().admin().indices().prepareDelete(indexName.toLowerCase()).execute().actionGet();
}
}
catch(IndexMissingException ime){}
}
public static SearchHits search(String indexName, List<String> indexTypes, QueryBuilder query, List<FieldSortBuilder> sortBuilders, int from, int size) throws NoNodeAvailableException, IndexMissingException {
if(getClient() == null ) {
return null;
}
indexName = indexName.toLowerCase();
// 去掉不存在的索引
IndicesExistsRequest ier = new IndicesExistsRequest();
ier.indices(new String[]{indexName});
boolean exists = getClient().admin().indices().exists(ier).actionGet().isExists();
if(exists){
getClient().admin().indices().open(new OpenIndexRequest(indexName)).actionGet();
}else{
Index index = new Index(indexName);
//throw new IndexMissingException(index);
return null;
}
try {
getClient().admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
} catch (IndexMissingException e) {
e.printStackTrace();
}
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(indexName);
if(indexTypes != null && indexTypes.size() > 0) {
String[] types = new String[indexTypes.size()];
for(int i=0; i<indexTypes.size(); i++) {
types[i] = indexTypes.get(i).toLowerCase();
}
searchRequestBuilder.setTypes(types);
}
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setFrom(from);
searchRequestBuilder.setSize(size);
searchRequestBuilder.setExplain(false);
searchRequestBuilder.setQuery(query);
if(sortBuilders!=null && sortBuilders.size()>0){
for(FieldSortBuilder sortBuilder: sortBuilders){
searchRequestBuilder.addSort(sortBuilder);
}
}
return searchRequestBuilde