# Storm HDFS
Storm components for interacting with HDFS file systems
- [HDFS Bolt](#hdfs-bolt)
- [HDFS Spout](#hdfs-spout)
---
# HDFS Bolt
## Usage
The following example writes pipe ("|")-delimited files to the HDFS path `hdfs://localhost:54310/foo`. After every
1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
reach 5 megabytes in size.
```java
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```
### Packaging a Topology
When packaging your topology, it's important that you use the `maven-shade-plugin` as opposed to the
`maven-assembly-plugin`.
The shade plugin provides facilities for merging JAR manifest entries, which the Hadoop client leverages for URL scheme
resolution.
If you experience errors such as the following:
```
java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
```
it's an indication that your topology jar file isn't packaged properly.
If you are using maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to
create your topology jar:
```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>1.4</version>
  <configuration>
    <createDependencyReducedPom>true</createDependencyReducedPom>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer
              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer
              implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass></mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```
### Specifying a Hadoop Version
By default, storm-hdfs uses the following Hadoop dependencies:
```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.6.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.6.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```
If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency
and add the dependencies for your preferred version in your pom.
Hadoop client version incompatibilities can manifest as errors like:
```
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
```
## HDFS Bolt Customization
### Record Formats
Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
interface:
```java
public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}
```
The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
tab-delimited files.
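As a self-contained sketch of what such a format does (plain `String` values stand in here for fields pulled from a Storm `Tuple`), the core logic is just joining the values with the delimiter and terminating the record:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Sketch of the logic a delimited RecordFormat applies: join the field
// values with the configured delimiter and terminate the record with "\n".
public class DelimitedFormatSketch {
    static byte[] formatRecord(List<String> values, String delimiter) {
        return (String.join(delimiter, values) + "\n")
                .getBytes(StandardCharsets.UTF_8);
    }
}
```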
### File Naming
File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
interface:
```java
public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}
```
The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat` will create file names with the following format:
`{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}`
For example:
`MyBolt-5-7-1390579837830.txt`
By default, prefix is empty and extension is ".txt".
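That assembly can be sketched with plain string concatenation; this reproduces the example name above:

```java
// Sketch of how DefaultFileNameFormat assembles a file name from its parts:
// {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
public class FileNameSketch {
    static String fileName(String prefix, String componentId, int taskId,
                           long rotation, long timestamp, String extension) {
        return prefix + componentId + "-" + taskId + "-" + rotation + "-"
                + timestamp + extension;
    }
}
```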
**New FileNameFormat:**
The newer `org.apache.storm.hdfs.format.SimpleFileNameFormat` and `org.apache.storm.hdfs.trident.format.SimpleFileNameFormat` are more flexible; the `withName` method supports the following parameters:
* $TIME - current time; use `withTimeFormat` to format it
* $NUM - rotation number
* $HOST - local host name
* $PARTITION - partition index (`org.apache.storm.hdfs.trident.format.SimpleFileNameFormat` only)
* $COMPONENT - component id (`org.apache.storm.hdfs.format.SimpleFileNameFormat` only)
* $TASK - task id (`org.apache.storm.hdfs.format.SimpleFileNameFormat` only)
e.g. `seq.$TIME.$HOST.$COMPONENT.$NUM.dat`
The default file `name` is `$TIME.$NUM.txt`, and the default `timeFormat` is `yyyyMMddHHmmss`.
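The substitution itself amounts to replacing each `$VARIABLE` token in the pattern. A self-contained sketch (the variable values below are made-up examples, not Storm API output):

```java
import java.util.Map;

// Sketch of the $VARIABLE substitution a SimpleFileNameFormat performs on
// its name pattern. Each known token is replaced by its current value.
public class PatternSketch {
    static String expand(String pattern, Map<String, String> vars) {
        String result = pattern;
        for (Map.Entry<String, String> e : vars.entrySet()) {
            result = result.replace(e.getKey(), e.getValue());
        }
        return result;
    }
}
```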
### Sync Policies
Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:
```java
public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}
```
The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
to perform a sync/flush, after which it will call the `reset()` method.
The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
been processed.
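The counting behavior can be sketched in isolation (the real `mark()` also receives the `Tuple` and byte offset, which a count-based policy ignores; this is an illustrative stand-in, not the Storm class):

```java
// Sketch of CountSyncPolicy's counting behavior: mark() returns true once
// `count` calls have occurred since the last reset().
public class CountPolicySketch {
    private final int count;
    private int executeCount = 0;

    CountPolicySketch(int count) {
        this.count = count;
    }

    boolean mark() {
        return ++executeCount >= count;
    }

    void reset() {
        executeCount = 0;
    }
}
```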
### File Rotation Policies
Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an
implementation of the `org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:
```java
public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
    FileRotationPolicy copy();
}
```
The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
data files reach a specific file size:
```java
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
```
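The underlying size check is simple: rotate once the bytes written reach the configured threshold. A self-contained sketch, with a unit multiplier mirroring `Units.MB` (an assumption for illustration, matching the usual 1024-based sizes):

```java
// Sketch of the size check behind FileSizeRotationPolicy: rotate once the
// number of bytes written to the current file reaches size * unit.
public class RotationSketch {
    static final long MB = 1024L * 1024L;

    static boolean shouldRotate(long bytesWritten, float size, long unit) {
        return bytesWritten >= (long) (size * unit);
    }
}
```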
### File Rotation Actions
Both the HDFS bolt and the Trident State implementation allow you to register any number of `RotationAction`s.
A `RotationAction` provides a hook for performing some action right after a file is rotated, such as moving the file
to a different location or renaming it.
```java
public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
```
Storm-HDFS includes a simple action that will move a file after rotation:
```java
public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);

    private String destination;

    public MoveFileAction withDestination(String destDir) {
        destination = destDir;
        return this;
    }

    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        boolean success = fileSystem.rename(filePath, destPath);
        if (!success) {
            LOG.warn("Failed to move file {} to {}", filePath, destPath);
        }
    }
}
```