package com.bitz.coin.benifit.handle;
import com.alibaba.fastjson.JSONObject;
import com.bitz.coin.benifit.utils.JsonUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
/**
 * Data processing: turns raw order messages into a keyed, running-total stream.
 *
 * <p>Each input string that mentions {@code order_coin} is parsed into up to two
 * {@link Num} records (one for the buyer, one for the seller). Records are then
 * grouped by the {@code key} field of {@link Num} and their amounts are summed.
 *
 * <p>The class is a lazily created singleton bound to one source stream.
 */
public class Handle {

    /** The single shared instance; guarded by the class lock in {@link #getInstance}. */
    private static Handle handle;

    /** Source stream of raw messages this handler reads from. */
    private final DataStream<String> dataStream;

    private Handle(DataStream<String> dataStream) {
        this.dataStream = dataStream;
    }

    /**
     * Returns the shared handler for the given stream.
     *
     * <p>The instance is created on first use. If a later call passes a different,
     * non-null stream, the shared instance is re-bound to that stream instead of
     * silently continuing to use the first one. Passing {@code null} after the
     * instance exists simply returns the existing instance.
     *
     * @param dataStream source stream of raw message strings
     * @return the shared {@code Handle}
     */
    public static synchronized Handle getInstance(DataStream<String> dataStream) {
        if (handle == null || (dataStream != null && handle.dataStream != dataStream)) {
            handle = new Handle(dataStream);
        }
        return handle;
    }

    /**
     * Builds the aggregation pipeline on top of the source stream.
     *
     * @return a stream of {@link Num} where each element carries the running sum
     *         for its key
     */
    public DataStream<Num> getmap() {
        return dataStream
                .flatMap(new OrderFlatMap())
                // "key" here is the field named key inside Num
                .keyBy("key")
                .reduce(new SumReducer());
    }

