package org.example.service;
import com.alibaba.fastjson.JSON;
import com.cloudera.api.model.ApiTimeSeries;
import com.cloudera.api.model.ApiTimeSeriesData;
import com.cloudera.api.model.ApiTimeSeriesResponse;
import com.cloudera.api.model.ApiTimeSeriesResponseList;
import com.cloudera.api.v11.TimeSeriesResourceV11;
import com.cloudera.api.v42.RootResourceV42;
import org.example.bean.Data;
import org.example.bean.KafkaBrokerDiskInfo;
import org.example.bean.Metric;
import org.example.bean.Metrics;
import org.example.utils.GetCMCli;
import org.example.utils.JdbcUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
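/**
 * Collects Kafka broker disk metrics from Cloudera Manager's time-series API,
 * raises an alert when a broker's disk use rate crosses the preset threshold,
 * and persists a per-broker snapshot of disk usage via JdbcUtils.
 */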
public class ClouderaManagerServiceMetrics {

    public static RootResourceV42 apiRoot = GetCMCli.getApiRoot();
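
    /**
     * Flattens the raw Cloudera Manager time-series responses into {@link Metric}
     * beans, keeping only the most recent data point of each series.
     */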
    private static List<Metric> formatApiTimeSeriesResponse(List<ApiTimeSeriesResponse> apiTimeSeriesResponseList) {
        List<Metric> metrics = new ArrayList<>();
        for (ApiTimeSeriesResponse apiTimeSeriesResponse : apiTimeSeriesResponseList) {
            for (ApiTimeSeries apiTimeSeries : apiTimeSeriesResponse.getTimeSeries()) {
                Metric metric = new Metric();
                metric.setMetricName(apiTimeSeries.getMetadata().getMetricName());
                metric.setEntityName(apiTimeSeries.getMetadata().getEntityName());
                metric.setStartTime(apiTimeSeries.getMetadata().getStartTime().toString());
                metric.setEndTime(apiTimeSeries.getMetadata().getEndTime().toString());
                List<ApiTimeSeriesData> timeSeriesDataList = apiTimeSeries.getData();
                // The API returns points sorted by time; keep only the latest (last) point.
                // Note: CDH's finest granularity aggregates at 10-minute intervals.
                if (timeSeriesDataList.isEmpty()) {
                    continue; // skip series with no data points to avoid IndexOutOfBoundsException
                }
                ApiTimeSeriesData apiTimeSeriesData = timeSeriesDataList.get(timeSeriesDataList.size() - 1);
                Data data = new Data();
                data.setTimestamp(apiTimeSeriesData.getTimestamp().toString());
                data.setType(apiTimeSeriesData.getType());
                data.setValue(apiTimeSeriesData.getValue());
                List<Data> dataList = new ArrayList<>();
                dataList.add(data);
                metric.setData(dataList);
                metrics.add(metric);
            }
        }
        return metrics;
    }
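
    /**
     * Runs a tsquery against Cloudera Manager over the given time window and
     * returns the formatted result. Start and end times are ISO-8601 strings
     * as produced by {@link LocalDateTime#toString()}.
     */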
    private static List<Metric> getServiceMetrics(String query, String startTime, String endTime) throws ParseException {
        TimeSeriesResourceV11 timeSeriesResourceV11 = apiRoot.getTimeSeriesResource();
        ApiTimeSeriesResponseList response = timeSeriesResourceV11.queryTimeSeries(query, startTime, endTime);
        List<Metric> metrics = formatApiTimeSeriesResponse(response.getResponses());
        System.out.println(JSON.toJSONString(metrics));
        return metrics;
    }
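
    /**
     * Computes each Kafka broker's log-directory disk use rate from the free/total
     * space metrics of the last two hours, alerts when a broker exceeds the 20%
     * threshold, persists all results, and returns the highest per-broker use rate.
     */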
    public static double getKafkaBrokerDiskUseRate() throws ParseException {
        String diskQuery = String.format("select %s,%s",
                Metrics.KAFKA_LOG_DIRECTORY_DISK_FREE_SPACE,
                Metrics.KAFKA_LOG_DIRECTORY_DISK_TOTAL_SPACE);
        List<Metric> serviceMetrics = getServiceMetrics(diskQuery,
                LocalDateTime.now().minusHours(2).toString(),
                LocalDateTime.now().toString());
        // Group the free/total space metrics by broker so each broker's pair is processed together.
        Map<String, List<Metric>> listMap = serviceMetrics.stream().collect(Collectors.groupingBy(Metric::getEntityName));
        double maxDiskUseRate = 0.0;
        List<KafkaBrokerDiskInfo> list = new ArrayList<>();
        for (Map.Entry<String, List<Metric>> metrics : listMap.entrySet()) {
            String entityName = metrics.getKey();
            double diskFree = 0.0;
            double diskTotal = 0.0;
            for (Metric metric : metrics.getValue()) {
                double tempDisk = metric.getData().get(0).getValue();
                if (metric.getMetricName().contains("free")) {
                    diskFree += tempDisk;
                } else {
                    diskTotal += tempDisk;
                }
            }
            double diskUsed = diskTotal - diskFree;
            // Round to three decimal places; guard against a zero total to avoid NaN.
            double diskUseRate = diskTotal == 0.0 ? 0.0 : Math.round(diskUsed * 1000 / diskTotal) / 1000.0;
            maxDiskUseRate = Math.max(maxDiskUseRate, diskUseRate);
            String[] split = entityName.split(":");
            String hostName = getKafkaBrokerHostName(split[0]);
            if (diskUseRate > 0.2) {
                String level = "C2";
                String info = "Kafka Broker disk use rate exceeds the preset threshold";
                String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                StringBuilder sb = new StringBuilder();
                sb.append("[Alert Source]: GTMC Big Data Analytics Platform (DMP) \n");
                sb.append("[Alert Service]: Kafka \n");
                sb.append("[Alert Level]: ").append(level).append("\n");
                sb.append("[Alert Info]: ").append(info).append("\n");
                sb.append("[Alert Time]: ").append(now).append("\n");
                // sb.append("[Alert Count]: 1 \n");
                sb.append("[Related Device]: ").append(hostName).append('\n');
                sb.append("[Alert Detail]: ").append(now)
                        .append(" the Kafka Broker on ").append(hostName)
                        .append(" has a disk use rate of ")
                        .append(String.format("%.2f", diskUseRate * 100)).append("%.");
                AlertManager.sendMessage(sb.toString(), level);
                System.out.println(sb);
            }
            list.add(new KafkaBrokerDiskInfo(split[0], hostName, diskFree, diskUsed, diskTotal, diskUseRate));
        }
        putKafkaBrokerDiskInfo(list);
        // Return the highest use rate observed across all brokers.
        return maxDiskUseRate;
    }
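
    /**
     * Looks up the host name mapped to a broker role name in kafka_broker_host_map;
     * returns null when no mapping exists.
     */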
    private static String getKafkaBrokerHostName(String roleName) {
        Connection conn = null;
        PreparedStatement sta = null;
        ResultSet rs = null;
        String hostName = null;
        try {
            conn = JdbcUtils.getConnection();
            String sql = "select host_name from kafka_broker_host_map where role_name = ? limit 1";
            sta = conn.prepareStatement(sql);
            sta.setString(1, roleName);
            rs = sta.executeQuery();
            if (rs.next()) {
                hostName = rs.getString("host_name");
            }
        } catch (ClassNotFoundException | SQLException e) {
            throw new RuntimeException(e);
        } finally {
            JdbcUtils.releaseConnection(conn, sta, rs);
        }
        return hostName;
    }
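
    /**
     * Batch-inserts the per-broker disk snapshots. The tail of the batch loop is a
     * sketch: it assumes KafkaBrokerDiskInfo exposes standard bean getters matching
     * the constructor parameters used above (getDiskTotal, getDiskUseRate, ...).
     */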
    public static void putKafkaBrokerDiskInfo(List<KafkaBrokerDiskInfo> list) {
        Connection conn = null;
        PreparedStatement sta = null;
        try {
            conn = JdbcUtils.getConnection();
            String sql = "insert into kafka_broker_disc_info(role_name, host_name, disk_free, disk_used, disk_total, disk_use_rate, create_time) values (?,?,?,?,?,?,?)";
            sta = conn.prepareStatement(sql);
            for (KafkaBrokerDiskInfo info : list) {
                sta.setString(1, info.getRoleName());
                sta.setString(2, info.getHostName());
                sta.setDouble(3, info.getDiskFree());
                sta.setDouble(4, info.getDiskUsed());
                sta.setDouble(5, info.getDiskTotal());
                sta.setDouble(6, info.getDiskUseRate());
                sta.setTimestamp(7, Timestamp.valueOf(LocalDateTime.now()));
                sta.addBatch(); // batch the rows so all brokers go in one executeBatch() round trip
            }
            sta.executeBatch();
        } catch (ClassNotFoundException | SQLException e) {
            throw new RuntimeException(e);
        } finally {
            JdbcUtils.releaseConnection(conn, sta, null);
        }
    }
}