/*
* Copyright 2017 StreamSets Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.streamsets.pipeline.stage.processor.fieldtypeconverter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.streamsets.pipeline.api.Field;
import com.streamsets.pipeline.api.FileRef;
import com.streamsets.pipeline.api.OnRecordError;
import com.streamsets.pipeline.api.ProtoConfigurableEntity;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.base.OnRecordErrorException;
import com.streamsets.pipeline.config.DateFormat;
import com.streamsets.pipeline.config.DecimalScaleRoundingStrategy;
import com.streamsets.pipeline.config.ZonedDateTimeFormat;
import com.streamsets.pipeline.sdk.ProcessorRunner;
import com.streamsets.pipeline.sdk.RecordCreator;
import com.streamsets.pipeline.sdk.StageRunner;
import com.streamsets.pipeline.stage.common.HeaderAttributeConstants;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import static com.streamsets.pipeline.stage.processor.fieldtypeconverter.Errors.CONVERTER_04;
import static com.streamsets.testing.Matchers.mapFieldWithEntry;
import static org.junit.Assert.assertThat;
public class TestFieldTypeConverterProcessorFields {
/**
 * Runs a BOOLEAN conversion whose field list contains a path ("/nonExistent") that is
 * absent from the record, next to three paths that are present.
 *
 * <p>As asserted below, the record is still emitted on lane "a", the root map keeps its
 * six entries, the listed fields that exist hold boolean values, and the fields that were
 * not listed ("intermediate", "advanced", "null") keep their original values.
 *
 * @throws StageException if thrown by the runner while the test executes
 */
@Test
public void testStringToNonExistentField() throws StageException {
  FieldTypeConverterConfig converterConfig = new FieldTypeConverterConfig();
  // "/nonExistent" has no counterpart in the record built below.
  converterConfig.fields = ImmutableList.of("/nonExistent", "/beginner", "/expert", "/skilled");
  converterConfig.targetType = Field.Type.BOOLEAN;
  converterConfig.dataLocale = "en";

  ProcessorRunner runner = new ProcessorRunner.Builder(FieldTypeConverterDProcessor.class)
      .addConfiguration("convertBy", ConvertBy.BY_FIELD)
      .addConfiguration("fieldTypeConverterConfigs", ImmutableList.of(converterConfig))
      .addOutputLane("a")
      .build();
  runner.runInit();
  try {
    Map<String, Field> input = new LinkedHashMap<>();
    input.put("beginner", Field.create("false"));
    input.put("intermediate", Field.create("yes"));
    input.put("advanced", Field.create("no"));
    input.put("expert", Field.create("true"));
    input.put("skilled", Field.create("122345566"));
    input.put("null", Field.create(Field.Type.STRING, null));
    Record record = RecordCreator.create("s", "s:1");
    record.set(Field.create(input));

    StageRunner.Output output = runner.runProcess(ImmutableList.of(record));
    Assert.assertEquals(1, output.getRecords().get("a").size());

    Field field = output.getRecords().get("a").get(0).get();
    Assert.assertTrue(field.getValue() instanceof Map);
    Map<String, Field> result = field.getValueAsMap();
    // assertEquals (rather than assertTrue on ==) reports the actual size on failure.
    Assert.assertEquals(6, result.size());

    // Listed and present: expected to come back as booleans.
    Assert.assertTrue(result.containsKey("beginner"));
    Assert.assertEquals(false, result.get("beginner").getValue());
    Assert.assertTrue(result.containsKey("expert"));
    Assert.assertEquals(true, result.get("expert").getValue());
    Assert.assertTrue(result.containsKey("skilled"));
    Assert.assertEquals(false, result.get("skilled").getValue());

    // Not listed in the configuration: expected to keep their original values.
    Assert.assertTrue(result.containsKey("intermediate"));
    Assert.assertEquals("yes", result.get("intermediate").getValue());
    Assert.assertTrue(result.containsKey("advanced"));
    Assert.assertEquals("no", result.get("advanced").getValue());
    Assert.assertTrue(result.containsKey("null"));
    Assert.assertNull(result.get("null").getValue());
  } finally {
    runner.runDestroy();
  }
}
/**
 * Converts every STRING field of a map record to BOOLEAN using the "en" data locale.
 *
 * <p>As asserted below, only the literal "true" is expected to yield {@code true};
 * "false", "yes", "no" and the numeric string "122345566" are all expected to yield
 * {@code false}, and the field holding a null value is expected to stay null.
 *
 * @throws StageException if thrown by the runner while the test executes
 */