    /**
     * Emits the buyer/seller records parsed from every message containing
     * {@code order_coin}; all other messages are dropped.
     *
     * <p>Declared as a static nested class so the function does not hold a
     * reference to the enclosing {@code Handle} (and its stream) when the job
     * graph is serialized.
     */
    private static final class OrderFlatMap implements FlatMapFunction<String, Num> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String in, Collector<Num> out) throws Exception {
            if (in == null || !in.contains("order_coin")) {
                return;
            }
            for (Num parsed : parseJson(in)) {
                out.collect(parsed);
            }
        }
    }

    /** Adds the amounts of two records that share the same key. */
    private static final class SumReducer implements ReduceFunction<Num> {
        private static final long serialVersionUID = 1L;

        /**
         * @param value1 the accumulated record so far
         * @param value2 the newly arrived record
         * @return a new record with {@code value1}'s key and the summed amount
         */
        @Override
        public Num reduce(Num value1, Num value2) throws Exception {
            return new Num(value1.getKey(), value1.getNum().add(value2.getNum()));
        }
    }

    /**
     * Parses one raw message into buyer and seller records.
     *
     * <p>The buyer record is keyed {@code buy_uid + "_" + coin_from} with amount
     * {@code number}; the seller record is keyed {@code sale_uid + "_" + coin_to}
     * with amount {@code number * price}. A side is skipped when any field it
     * needs is absent, so an incomplete message never produces a bogus zero
     * amount and never fails on a missing {@code price}.
     *
     * @param in raw message text; presumably JSON understood by
     *           {@code JsonUtil.getData} (verify against that helper)
     * @return the parsed records, possibly empty, never {@code null}
     * @throws NumberFormatException if {@code number} or {@code price} is present
     *         but is not a valid decimal
     */
    private static ArrayList<Num> parseJson(String in) {
        ArrayList<Num> records = new ArrayList<>();
        JSONObject data = JsonUtil.getData(in);
        if (data == null) {
            return records;
        }

        // The traded quantity is shared by both sides, so parse it once up front
        // rather than only when the buyer fields happen to be present.
        Object rawNumber = data.get("number");
        if (rawNumber == null) {
            return records;
        }
        BigDecimal number = new BigDecimal(rawNumber.toString());

        // Buyer side
        Object buyUid = data.get("buy_uid");
        Object coinFrom = data.get("coin_from");
        if (buyUid != null && coinFrom != null) {
            records.add(new Num(buyUid + "_" + coinFrom, number));
        }

        // Seller side
        Object saleUid = data.get("sale_uid");
        Object coinTo = data.get("coin_to");
        Object rawPrice = data.get("price");
        if (saleUid != null && coinTo != null && rawPrice != null) {
            BigDecimal price = new BigDecimal(rawPrice.toString());
            records.add(new Num(saleUid + "_" + coinTo, number.multiply(price)));
        }
        return records;
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
flink真实案例-读取kafka下沉redis-中间逻辑处理带原始数据.zip
共72个文件
xml:59个
java:10个
iml:1个
需积分: 50 44 下载量 155 浏览量
2019-12-06
17:42:50
上传
评论 3
收藏 48KB ZIP 举报
温馨提示
用 Java 实现的 Flink 真实开发案例:从 Kafka 读取数据,经过中间逻辑处理后,最终下沉(sink)到 Redis;内部方法已做封装,方便根据自己的情况使用。要求:JDK 1.8、Kafka、Redis。资源珍贵,喜欢的话可以下载。
资源推荐
资源详情
资源评论
收起资源包目录
flink真实案例-读取kafka下沉redis-中间逻辑处理带原始数据.zip (72个子文件)
combitzcoinbenifit
pom.xml 3KB
combitzcoinbenifit.iml 6KB
src
test
java
main
resources
application.properties 205B
java
com
bitz
coin
benifit
MainJob.java 938B
handle
Num.java 701B
Handle.java 3KB
send
MyKafkaProducer.java 3KB
sink
MyRedisMapper.java 1KB
SinkData.java 923B
file
order_coin 1KB
datasource
KafkaDatasource.java 2KB
OtherDatasource.java 408B
utils
ConfigUtil.java 900B
JsonUtil.java 712B
.idea
uiDesigner.xml 9KB
misc.xml 439B
modules.xml 276B
compiler.xml 648B
workspace.xml 49KB
libraries
Maven__org_apache_commons_commons_compress_1_4_1.xml 591B
Maven__org_apache_flink_flink_runtime_2_11_1_7_0.xml 597B
Maven__org_apache_flink_flink_shaded_asm_5_0_4_5_0.xml 611B
Maven__org_tukaani_xz_1_0.xml 451B
Maven__org_scala_lang_modules_scala_parser_combinators_2_11_1_0_4.xml 698B
Maven__commons_io_commons_io_2_4.xml 503B
Maven__org_apache_flink_force_shading_1_7_0.xml 562B
Maven__org_scala_lang_modules_scala_java8_compat_2_11_0_7_0.xml 656B
Maven__org_apache_flink_flink_connector_kafka_base_2_11_1_7_0.xml 688B
Maven__com_typesafe_akka_akka_stream_2_11_2_4_20.xml 594B
Maven__com_typesafe_ssl_config_core_2_11_0_2_1.xml 595B
Maven__org_apache_flink_flink_shaded_jackson_2_7_9_5_0.xml 639B
Maven__org_apache_flink_flink_queryable_state_client_java_2_11_1_7_0.xml 737B
Maven__org_javassist_javassist_3_19_0_GA.xml 550B
Maven__org_xerial_snappy_snappy_java_1_1_4.xml 552B
Maven__com_esotericsoftware_kryo_kryo_2_24_0.xml 542B
Maven__org_apache_flink_flink_clients_2_11_1_7_0.xml 597B
Maven__org_apache_commons_commons_pool2_2_3.xml 556B
Maven__com_typesafe_akka_akka_protobuf_2_11_2_4_20.xml 608B
Maven__com_google_code_findbugs_jsr305_1_3_9.xml 545B
Maven__org_slf4j_slf4j_nop_1_7_24.xml 513B
Maven__org_apache_commons_commons_lang3_3_3_2.xml 570B
Maven__org_apache_flink_flink_streaming_java_2_11_1_7_0.xml 646B
Maven__org_apache_bahir_flink_connector_redis_2_11_1_0.xml 639B
Maven__org_apache_flink_flink_shaded_netty_4_1_24_Final_5_0.xml 674B
Maven__com_typesafe_akka_akka_slf4j_2_11_2_4_20.xml 587B
Maven__org_apache_flink_flink_connector_kafka_2_11_1_7_0.xml 653B
Maven__org_apache_flink_flink_annotations_1_7_0.xml 590B
Maven__org_apache_flink_flink_java_1_7_0.xml 541B
Maven__com_typesafe_config_1_3_0.xml 497B
Maven__com_alibaba_fastjson_1_2_62.xml 514B
Maven__org_apache_commons_commons_math3_3_5.xml 556B
Maven__com_github_scopt_scopt_2_11_3_5_0.xml 541B
Maven__org_apache_flink_flink_metrics_core_1_7_0.xml 597B
Maven__commons_cli_commons_cli_1_3_1.xml 528B
Maven__redis_clients_jedis_2_8_0.xml 494B
Maven__org_lz4_lz4_java_1_4_1.xml 491B
Maven__com_typesafe_akka_akka_actor_2_11_2_4_20.xml 587B
Maven__org_reactivestreams_reactive_streams_1_0_0.xml 595B
Maven__org_scala_lang_scala_library_2_11_12.xml 568B
Maven__commons_collections_commons_collections_3_2_2.xml 616B
Maven__org_apache_flink_flink_shaded_guava_18_0_5_0.xml 618B
Maven__com_twitter_chill_java_0_7_6.xml 521B
Maven__org_clapper_grizzled_slf4j_2_11_1_3_2.xml 584B
Maven__org_objenesis_objenesis_2_1.xml 508B
Maven__org_slf4j_slf4j_api_1_7_15.xml 513B
Maven__org_apache_flink_flink_core_1_7_0.xml 541B
Maven__org_apache_flink_flink_optimizer_2_11_1_7_0.xml 611B
Maven__com_twitter_chill_2_11_0_7_6.xml 521B
Maven__com_esotericsoftware_minlog_minlog_1_2.xml 543B
Maven__org_apache_kafka_kafka_clients_2_0_1.xml 562B
Maven__org_apache_flink_flink_hadoop_fs_1_7_0.xml 576B
inspectionProfiles
Project_Default.xml 1KB
共 72 条
- 1
资源评论
weixin_42333583
- 粉丝: 350
- 资源: 19
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功