package bhz.trident.example;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.tuple.TridentTuple;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
* <B>系统名称:</B>测试TridentFunction使用<BR>
* <B>模块名称:</B><BR>
* <B>中文类名:</B><BR>
* <B>概要说明:</B><BR>
* @author bhz(Alienware)
* @since 2013年2月15日
*/
public class TridentFunction {
//继承BaseFunction类,重新execute方法
public static class SumFunction extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
System.out.println("传入进来的内容为:" + tuple);
//获取a、b俩个域
int a = tuple.getInteger(0);
int b = tuple.getInteger(1);
int sum = a + b;
//发射数据
collector.emit(new Values(sum));
}
}
//继承BaseFunction类,重新execute方法
public static class Result extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//获取tuple输入内容
System.out.println();
Integer a = tuple.getIntegerByField("a");
Integer b = tuple.getIntegerByField("b");
Integer c = tuple.getIntegerByField("c");
Integer d = tuple.getIntegerByField("d");
System.out.println("a: "+ a + ", b: " + b + ", c: " + c + ", d: " + d);
Integer sum = tuple.getIntegerByField("sum");
System.out.println("sum: "+ sum);
}
}
public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();
//设定数据源
FixedBatchSpout spout = new FixedBatchSpout(
new Fields("a", "b", "c", "d"), //声明输入的域字段为"a"、"b"、"c"、"d"
4, //设置批处理大小为1
//设置数据源内容
//测试数据
new Values(1, 4, 7, 10),
new Values(1, 1, 3, 11),
new Values(2, 2, 7, 1),
new Values(2, 5, 7, 2));
//指定是否循环
spout.setCycle(false);
//指定输入源spout
Stream inputStream = topology.newStream("spout", spout);
/**
* 要实现流sqout - bolt的模式 在trident里是使用each来做的
* each方法参数:
* 1.输入数据源参数名称:"a", "b", "c", "d"
* 2.需要流转执行的function对象(也就是bolt对象):new SumFunction()
* 3.指定function对象里的输出参数名称:sum
*/
inputStream.each(new Fields("a", "b", "c", "d"), new SumFunction(), new Fields("sum"))
/**
* 继续使用each调用下一个function(bolt)
* 第一个输入参数为:"a", "b", "c", "d", "sum"
* 第二个参数为:new Result() 也就是执行函数,第三个参数为没有输出
*/
.each(new Fields("a", "b", "c", "d", "sum"), new Result(), new Fields());
return topology.build(); //利用这种方式,我们返回一个StormTopology对象,进行提交
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
//设置batch最大处理
conf.setNumWorkers(2);
conf.setMaxSpoutPending(20);
if(args.length == 0) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-function", conf, buildTopology());
Thread.sleep(10000);
cluster.shutdown();
} else {
StormSubmitter.submitTopology(args[0], conf, buildTopology());
}
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
storm之TridentFilter使用示例代码.zip
共19个文件
java:13个
prefs:3个
xml:1个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 145 浏览量
2023-12-28
12:18:46
上传
评论
收藏 27KB ZIP 举报
温馨提示
storm之TridentFilter使用示例代码
资源推荐
资源详情
资源评论
收起资源包目录
storm之TridentFilter使用示例代码.zip (19个子文件)
storm之TridentFilter使用示例代码
storm05
.classpath 1024B
.settings
org.eclipse.jdt.core.prefs 243B
org.eclipse.core.resources.prefs 119B
org.eclipse.m2e.core.prefs 90B
pom.xml 2KB
src
test
java
main
java
bhz
trident
example
TridentFilter.java 3KB
TridentFunction.java 4KB
wordcount
SplitFunction.java 923B
SubjectsSpout.java 2KB
ResultFunction.java 807B
WordCountTopology.java 3KB
strategy
WriteFunction.java 1KB
StrategyTopology.java 3KB
example
FakeTweetSpout.java 2KB
DistributedRPC.java 2KB
TridentUtility.java 1KB
CountryRepartition.java 1KB
TridentHelloWorldTopology.java 1KB
.project 559B
共 19 条
- 1
资源评论
小小哭包
- 粉丝: 1900
- 资源: 3864
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- ### 1、项目介绍 本项目Scrapy进行数据爬取,并使用Django框架+PyEcharts实现可视化大屏 效果如下:
- # 微信小程序-健康菜谱 基于微信小程序的一个查找检索菜谱的应用 ### 效果 !动态图(./res/gif/demo
- zabbix-get命令包资源
- 毕业设计,基于PyQt5实现的可视化界面的Python车牌自动识别系统源码
- 26-朴素贝叶斯分类.rar
- 没有安Matlab 也可以 生成FIR抽头系数工具.py
- python烟花代码.rar
- 实验目的: 1.构建基于verilog语言的组合逻辑电路和时序逻辑电路; 2.掌握verilog语言的电路设计技巧 3.完成如
- 扩展卡尔曼滤波matlab仿真
- 3_base.apk.1
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功