# Storm HDFS
Storm components for interacting with HDFS file systems
## Usage
The following example will write pipe("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every
1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
reach 5 megabytes in size.
```java
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```
### Packaging a Topology
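When packaging your topology, it's important that you use the [maven-shade-plugin](http://maven.apache.org/plugins/maven-shade-plugin/) as opposed to the
[maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/).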
The shade plugin can merge resource files from the bundled JARs, such as their `META-INF/services` entries, which the
Hadoop client relies on to resolve URL schemes such as `hdfs://`.
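As a rough illustration of why merging matters, the plain-Java sketch below models a `META-INF/services/org.apache.hadoop.fs.FileSystem` file that is present in two JARs, one registering a local filesystem provider and one registering the HDFS provider. It is a simplified model, not either plugin's actual code, and the provider lists are illustrative: keeping only one copy of the file (roughly what happens when duplicate files are skipped) loses the HDFS provider, while merging, which the shade plugin's `ServicesResourceTransformer` is designed to do, keeps both.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model (not Maven's real implementation) of how a fat-JAR build
// can treat a service-registration file that exists in several input JARs.
class ServiceFileMergeDemo {
    static final String SERVICE_FILE = "META-INF/services/org.apache.hadoop.fs.FileSystem";

    // Builds a fake JAR that contains only the service file with the given providers.
    static Map<String, List<String>> jarWith(String... providers) {
        Map<String, List<String>> jar = new HashMap<>();
        jar.put(SERVICE_FILE, Arrays.asList(providers));
        return jar;
    }

    // "Keep one copy": the first JAR's service file wins and later copies are skipped.
    static List<String> keepFirst(List<Map<String, List<String>>> jars) {
        for (Map<String, List<String>> jar : jars) {
            List<String> entries = jar.get(SERVICE_FILE);
            if (entries != null) {
                return new ArrayList<>(entries);
            }
        }
        return new ArrayList<>();
    }

    // "Merge": provider lines from every JAR are concatenated, without duplicates.
    static List<String> merge(List<Map<String, List<String>>> jars) {
        List<String> result = new ArrayList<>();
        for (Map<String, List<String>> jar : jars) {
            List<String> entries = jar.get(SERVICE_FILE);
            if (entries == null) {
                continue;
            }
            for (String entry : entries) {
                if (!result.contains(entry)) {
                    result.add(entry);
                }
            }
        }
        return result;
    }
}
```

With only one copy kept, no provider is registered for the `hdfs` scheme, which is the situation behind the error shown next.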
If you experience errors such as the following:
```
java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
```
it's an indication that your topology jar file isn't packaged properly.
If you are using Maven to create your topology jar, use the following `maven-shade-plugin` configuration to create
your topology jar:
```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```
### Specifying a Hadoop Version
By default, storm-hdfs uses the following Hadoop dependencies:
```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
If you are using a different version of Hadoop, exclude the Hadoop libraries from the storm-hdfs dependency and add
the dependencies for your preferred version to your POM.
Hadoop client version incompatibilities can manifest as errors like:
```
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
```
## Customization
### Record Formats
Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
interface:
```java
public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}
```
The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
tab-delimited files.
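To make the `RecordFormat` contract concrete, here is a minimal sketch of a delimited format. It assumes a hypothetical `ValuesRecordFormat` interface that receives the tuple's values as a `List<Object>` instead of a Storm `Tuple`, so it compiles without Storm on the classpath; the default delimiters (`,` between fields, `\n` after each record) are assumptions, not taken from the library source.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for RecordFormat: a List of field values replaces the Tuple.
interface ValuesRecordFormat extends Serializable {
    byte[] format(List<Object> values);
}

class SimpleDelimitedFormat implements ValuesRecordFormat {
    private String fieldDelimiter = ",";   // assumed default field delimiter
    private String recordDelimiter = "\n"; // assumed default record delimiter

    SimpleDelimitedFormat withFieldDelimiter(String delimiter) {
        this.fieldDelimiter = delimiter;
        return this;
    }

    SimpleDelimitedFormat withRecordDelimiter(String delimiter) {
        this.recordDelimiter = delimiter;
        return this;
    }

    // Joins the field values with the field delimiter, then terminates the record.
    @Override
    public byte[] format(List<Object> values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                sb.append(fieldDelimiter);
            }
            sb.append(values.get(i));
        }
        sb.append(recordDelimiter);
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Calling `withFieldDelimiter("\t")` on the same sketch would produce tab-delimited records instead of pipe- or comma-delimited ones.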
### File Naming
File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
interface:
```java
public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}
```
The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat` will create file names with the following format:

`{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}`

For example:

`MyBolt-5-7-1390579837830.txt`

By default, the prefix is empty and the extension is `.txt`.
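The naming pattern can be illustrated with a small stand-alone class. This is a hypothetical sketch, not the library's `DefaultFileNameFormat`: its `prepare()` takes the component ID and task ID directly instead of reading them from a `TopologyContext`, and only the prefix and extension are configurable.

```java
// Hypothetical sketch of the naming pattern
// {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
class NameFormatSketch {
    private String prefix = "";        // empty by default, as described above
    private String extension = ".txt"; // ".txt" by default, as described above
    private String componentId;
    private int taskId;

    NameFormatSketch withPrefix(String prefix) {
        this.prefix = prefix;
        return this;
    }

    NameFormatSketch withExtension(String extension) {
        this.extension = extension;
        return this;
    }

    // Stand-in for prepare(Map, TopologyContext): the real method would obtain
    // these values from the topology context.
    void prepare(String componentId, int taskId) {
        this.componentId = componentId;
        this.taskId = taskId;
    }

    String getName(long rotation, long timeStamp) {
        return prefix + componentId + "-" + taskId + "-" + rotation + "-" + timeStamp + extension;
    }
}
```

Including the task ID keeps file names from different bolt tasks distinct, and the rotation number distinguishes successive files written by the same task.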
### Sync Policies
Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:
```java
public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}
```
The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
to perform a sync/flush, after which it will call the `reset()` method.
The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
been processed.
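The `mark()`/`reset()` contract can be sketched without Storm. In the hypothetical code below, `SyncPolicySketch` mirrors `SyncPolicy` with `Object` in place of `Tuple`, `CountSyncSketch` is an assumed count-based policy, and `SyncDriverSketch` stands in for the bolt's per-tuple loop; where the real bolt would flush data to HDFS, the sketch only records the tuple number.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of SyncPolicy, with Object standing in for Tuple.
interface SyncPolicySketch {
    boolean mark(Object tuple, long offset);
    void reset();
}

// Count-based policy: requests a sync once `count` tuples have been marked.
class CountSyncSketch implements SyncPolicySketch {
    private final int count;
    private int executeCount = 0;

    CountSyncSketch(int count) {
        this.count = count;
    }

    @Override
    public boolean mark(Object tuple, long offset) {
        executeCount++;
        return executeCount >= count;
    }

    @Override
    public void reset() {
        executeCount = 0;
    }
}

// Simplified driver: mark() for every tuple; on true, "sync" and then reset().
class SyncDriverSketch {
    static List<Integer> run(SyncPolicySketch policy, int tuples) {
        List<Integer> syncPoints = new ArrayList<>();
        long offset = 0;
        for (int i = 1; i <= tuples; i++) {
            offset += 10; // pretend every record is 10 bytes long
            if (policy.mark("tuple-" + i, offset)) {
                syncPoints.add(i); // a real bolt would sync/flush its output here
                policy.reset();
            }
        }
        return syncPoints;
    }
}
```

With a count of 1,000 and 2,500 tuples, the sketch syncs after tuples 1,000 and 2,000; the last 500 tuples are not yet synced when the loop ends.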
### File Rotation Policies
Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an
implementation of the `org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:
```java
public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}
```
The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
data files reach a specific file size:
```java
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
```
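A size-based policy can be sketched the same way. The class below is an assumption about how such a policy could work, not the library's `FileSizeRotationPolicy`: it derives a byte limit from the count and unit (assuming 1024-based units), accumulates the growth in the `offset` values passed to `mark()`, and asks for a rotation once the limit is reached.

```java
// Hypothetical size-based rotation policy; Object stands in for Tuple.
class SizeRotationSketch {
    // Assumed 1024-based units.
    enum Units {
        KB(1024L), MB(1024L * 1024), GB(1024L * 1024 * 1024), TB(1024L * 1024 * 1024 * 1024);

        final long bytes;

        Units(long bytes) {
            this.bytes = bytes;
        }
    }

    private final long maxBytes;
    private long lastOffset = 0;
    private long bytesWritten = 0;

    SizeRotationSketch(float count, Units units) {
        this.maxBytes = (long) (count * units.bytes);
    }

    long maxBytes() {
        return maxBytes;
    }

    // Adds the growth since the previous call and reports whether the limit is reached.
    boolean mark(Object tuple, long offset) {
        bytesWritten += offset - lastOffset;
        lastOffset = offset;
        return bytesWritten >= maxBytes;
    }

    // Called after a rotation: the new file starts empty.
    void reset() {
        bytesWritten = 0;
        lastOffset = 0;
    }
}
```

Under the 1024-based assumption, `5.0f` with `MB` corresponds to 5 * 1024 * 1024 = 5,242,880 bytes.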
### File Rotation Actions
Both the HDFS bolt and Trident State implementation allow you to register any number of `RotationAction`s.
A `RotationAction` provides a hook for performing some action right after a file is rotated, for example moving the
file to a different location or renaming it.
```java
public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
```
Storm-HDFS includes a simple action that will move a file after rotation:
```java
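import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
    private String destination;

    public MoveFileAction withDestination(String destDir) {
        destination = destDir;
        return this;
    }

    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        // keep the rotated file's name, but place it under the destination directory
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        boolean success = fileSystem.rename(filePath, destPath);
        if (!success) {
            // FileSystem.rename() can signal failure by returning false
            LOG.warn("Failed to move file {} to {}", filePath, destPath);
        }
    }
}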
```
If you are using Trident and sequence files, you can do something like this:
```java
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310")
        .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
```
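The rotation-action hook can also be tried locally without a cluster. The sketch below uses a hypothetical `LocalRotationAction` interface built on `java.nio.file` rather than Hadoop's `FileSystem` and `Path`; `LocalMoveFileAction` follows the idea of `MoveFileAction` by moving the rotated file into a destination directory while keeping its name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical local-filesystem analogue of RotationAction.
interface LocalRotationAction {
    void execute(Path rotatedFile) throws IOException;
}

class LocalMoveFileAction implements LocalRotationAction {
    private Path destination;

    LocalMoveFileAction withDestination(Path destDir) {
        this.destination = destDir;
        return this;
    }

    @Override
    public void execute(Path rotatedFile) throws IOException {
        // make sure the destination directory exists before moving into it
        Files.createDirectories(destination);
        Path target = destination.resolve(rotatedFile.getFileName());
        // Files.move() throws an IOException on failure instead of returning false
        Files.move(rotatedFile, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Because `Files.move()` throws rather than returning a status flag, a failed move propagates to the caller instead of going unnoticed.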
## Support for HDFS Sequence Files
The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write Storm data to HDFS sequence files:
```java
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
```