package flink.demo.jld.kafka;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
/**
* JSONKeyValueDeserializationSchema(true)
*
*/
/**
 * Flink job that reads JSON records from the Kafka topic "gwctest",
 * converts each record's JSON object into a positional {@link Row},
 * batches rows inside a 30-second processing-time tumbling window, and
 * hands every non-empty batch to {@code DbSinkFunction} for database writes.
 */
public class KafkaConsumerWindow {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection settings. NOTE(review): the broker address is a
        // placeholder ("10.68.xx.xx") — should come from external configuration.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.68.xx.xx:9092");
        props.setProperty("group.id", "test");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // JSONKeyValueDeserializationSchema(false): each record arrives as an
        // ObjectNode containing "key" and "value" fields (no metadata field).
        FlinkKafkaConsumer<ObjectNode> consumer =
                new FlinkKafkaConsumer<>("gwctest", new JSONKeyValueDeserializationSchema(false), props);
        DataStreamSource<ObjectNode> streamSource = env.addSource(consumer);

        // Convert each Kafka record's JSON payload into a positional Row.
        SingleOutputStreamOperator<Row> rowSource = streamSource.map(new MapFunction<ObjectNode, Row>() {
            private static final long serialVersionUID = 1L;

            /**
             * Builds a Row from the top-level fields of one JSON element.
             * The outer loop visits every element of the record ObjectNode
             * ("key", then "value"); each iteration REPLACES the row, so the
             * Row returned is built from the LAST element — effectively the
             * record's "value" object. Preserved as-is from the original;
             * TODO(review): confirm this last-element-wins behavior is intended.
             */
            @Override
            public Row map(ObjectNode value) throws Exception {
                Row row = null;
                for (Iterator<JsonNode> elements = value.elements(); elements.hasNext(); ) {
                    JsonNode next = elements.next();
                    row = new Row(next.size());
                    int i = 0;
                    for (Iterator<Map.Entry<String, JsonNode>> fields = next.fields(); fields.hasNext(); ) {
                        Map.Entry<String, JsonNode> entry = fields.next();
                        JsonNode nodeValue = entry.getValue();
                        // Map JSON scalar types onto Java types; anything else
                        // (strings, nested objects, arrays) falls back to text.
                        if (nodeValue.isDouble()) {
                            row.setField(i, nodeValue.asDouble());
                        } else if (nodeValue.isInt()) {
                            row.setField(i, nodeValue.asInt());
                        } else if (nodeValue.isNull()) {
                            row.setField(i, null);
                        } else if (nodeValue.isBoolean()) {
                            row.setField(i, nodeValue.asBoolean());
                        } else {
                            row.setField(i, nodeValue.asText());
                        }
                        i++;
                    }
                }
                return row;
            }
        });

        // Collect rows into 30-second processing-time tumbling windows and
        // emit each non-empty window's contents as one batch.
        DataStream<List<Row>> dataBaseStream = rowSource
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                .process(new ProcessAllWindowFunction<Row, List<Row>, TimeWindow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void process(Context context, Iterable<Row> iterable, Collector<List<Row>> collector) throws Exception {
                        System.out.println("进入时间窗口:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                        List<Row> batch = new ArrayList<>();
                        iterable.forEach(batch::add);
                        // Skip empty windows so the sink never receives an empty batch.
                        if (!batch.isEmpty()) {
                            collector.collect(batch);
                        }
                    }
                });

        dataBaseStream.print("userStream");
        dataBaseStream.addSink(new DbSinkFunction<List<Row>>());
        env.execute("kafkaTest");
    }
}
/*
 * NOTE(review): the text below is download-site boilerplate accidentally
 * appended to this source file during extraction. It is not Java code and
 * previously made the file uncompilable; it is preserved here inside a
 * comment pending confirmation that it can be removed entirely.
 *
 * 没有合适的资源?快使用搜索试试~ 我知道了~
 * 资源推荐 / 资源详情 / 资源评论
 * 收起资源包目录
 * Flink Kafka数据批量写入到数据库.zip (2个子文件)
 *   DbSinkFunction.java 4KB
 *   KafkaConsumerWindow.java 5KB
 * 共 2 条
 * shandongwill — 粉丝: 3370, 资源: 389
 * (其余为网站页面残留文本：上传资源、我的内容管理、我的收益、我的积分、
 *  我的C币、我的收藏、我的下载、下载帮助、安全验证、信息提交成功)
 */