package org.example.service;
import com.alibaba.fastjson.JSON;
import com.cloudera.api.model.ApiTimeSeries;
import com.cloudera.api.model.ApiTimeSeriesData;
import com.cloudera.api.model.ApiTimeSeriesResponse;
import com.cloudera.api.model.ApiTimeSeriesResponseList;
import com.cloudera.api.v11.TimeSeriesResourceV11;
import com.cloudera.api.v42.RootResourceV42;
import org.example.bean.Data;
import org.example.bean.KafkaBrokerDiskInfo;
import org.example.bean.Metric;
import org.example.bean.Metrics;
import org.example.utils.GetCMCli;
import org.example.utils.JdbcUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class ClouderaManagerServiceMetrics {
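// Shared Cloudera Manager API root handle, created once via the project's GetCMCli helper.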
public static RootResourceV42 apiRoot = GetCMCli.getApiRoot();
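// Flattens Cloudera Manager time-series responses into Metric beans, keeping only the latest data point of each series.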
private static List<Metric> formatApiTimeSeriesResponse(List<ApiTimeSeriesResponse> apiTimeSeriesResponseList) {
List<Metric> metrics = new ArrayList<>();
for (ApiTimeSeriesResponse apiTimeSeriesResponse : apiTimeSeriesResponseList) {
List<ApiTimeSeries> apiTimeSeriesList = apiTimeSeriesResponse.getTimeSeries();
for (ApiTimeSeries apiTimeSeries : apiTimeSeriesList) {
Metric metric = new Metric();
metric.setMetricName(apiTimeSeries.getMetadata().getMetricName());
metric.setEntityName(apiTimeSeries.getMetadata().getEntityName());
metric.setStartTime(apiTimeSeries.getMetadata().getStartTime().toString());
metric.setEndTime(apiTimeSeries.getMetadata().getEndTime().toString());
List<ApiTimeSeriesData> timeSeriesDataList = apiTimeSeries.getData();
// Data points come back sorted by time; keep only the most recent (last) one.
// Note: CDH aggregates at a minimum granularity of ten minutes.
if (timeSeriesDataList.isEmpty()) {
continue; // no data points in the queried window; skip this series
}
ApiTimeSeriesData apiTimeSeriesData = timeSeriesDataList.get(timeSeriesDataList.size() - 1);
List<Data> dataList = new ArrayList<>();
Data data = new Data();
data.setTimestamp(apiTimeSeriesData.getTimestamp().toString());
data.setType(apiTimeSeriesData.getType());
data.setValue(apiTimeSeriesData.getValue());
dataList.add(data);
metric.setData(dataList);
metrics.add(metric);
}
}
return metrics;
}
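// Runs a tsquery against the Cloudera Manager time-series endpoint for the given time window.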
private static List<Metric> getServiceMetrics(String query, String startTime, String endTime) throws ParseException {
TimeSeriesResourceV11 timeSeriesResourceV11 = apiRoot.getTimeSeriesResource();
ApiTimeSeriesResponseList response = timeSeriesResourceV11.queryTimeSeries(query, startTime, endTime);
List<ApiTimeSeriesResponse> apiTimeSeriesResponseList = response.getResponses();
List<Metric> metrics = formatApiTimeSeriesResponse(apiTimeSeriesResponseList);
System.out.println(JSON.toJSONString(metrics));
return metrics;
}
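// Computes the disk use rate of each Kafka broker's log directories over the last two hours,
// sends an alert when a broker crosses the threshold, and persists a snapshot per broker.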
public static double getKafkaBrokerDiskUseRate() throws ParseException {
String diskQuery = String.format("select %s, %s", Metrics.KAFKA_LOG_DIRECTORY_DISK_FREE_SPACE, Metrics.KAFKA_LOG_DIRECTORY_DISK_TOTAL_SPACE);
List<Metric> serviceMetrics = getServiceMetrics(diskQuery, LocalDateTime.now().minusHours(2).toString(), LocalDateTime.now().toString());
Map<String, List<Metric>> listMap = serviceMetrics.stream().collect(Collectors.groupingBy(Metric::getEntityName));
double diskFree = 0.0;
double diskTotal = 0.0;
// track the highest per-broker use rate so the method has a meaningful return value
double maxDiskUseRate = 0.0;
List<KafkaBrokerDiskInfo> list = new ArrayList<>();
for (Map.Entry<String, List<Metric>> entry : listMap.entrySet()) {
String entityName = entry.getKey();
for (Metric metric : entry.getValue()) {
Double tempDisk = metric.getData().get(0).getValue();
if (metric.getMetricName().contains("free")) {
diskFree += tempDisk;
} else {
diskTotal += tempDisk;
}
}
double diskUsed = diskTotal - diskFree;
// round the use-rate ratio to three decimal places
double diskUseRate = Math.round(diskUsed * 1000 / diskTotal) / 1000.0;
maxDiskUseRate = Math.max(maxDiskUseRate, diskUseRate);
String[] split = entityName.split(":");
String hostName = getKafkaBrokerHostName(split[0]);
if (diskUseRate > 0.2) {
String level = "C2";
String info = "Kafka Broker disk usage exceeds the configured threshold";
StringBuilder sb = new StringBuilder();
sb.append("[Alert source]: GTMC big data analytics platform (DMP) \n");
sb.append("[Alert service]: Kafka \n");
sb.append("[Alert level]: ").append(level).append("\n");
sb.append("[Alert message]: ").append(info).append("\n");
sb.append("[Alert time]: ").append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append("\n");
// sb.append("[Alert count]: 1 \n");
sb.append("[Affected host]: ").append(hostName).append('\n');
sb.append("[Alert details]: at ").append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append(", Kafka Broker disk usage on host ").append(hostName).append(" was ").append(String.format("%.2f", diskUseRate * 100)).append("%.");
AlertManager.sendMessage(sb.toString(), level);
System.out.println(sb);
}
KafkaBrokerDiskInfo diskInfo = new KafkaBrokerDiskInfo(split[0], hostName, diskFree, diskUsed, diskTotal, diskUseRate);
list.add(diskInfo);
diskFree = 0.0;
diskTotal = 0.0;
}
putKafkaBrokerDiskInfo(list);
return maxDiskUseRate;
}
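// Resolves the host name for a Kafka broker role name from the kafka_broker_host_map table.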
private static String getKafkaBrokerHostName(String roleName) {
Connection conn = null;
PreparedStatement sta = null;
ResultSet rs = null;
String hostName = null;
try {
conn = JdbcUtils.getConnection();
String sql = "select host_name\n" +
"from kafka_broker_host_map\n" +
"where role_name = ?\n" +
"limit 1";
sta = conn.prepareStatement(sql);
sta.setString(1, roleName);
rs = sta.executeQuery();
while (rs.next()) {
hostName = rs.getString("host_name");
}
} catch (ClassNotFoundException | SQLException e) {
throw new RuntimeException(e);
} finally {
JdbcUtils.releaseConnection(conn, sta, rs);
}
return hostName;
}
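// Inserts one row per broker into the kafka_broker_disc_info table.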
public static void putKafkaBrokerDiskInfo(List<KafkaBrokerDiskInfo> list) {
Connection conn = null;
PreparedStatement sta = null;
ResultSet rs = null;
try {
conn = JdbcUtils.getConnection();
String sql = "insert into kafka_broker_disc_info(role_name, host_name, disk_free, disk_used, diskTotal, disk_use_rate, create_time) values(?,?,?,?,?,?,?)";
sta = conn.prepareStatement(sql);
for (KafkaBrokerDiskInfo info : list) {
sta.setString(1, info.getRoleName());
sta.setString(2, info.getHostName());
sta.setDouble(3, info.getDiskFree());
sta.setDouble(4, info.getDiskUsed());
sta.setDouble(5, info.getDiskTotal());
sta.setDouble(6, info.getDiskUseRate());
sta.setString(7, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
sta.executeUpdate();
}
} catch (ClassNotFoundException | SQLException e) {
throw new RuntimeException(e);
} finally {
JdbcUtils.releaseConnection(conn, sta, rs);
}
}
}
Java Excel parsing and Cloudera Manager API usage
Java is a widely used programming language, and parsing Excel files is a common step in data processing. The Cloudera Manager API, in turn, is an important tool for big data administration, particularly in the Hadoop ecosystem. Both topics are covered in detail below.
Let's start with parsing Excel in Java. Several libraries can read and manipulate Excel files in Java; the most widely used is Apache POI, an open-source project that provides read and write support for Microsoft Office formats, including Excel. With Apache POI, developers can create new workbooks, modify existing sheets, add formulas, and handle the various cell types (strings, numbers, dates, and so on). The following example shows how to read an Excel file with Apache POI:
```java
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
public class ExcelReader {
public static void main(String[] args) throws IOException {
FileInputStream fis = new FileInputStream(new File("path_to_your_excel_file.xlsx"));
Workbook workbook = new XSSFWorkbook(fis);
Sheet sheet = workbook.getSheetAt(0); // get the first sheet
for (Row row : sheet) {
for (Cell cell : row) {
switch (cell.getCellType()) {
case STRING:
System.out.print(cell.getStringCellValue() + "\t");
break;
case NUMERIC:
System.out.print(cell.getNumericCellValue() + "\t");
break;
// handle other cell types (BOOLEAN, FORMULA, BLANK, ...)
}
}
System.out.println();
}
workbook.close();
fis.close();
}
}
```
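The reading loop above covers consumption; POI can also create workbooks, sheets, and formula cells, as mentioned earlier. Here is a minimal writing sketch; the file name `report.xlsx` and the cell contents are illustrative, not from the original project:
```java
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import java.io.FileOutputStream;
import java.io.IOException;

public class ExcelWriter {
    public static void main(String[] args) throws IOException {
        try (Workbook workbook = new XSSFWorkbook();
             FileOutputStream fos = new FileOutputStream("report.xlsx")) {
            Sheet sheet = workbook.createSheet("metrics");
            // header row
            Row header = sheet.createRow(0);
            header.createCell(0).setCellValue("broker");
            header.createCell(1).setCellValue("disk_use_rate");
            // one data row with a string cell and a numeric cell
            Row row = sheet.createRow(1);
            row.createCell(0).setCellValue("broker-1");
            row.createCell(1).setCellValue(0.42);
            // a formula cell that references the numeric cell above
            Row percent = sheet.createRow(2);
            percent.createCell(0).setCellValue("percent");
            percent.createCell(1).setCellFormula("B2*100");
            workbook.write(fos);
        }
    }
}
```
The try-with-resources block closes both the workbook and the stream automatically, which avoids the explicit `close()` calls used in the reading example.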
Next, the Cloudera Manager API. Cloudera Manager is a comprehensive data platform management tool that exposes a RESTful API, letting administrators and developers interact with a cluster programmatically. The API can be used to monitor cluster state, configure services, manage resource allocation, and deploy and upgrade software. Calls are typically made with an HTTP client, and the JSON or XML responses are parsed by the caller. Below is a simple example that calls the Cloudera Manager API from Java with Apache HttpClient:
```java
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.commons.codec.binary.Base64; // commons-codec, needed for Base64.encodeBase64String below
public class ClouderaManagerAPIExample {
public static void main(String[] args) throws Exception {
CloseableHttpClient httpclient = HttpClients.createDefault();
try {
HttpGet httpGet = new HttpGet("http://your_cloudera_manager_host:7180/api/v6/clusters");
httpGet.setHeader("Authorization", "Basic " + Base64.encodeBase64String(("admin:admin_password").getBytes()));
CloseableHttpResponse response = httpclient.execute(httpGet);
try {
System.out.println(response.getStatusLine());
HttpEntity entity = response.getEntity();
if (entity != null) {
String result = EntityUtils.toString(entity);
System.out.println(result);
}
EntityUtils.consume(entity);
} finally {
response.close();
}
} finally {
httpclient.close();
}
}
}
```
In this example, we send a GET request to Cloudera Manager to list the clusters, authenticating with HTTP Basic auth.
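Once the response body is in hand, parsing the JSON is straightforward. Below is a minimal sketch using fastjson (the same library imported in the metrics class above). It assumes the list endpoint wraps its results in an `items` array, which is the usual Cloudera Manager convention, but verify the shape against your CM version:
```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class ClusterListParser {
    // `result` is the JSON body returned by GET /api/v6/clusters in the example above
    public static void printClusterNames(String result) {
        JSONObject root = JSON.parseObject(result);
        // list endpoints wrap results in an "items" array (assumption: verify for your CM version)
        JSONArray items = root.getJSONArray("items");
        if (items == null) {
            return; // unexpected response shape; nothing to print
        }
        for (int i = 0; i < items.size(); i++) {
            JSONObject cluster = items.getJSONObject(i);
            System.out.println(cluster.getString("name") + " -> " + cluster.getString("fullVersion"));
        }
    }
}
```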
To sum up: parsing Excel is a common requirement in data processing, and the Apache POI library makes it straightforward in Java. The Cloudera Manager API, meanwhile, provides a powerful way to manage and automate Hadoop clusters programmatically. Combined, these techniques can be used to build efficient data processing and management systems.