package com.xyf.bigdata.connector;
import com.xyf.bigdata.connector.source.RabbitmqDynamicTableSource;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.json.JsonFormatFactory;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import java.util.HashSet;
import java.util.Set;
/**
 * Table source factory registered under the connector identifier {@code rmq}.
 *
 * <p>It reads the connection and queue options declared below from the table's
 * {@code WITH} clause, discovers a decoding format, and hands everything to a
 * {@link RabbitmqDynamicTableSource}.
 */
public class RabbitmqTableSourceFactory implements DynamicTableSourceFactory {

    private static final String FACTORY_IDENTIFIER = "rmq";

    /** Required option {@code queue}; forwarded to the table source as its queue argument. */
    public static final ConfigOption<String> QUEUE = ConfigOptions.key("queue")
            .stringType()
            .noDefaultValue();

    /** Required option {@code exchange-name}; forwarded to the table source unchanged. */
    public static final ConfigOption<String> EXCHANGE_NAME = ConfigOptions.key("exchange-name")
            .stringType()
            .noDefaultValue();

    /** Required option {@code port}. */
    public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
            .intType()
            .noDefaultValue();

    /**
     * Optional option {@code qos}, defaulting to 100.
     * NOTE(review): presumably the consumer prefetch count; confirm against the source function.
     */
    public static final ConfigOption<Integer> QOS = ConfigOptions.key("qos")
            .intType()
            .defaultValue(100);

    /**
     * Required option {@code hosts}.
     * NOTE(review): the expected syntax (single host vs. list) is not visible here; confirm.
     */
    public static final ConfigOption<String> HOSTS = ConfigOptions.key("hosts")
            .stringType()
            .noDefaultValue();

    /** Required option {@code virtual-host}. */
    public static final ConfigOption<String> VIRTUAL_HOST = ConfigOptions.key("virtual-host")
            .stringType()
            .noDefaultValue();

    /** Required option {@code username}. */
    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
            .stringType()
            .noDefaultValue();

    /** Required option {@code password}. */
    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType()
            .noDefaultValue();

    /**
     * Required option {@code format}. Format discovery itself goes through
     * {@link FactoryUtil#FORMAT}; this constant is what gets registered as required.
     */
    public static final ConfigOption<String> FORMAT = ConfigOptions.key("format")
            .stringType()
            .noDefaultValue();

    /**
     * Builds the RabbitMQ table source from the options of the table being created.
     *
     * @param context factory context giving access to the table options and resolved schema
     * @return a {@link RabbitmqDynamicTableSource} configured from those options
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);

        // Discover by the generic deserialization-format interface rather than by
        // JsonFormatFactory, so that any installed format named in 'format' can be
        // resolved instead of only the JSON one.
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
                helper.discoverDecodingFormat(
                        DeserializationFormatFactory.class,
                        FactoryUtil.FORMAT);

        // Validate before reading: the unboxing of PORT below relies on the
        // required options having been checked.
        helper.validate();

        final ReadableConfig options = helper.getOptions();
        final String hosts = options.get(HOSTS);
        final int port = options.get(PORT);
        final String virtualHost = options.get(VIRTUAL_HOST);
        final String username = options.get(USERNAME);
        final String password = options.get(PASSWORD);
        final String queue = options.get(QUEUE);
        final String exchangeName = options.get(EXCHANGE_NAME);
        final int qos = options.get(QOS);

        final DataType producedDataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

        return new RabbitmqDynamicTableSource(
                hosts,
                port,
                virtualHost,
                username,
                password,
                queue,
                exchangeName,
                qos,
                decodingFormat,
                producedDataType);
    }

    /**
     * Returns the connector identifier.
     *
     * @return the string {@code rmq}
     */
    @Override
    public String factoryIdentifier() {
        return FACTORY_IDENTIFIER;
    }

    /**
     * Lists the options this factory declares as required.
     *
     * @return a new mutable set of the required options
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTS);
        options.add(PORT);
        options.add(QUEUE);
        options.add(VIRTUAL_HOST);
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(EXCHANGE_NAME);
        options.add(FORMAT);
        return options;
    }

    /**
     * Lists the options this factory accepts but does not require.
     *
     * @return a new mutable set containing only {@link #QOS}
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(QOS);
        return options;
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
flink-sql-rabbitmq.zip (17个子文件)
pom.xml 5KB
dependency-reduced-pom.xml 5KB
src
main
resources
flink-sql-connector-pulsar-1.15.1.4.jar 43.26MB
META-INF
services
org.apache.flink.table.factories.Factory 52B
java
com
xyf
bigdata
connector
source
RabbitmqDynamicTableSource.java 3KB
RabbitmqTableSourceFactory.java 4KB
functions
RabbitmqSourceFunction.java 3KB
.idea
jarRepositories.xml 1KB
codeStyles
codeStyleConfig.xml 153B
Project.xml 269B
uiDesigner.xml 9KB
libraries
flink_sql_connector_pulsar_1_15_1_4.xml 283B
workspace.xml 4KB
misc.xml 541B
compiler.xml 551B
.gitignore 50B
encodings.xml 512B
共 17 条
- 1
资源评论
一个小小bug
- 粉丝: 26
- 资源: 5
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功