@Test
public void testStringToBoolean() throws StageException {
  FieldTypeConverterConfig converterConfig = new FieldTypeConverterConfig();
  converterConfig.fields = ImmutableList.of(
      "/beginner", "/intermediate", "/skilled", "/advanced", "/expert", "/null");
  converterConfig.targetType = Field.Type.BOOLEAN;
  converterConfig.dataLocale = "en";

  ProcessorRunner runner = new ProcessorRunner.Builder(FieldTypeConverterDProcessor.class)
      .addConfiguration("convertBy", ConvertBy.BY_FIELD)
      .addConfiguration("fieldTypeConverterConfigs", ImmutableList.of(converterConfig))
      .addOutputLane("a")
      .build();
  runner.runInit();
  try {
    Map<String, Field> input = new LinkedHashMap<>();
    input.put("beginner", Field.create("false"));
    input.put("intermediate", Field.create("yes"));
    input.put("advanced", Field.create("no"));
    input.put("expert", Field.create("true"));
    input.put("skilled", Field.create("122345566"));
    input.put("null", Field.create(Field.Type.STRING, null));
    Record record = RecordCreator.create("s", "s:1");
    record.set(Field.create(input));

    StageRunner.Output output = runner.runProcess(ImmutableList.of(record));
    Assert.assertEquals(1, output.getRecords().get("a").size());

    Field field = output.getRecords().get("a").get(0).get();
    Assert.assertTrue(field.getValue() instanceof Map);
    Map<String, Field> result = field.getValueAsMap();
    // assertEquals (rather than assertTrue on ==) reports the actual size on failure.
    Assert.assertEquals(6, result.size());

    Assert.assertTrue(result.containsKey("beginner"));
    Assert.assertEquals(false, result.get("beginner").getValue());
    // "yes" is not treated as true by this conversion.
    Assert.assertTrue(result.containsKey("intermediate"));
    Assert.assertEquals(false, result.get("intermediate").getValue());
    Assert.assertTrue(result.containsKey("advanced"));
    Assert.assertEquals(false, result.get("advanced").getValue());
    Assert.assertTrue(result.containsKey("expert"));
    Assert.assertEquals(true, result.get("expert").getValue());
    Assert.assertTrue(result.containsKey("skilled"));
    Assert.assertEquals(false, result.get("skilled").getValue());
    Assert.assertTrue(result.containsKey("null"));
    Assert.assertNull(result.get("null").getValue());
  } finally {
    runner.runDestroy();
  }
}
@Test
public void testBooleanToInt() throws StageException {
FieldTypeConverterConfig fieldTypeConverterConfig = new FieldTypeConverterConfig();
fieldTypeConverterConfig.fields = ImmutableList.of("/t", "/f");
fieldTypeConverterConfig.targetType = Field.Type.INTEGER;
ProcessorRunner runner = new ProcessorRunner.Builder(FieldTypeConverterDProcessor.class)
.addConfiguration("convertBy", ConvertBy.BY_FIELD)
.addConfiguration("fieldTypeConverterConfigs", ImmutableList.of(fieldTypeConverterConfig))
.addOutputLane("a").build();
runner.runInit();
try {
Map<String, Field> map = new LinkedHashMap<>();
map.put("t", Field.create(true));
map.put("f", Field.create(false));
Record record = RecordCreator.create
没有合适的资源?快使用搜索试试~ 我知道了~
datacollector:StreamSets Data Collector-连续的大数据和云平台获取基础架构
共2000个文件
java:5396个
js:857个
png:729个
需积分: 50 7 下载量 59 浏览量
2021-03-17
17:24:46
上传
评论
收藏 84.05MB ZIP 举报
温馨提示
什么是StreamSets数据收集器? StreamSets Data Collector是企业级的、开源的、持续的大数据摄取基础架构。 它具有先进且易于使用的用户界面,使数据科学家、开发人员和数据基础架构团队只需通常所需时间的一小部分,即可轻松创建用于复杂摄取场景的数据管道。 开箱即用的StreamSets Data Collector可以读写大量端点,包括S3、JDBC、Hadoop、Kafka、Cassandra等。 除了大量的预建阶段之外,您还可以使用Python、JavaScript和Java Expression Language来即时转换和处理数据。 为了实现容错和横向扩展,可以在集群模式下设置数据管道,并在管道的每个阶段执行细粒度的监视。 要了解更多信息,请访问项目网站(原文此处链接缺失)。 构建StreamSets数据收集器:要从源代码构建StreamSets数据收集器,请参阅构建说明(原文此处链接缺失)。 许可证 StreamSets Dat
资源详情
资源评论
资源推荐
收起资源包目录
datacollector:StreamSets Data Collector-连续的大数据和云平台获取基础架构 (2000个子文件)
bootstrap.min.css 115KB
fonts.css 34KB
webhelp.css 27KB
skin.css 24KB
bootstrap-theme.min.css 19KB
jquery-ui.min.css 17KB
p-side-notes.css 11KB
commonltr.css 10KB
commonltr.css 7KB
light.css 7KB
commonrtl.css 6KB
elements-styles.css 4KB
tooltip.css 2KB
nav-links.css 2KB
indexterms.css 2KB
expand.css 1022B
topic-page.css 585B
topic.css 424B
print.css 412B
main-page.css 370B
indexterms-page.css 340B
search-page.css 252B
WhatsNew_Title.html 621KB
indexTerms.html 532KB
HTTPClient.html 261KB
Functions.html 255KB
MultiTableJDBCConsumer.html 228KB
HDFSStandalone.html 224KB
HTTPClient.html 214KB
ADLS-G2.html 214KB
ADLS-G1.html 210KB
ConfiguringAPipeline.html 209KB
AmazonS3.html 207KB
MapRFSStandalone.html 204KB
SFTP.html 201KB
Directory.html 197KB
SQLServerBDCMultitable.html 194KB
RESTService.html 192KB
Teradata.html 191KB
Snowflake.html 186KB
TCPServer.html 180KB
DataFormat_Title.html 180KB
AddtionalStageLibs.html 179KB
CredentialStores.html 176KB
OracleCDC.html 175KB
WebSocketServer.html 173KB
KinConsumer.html 173KB
HadoopFS-destination.html 172KB
WebSocketClient.html 172KB
EventFramework-Title.html 172KB
JDBCConsumer.html 169KB
HTTPClient.html 168KB
MQTTSubscriber.html 165KB
index.html 164KB
PulsarConsumer.html 163KB
ADLS-G2-D.html 161KB
AzureSynapse.html 160KB
MapRFS.html 159KB
AmazonSQS.html 157KB
KafkaMultiConsumer.html 157KB
DCConfig.html 157KB
ADLS-G1-D.html 156KB
RabbitMQ.html 156KB
DeltaLake.html 155KB
KConsumer.html 152KB
HTTPServer.html 149KB
DataLakeStore.html 148KB
GCS.html 147KB
Salesforce.html 147KB
JMS.html 144KB
CoAPServer.html 143KB
SQLServerCDC.html 142KB
PubSub.html 142KB
AmazonS3.html 142KB
LocalFS.html 141KB
Administration_title.html 141KB
MapRStreamsMultiConsumer.html 136KB
JDBCLookup.html 127KB
Redis.html 125KB
MapRStreamsCons.html 124KB
DataParser.html 123KB
SQLServerChange.html 123KB
FileTail.html 120KB
RabbitMQ.html 120KB
GCS.html 118KB
Couchbase.html 118KB
KProducer.html 118KB
PostUpgrade.html 117KB
SQLServerBDCBulk.html 117KB
SAPHana.html 116KB
HadoopFS-origin.html 115KB
Syslog.html 112KB
SupportedSystemVersions.html 109KB
CloudInstall.html 107KB
HiveMetadata.html 107KB
PublishMetadata.html 105KB
MapRFS.html 105KB
DeltaLake.html 105KB
JMSProducer.html 104KB
PubSubPublisher.html 104KB
共 2000 条
- 1
- 2
- 3
- 4
- 5
- 6
- 20
逸格草草
- 粉丝: 34
- 资源: 4592
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0