官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#using-a-tablesource
给的例子如果照抄的话,是不能运行的,需要先按照下述方法进行改造。
使用flink自定义事件流
/**
 * Processing-time (procTime) stream.
 */
class
ProcTimeUserActionAttribute
implements
StreamTableSource<Row>, DefinedProctimeAttribute
{
/** Field names of the emitted rows, in positional order. */
private String[] names = {"name", "data", "UserActionTime"};
/** Field types of the emitted rows; entries line up by index with {@code names}. */
private TypeInformation[] types = {Types.STRING, Types.INT, Types.SQL_TIMESTAMP};
/**
 * Names the field that serves as the processing-time attribute.
 *
 * @return the constant {@code "UserActionTime"}, which matches the third entry of {@code names}
 */
@Nullable
@Override
public String getProctimeAttribute() {
    return "UserActionTime";
}
@Override
public
DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
Properties properties =
new
Properties();
properties.setProperty(ConsumerConfig.
BOOTSTRAP_SERVERS_CONFIG
,
"localhost:9092"
);
properties.setProperty(ConsumerConfig.
GROUP_ID_CONFIG
,
"flink-test-group"
);
FlinkKafkaConsumer<String> flinkKafkaConsumer =
new
FlinkKafkaConsumer<>(
"test-
flink"
,
new
SimpleStringSchema(), properties);
DataStreamSource<String> flinkDataStream = execEnv.addSource(flinkKafkaConsumer);
SingleOutputStreamOperator<Row> outputStream = flinkDataStream.map(
new
MapFunction<String, Row>() {
@Override
public
Row map(String value)
throws
Exception {
String[] splitArray = value.split(SymbolConstants.
SYMBOL_DH
);
String name = splitArray[0];
Integer data = Integer.
parseInt
(splitArray[1]);
Long timeMills = System.
currentTimeMillis
();
//这里需要将处理时间定义,虽然是系统的处理时间可以自动生成,但是在flink1.8中,
这个时间还是需要显著给出
Timestamp timestamp =
new
Timestamp(timeMills);
return
Row.
of
(name, data, timestamp);
}
}).returns(Types.
ROW_NAMED
(
names
,
types
));
评论0
最新资源