package com.alibaba.datax.plugin.reader.influxdbreader;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
/**
 * Reads the result of a single InfluxDB query over HTTP and forwards every returned row
 * to the writer as a record made of string columns.
 *
 * <p>The query is sent as a GET request to {@code <endpoint>/query}. The database, the optional
 * credentials and the query text are passed as URL parameters ({@code db}, {@code u}, {@code p},
 * {@code q}). The placeholder {@code #lastMinute#} inside the configured query is replaced by the
 * epoch-millisecond timestamp of "one minute ago" right before the query is sent.
 */
public class InfluxDBReaderTask
{
    private static final Logger LOG = LoggerFactory.getLogger(InfluxDBReaderTask.class);
    private static final int CONNECT_TIMEOUT_SECONDS_DEFAULT = 15;
    private static final int SOCKET_TIMEOUT_SECONDS_DEFAULT = 20;
    private static final String QUERY_PATH = "/query";
    private static final String LAST_MINUTE_PLACEHOLDER = "#lastMinute#";

    private final String querySql;
    private final String database;
    private final String endpoint;
    private final String username;
    private final String password;
    /** Connect timeout in milliseconds. */
    private final int connTimeout;
    /** Socket (read) timeout in milliseconds. */
    private final int socketTimeout;

    /**
     * Creates a task from the task-level configuration.
     *
     * <p>{@code database} and {@code endpoint} are read from the first entry of the
     * {@code connection} list; the query, credentials and timeouts (given in seconds) are read
     * from the top level of {@code configuration}.
     *
     * @param configuration the task configuration
     * @throws DataXException if the connection list is missing or empty
     */
    public InfluxDBReaderTask(Configuration configuration)
    {
        List<Object> connList = configuration.getList(InfluxDBKey.CONNECTION);
        if (connList == null || connList.isEmpty()) {
            throw DataXException.asDataXException(
                    InfluxDBReaderErrorCode.ILLEGAL_VALUE, "The connection configuration is missing or empty", null);
        }
        Configuration conn = Configuration.from(connList.get(0).toString());
        this.querySql = configuration.getString(InfluxDBKey.QUERY_SQL, null);
        this.database = conn.getString(InfluxDBKey.DATABASE);
        this.endpoint = conn.getString(InfluxDBKey.ENDPOINT);
        this.username = configuration.getString(InfluxDBKey.USERNAME);
        this.password = configuration.getString(InfluxDBKey.PASSWORD, null);
        // Timeouts are configured in seconds; the HTTP client expects milliseconds.
        this.connTimeout = configuration.getInt(InfluxDBKey.CONNECT_TIMEOUT_SECONDS, CONNECT_TIMEOUT_SECONDS_DEFAULT) * 1000;
        this.socketTimeout = configuration.getInt(InfluxDBKey.SOCKET_TIMEOUT_SECONDS, SOCKET_TIMEOUT_SECONDS_DEFAULT) * 1000;
    }

    /**
     * No-op hook; this task performs no work here.
     */
    public void post()
    {
        //
    }

    /**
     * No-op hook; this task holds no resources that need releasing.
     */
    public void destroy()
    {
        //
    }

    /**
     * Executes the configured query and sends every returned row to the writer.
     *
     * <p>Each value of a row becomes one {@link StringColumn}; a JSON {@code null} becomes an
     * empty {@code StringColumn}. Rows of every series in the first result are sent, in the order
     * the server returned them.
     *
     * @param recordSender used to create records and hand them to the writer
     * @param taskPluginCollector not used by this implementation
     * @throws DataXException if the query is not configured, the HTTP request fails, the response
     *         is blank or cannot be interpreted, or InfluxDB reports an error for the query
     */
    public void startRead(RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        LOG.info("connect influxdb: {} with username: {}", endpoint, username);
        if (StringUtils.isBlank(querySql)) {
            throw DataXException.asDataXException(
                    InfluxDBReaderErrorCode.ILLEGAL_VALUE, "The querySql is required but was not configured", null);
        }

        String result;
        try {
            result = get(buildQueryUrl());
        }
        catch (Exception e) {
            throw DataXException.asDataXException(
                    InfluxDBReaderErrorCode.ILLEGAL_VALUE, "Failed to get data point!", e);
        }
        if (StringUtils.isBlank(result)) {
            throw DataXException.asDataXException(
                    InfluxDBReaderErrorCode.ILLEGAL_VALUE, "Get nothing!", null);
        }

        try {
            sendRecords(result, recordSender);
        }
        catch (DataXException e) {
            // Already carries a meaningful message (e.g. the server-side error); do not re-wrap.
            throw e;
        }
        catch (Exception e) {
            throw DataXException.asDataXException(
                    InfluxDBReaderErrorCode.ILLEGAL_VALUE, "Failed to send data", e);
        }
    }

