# Storm HDFS
Storm components for interacting with HDFS file systems
## Usage
The following example will write pipe ("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every
1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
reach 5 megabytes in size.
```java
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```
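Once configured, the bolt is wired into a topology like any other bolt. A minimal sketch (the `SentenceSpout` class, component names, and parallelism are illustrative, not part of storm-hdfs):
```java
// Hypothetical wiring: SentenceSpout stands in for whatever spout emits
// the fields that DelimitedRecordFormat will serialize.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence-spout", new SentenceSpout(), 1);
builder.setBolt("hdfs-bolt", bolt, 4).shuffleGrouping("sentence-spout");
```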
### Packaging a Topology
When packaging your topology, it's important that you use the
[maven-shade-plugin](https://maven.apache.org/plugins/maven-shade-plugin/) as opposed to the
[maven-assembly-plugin](https://maven.apache.org/plugins/maven-assembly-plugin/).
The shade plugin provides facilities for merging JAR manifest entries, which the Hadoop client leverages for URL scheme
resolution.
If you experience errors such as the following:
```
java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
```
it's an indication that your topology jar file isn't packaged properly.
If you are using maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to
create your topology jar:
```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```
### Specifying a Hadoop Version
By default, storm-hdfs uses the following Hadoop dependencies:
```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency
and add the dependencies for your preferred version in your pom.
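A sketch of what that might look like, assuming the `org.apache.storm:storm-hdfs` coordinates (the `${storm.version}` property and the Hadoop 2.6.0 version below are placeholders, not recommendations):
```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>${storm.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>
```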
Hadoop client version incompatibilities can manifest as errors like:
```
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
```
## Customization
### Record Formats
Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
interface:
```java
public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}
```
The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
tab-delimited files.
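If you need a format that `DelimitedRecordFormat` cannot express, a custom implementation is small. A minimal sketch (the class and the "record" field name below are hypothetical; the `Tuple` import assumes the `org.apache.storm` package layout used elsewhere in this document):
```java
import org.apache.storm.hdfs.format.RecordFormat;
import org.apache.storm.tuple.Tuple;

import java.nio.charset.StandardCharsets;

// Hypothetical example: write a single tuple field named "record"
// as one UTF-8 encoded line per tuple.
public class SingleFieldRecordFormat implements RecordFormat {
    @Override
    public byte[] format(Tuple tuple) {
        String line = tuple.getStringByField("record") + "\n";
        return line.getBytes(StandardCharsets.UTF_8);
    }
}
```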
### File Naming
File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
interface:
```java
public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);

    String getName(long rotation, long timeStamp);

    String getPath();
}
```
The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat` will create file names with the following format:
`{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}`
For example:
`MyBolt-5-7-1390579837830.txt`
By default, the prefix is empty and the extension is ".txt".
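Both defaults can be overridden through the builder-style setters; a sketch, assuming `DefaultFileNameFormat` exposes `withPrefix()` and `withExtension()` alongside the `withPath()` call shown earlier:
```java
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/")
        .withPrefix("myBolt-")
        .withExtension(".csv");
```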
### Sync Policies
Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:
```java
public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);

    void reset();
}
```
The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
to perform a sync/flush, after which it will call the `reset()` method.
The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
been processed.
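Other strategies can be expressed against the same interface. For example, a time-based policy; a minimal sketch (this `TimedSyncPolicy` class is illustrative and not part of storm-hdfs):
```java
import org.apache.storm.hdfs.sync.SyncPolicy;
import org.apache.storm.tuple.Tuple;

// Illustrative custom policy: request a sync once at least intervalMs
// milliseconds have elapsed since the last sync.
public class TimedSyncPolicy implements SyncPolicy {
    private final long intervalMs;
    private long lastSync = System.currentTimeMillis();

    public TimedSyncPolicy(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public boolean mark(Tuple tuple, long offset) {
        return System.currentTimeMillis() - lastSync >= intervalMs;
    }

    @Override
    public void reset() {
        lastSync = System.currentTimeMillis();
    }
}
```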
### File Rotation Policies
Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an
implementation of the `org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:
```java
public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);

    void reset();
}
```
The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
data files reach a specific file size:
```java
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
```
### File Rotation Actions
Both the HDFS bolt and the Trident State implementation allow you to register any number of `RotationAction`s.
A `RotationAction` provides a hook for performing some action right after a file is rotated, for example moving the
file to a different location or renaming it.
```java
public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
```
Storm-HDFS includes a simple action that will move a file after rotation:
```java
public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);

    private String destination;

    public MoveFileAction withDestination(String destDir) {
        destination = destDir;
        return this;
    }

    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        boolean success = fileSystem.rename(filePath, destPath);
        if (!success) {
            LOG.warn("Failed to move file {} to {}", filePath, destPath);
        }
    }
}
```
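On the bolt side, the action is registered when the bolt is built; a sketch, assuming `HdfsBolt` exposes the same `addRotationAction()` builder method used in the Trident options below:
```java
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy)
        .addRotationAction(new MoveFileAction().withDestination("/dest/"));
```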
If you are using Trident and sequence files, you can do something like this:
```java
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310")
        .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
```
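For context, the options object is then handed to a state factory and attached to a Trident stream; a sketch assuming the `HdfsStateFactory` and `HdfsUpdater` classes from the storm-hdfs Trident package (`stream` is an existing Trident `Stream`):
```java
// field names match the DefaultSequenceFormat("key", "data") configured above
Fields hdfsFields = new Fields("key", "data");
StateFactory factory = new HdfsStateFactory().withOptions(seqOpts);
TridentState state = stream.partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
```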
## Support for HDFS Sequence Files
The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write Storm data to HDFS sequence files:
```java
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/data/");

SequenceFileBolt bolt = new SequenceFileBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        // map the "key" and "data" tuple fields to the sequence file key/value
        .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```