    /**
     * Builds the full query URL, including database, optional credentials and the query text.
     */
    private String buildQueryUrl()
            throws java.io.UnsupportedEncodingException
    {
        String enc = StandardCharsets.UTF_8.name();
        StringBuilder url = new StringBuilder(endpoint)
                .append(QUERY_PATH)
                .append("?db=").append(URLEncoder.encode(database, enc));
        // Credentials are optional: a missing (null) or empty value is simply not sent.
        if (StringUtils.isNotEmpty(username)) {
            url.append("&u=").append(URLEncoder.encode(username, enc));
        }
        if (StringUtils.isNotEmpty(password)) {
            url.append("&p=").append(URLEncoder.encode(password, enc));
        }
        // Resolve the placeholder per call so the timestamp reflects the time of the read.
        String sql = querySql;
        if (sql.contains(LAST_MINUTE_PLACEHOLDER)) {
            sql = sql.replace(LAST_MINUTE_PLACEHOLDER, getLastMinute());
        }
        url.append("&q=").append(URLEncoder.encode(sql, enc));
        return url.toString();
    }

    /**
     * Parses the JSON response body and sends one record per row of every series found in the
     * first entry of {@code results}.
     */
    private void sendRecords(String body, RecordSender recordSender)
    {
        JSONObject response = JSONObject.parseObject(body);
        JSONArray results = (JSONArray) response.get("results");
        JSONObject firstResult = (JSONObject) results.get(0);

        if (firstResult.containsKey("series")) {
            JSONArray seriesList = (JSONArray) firstResult.get("series");
            // A result may carry several series (e.g. when grouping by a tag); read all of them
            // rather than silently dropping everything after the first one.
            for (Object seriesItem : seriesList) {
                JSONObject series = (JSONObject) seriesItem;
                JSONArray rows = (JSONArray) series.get("values");
                if (rows == null) {
                    continue;
                }
                for (Object row : rows) {
                    recordSender.sendToWriter(toRecord((JSONArray) row, recordSender));
                }
            }
        }
        else if (firstResult.containsKey("error")) {
            throw DataXException.asDataXException(
                    InfluxDBReaderErrorCode.ILLEGAL_VALUE,
                    "Error occurred in data sets! " + String.valueOf(firstResult.get("error")), null);
        }
    }

    /**
     * Converts one row of values into a record of string columns.
     */
    private static Record toRecord(JSONArray row, RecordSender recordSender)
    {
        Record record = recordSender.createRecord();
        for (Object cell : row) {
            if (cell != null) {
                record.addColumn(new StringColumn(cell.toString()));
            }
            else {
                record.addColumn(new StringColumn());
            }
        }
        return record;
    }

    /**
     * Performs an HTTP GET using the configured timeouts.
     *
     * @param url the complete request URL
     * @return the response body decoded as UTF-8, or {@code null} if the client returned no content
     * @throws Exception if the request fails
     */
    public String get(String url)
            throws Exception
    {
        Content content = Request.Get(url)
                .connectTimeout(this.connTimeout)
                .socketTimeout(this.socketTimeout)
                .execute()
                .returnContent();
        if (content == null) {
            return null;
        }
        return content.asString(StandardCharsets.UTF_8);
    }

    // The POST helpers below are not called from within this class at present.

    /**
     * POSTs {@code params} serialized as JSON, using the configured timeouts.
     */
    private String post(String url, Map<String, Object> params)
            throws Exception
    {
        return post(url, JSON.toJSONString(params), this.connTimeout, this.socketTimeout);
    }

    /**
     * POSTs an already serialized JSON body, using the configured timeouts.
     */
    private String post(String url, String params)
            throws Exception
    {
        return post(url, params, this.connTimeout, this.socketTimeout);
    }

    /**
     * POSTs {@code params} serialized as JSON, using the given timeouts (milliseconds).
     */
    private String post(String url, Map<String, Object> params,
            int connectTimeoutInMill, int socketTimeoutInMill)
            throws Exception
    {
        return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill);
    }

    /**
     * POSTs a JSON body using the given timeouts (milliseconds).
     *
     * @return the response body decoded as UTF-8, or {@code null} if the client returned no content
     */
    private String post(String url, String params,
            int connectTimeoutInMill, int socketTimeoutInMill)
            throws Exception
    {
        Content content = Request.Post(url)
                .connectTimeout(connectTimeoutInMill)
                .socketTimeout(socketTimeoutInMill)
                .addHeader("Content-Type", "application/json")
                .bodyString(params, ContentType.APPLICATION_JSON)
                .execute()
                .returnContent();
        if (content == null) {
            return null;
        }
        return content.asString(StandardCharsets.UTF_8);
    }

    /**
     * Returns the epoch-millisecond timestamp of one minute before now, as a decimal string.
     *
     * <p>The wall-clock time is read and converted using the same zone (UTC), so the result does
     * not depend on the JVM's default time zone.
     */
    private String getLastMinute()
    {
        long lastMinuteMilli = LocalDateTime.now(ZoneOffset.UTC)
                .minusMinutes(1)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        return String.valueOf(lastMinuteMilli);
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
influxdbreader.zip (10个子文件)
influxdbreader
.classpath 2KB
pom.xml 3KB
src
main
assembly
package.xml 2KB
resources
plugin.json 191B
plugin_job_template.json 503B
java
com
alibaba
datax
plugin
reader
influxdbreader
InfluxDBKey.java 630B
InfluxDBReader.java 3KB
InfluxDBReaderTask.java 7KB
InfluxDBReaderErrorCode.java 872B
.project 566B
共 10 条
- 1
资源评论
qq_30024063
- 粉丝: 18
- 资源: 6
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功