# Storm HDFS
Storm components for interacting with HDFS file systems
## Usage
The following example writes pipe-delimited (`|`) files to the HDFS path `hdfs://localhost:54310/foo`. After every
1,000 tuples it syncs the filesystem, making that data visible to other HDFS clients. It rotates files when they
reach 5 megabytes in size.
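```java
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
// write files under /foo/ on the target filesystem
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```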
### Packaging a Topology
When packaging your topology, it's important that you use the `maven-shade-plugin` as opposed to the
`maven-assembly-plugin`. The shade plugin provides facilities for merging JAR manifest entries, which the Hadoop client
leverages for URL scheme resolution.
If you experience errors such as the following:

```
java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
```

it's an indication that your topology jar file isn't packaged properly.
If you are using Maven to build your topology jar, you should use the following `maven-shade-plugin` configuration:
### Specifying a Hadoop Version
By default, storm-hdfs uses the following Hadoop dependencies:
If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency
and add the dependencies for your preferred version to your POM.
Hadoop client version incompatibilities can manifest as errors like:

```
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
```
## Customization
### Record Formats
Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
interface:

```java
public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}
```
The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
tab-delimited files.
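To make the `RecordFormat` contract concrete, here is a minimal, Storm-free sketch of a delimited format. It assumes one
record per line, fields joined by a configurable delimiter, and UTF-8 output. `SimpleTuple`, `SimpleRecordFormat` and
`DelimitedFormatSketch` are hypothetical stand-ins introduced for illustration; they are not Storm's `Tuple` or the
library's `DelimitedRecordFormat`, whose exact behavior may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for Storm's Tuple: just an ordered list of field values.
interface SimpleTuple {
    List<Object> getValues();
}

// Same shape as RecordFormat, but written against the stand-in tuple type.
interface SimpleRecordFormat {
    byte[] format(SimpleTuple tuple);
}

// Illustrative delimited format: joins field values with a field delimiter
// and terminates each record with a record delimiter (newline by default).
class DelimitedFormatSketch implements SimpleRecordFormat {
    private String fieldDelimiter = ",";
    private String recordDelimiter = "\n";

    DelimitedFormatSketch withFieldDelimiter(String delimiter) {
        this.fieldDelimiter = delimiter;
        return this;
    }

    DelimitedFormatSketch withRecordDelimiter(String delimiter) {
        this.recordDelimiter = delimiter;
        return this;
    }

    @Override
    public byte[] format(SimpleTuple tuple) {
        StringBuilder sb = new StringBuilder();
        List<Object> values = tuple.getValues();
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                sb.append(fieldDelimiter); // separator goes between fields only
            }
            sb.append(values.get(i));
        }
        sb.append(recordDelimiter);
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Switching the field delimiter to `"|"` mirrors the pipe-delimited output from the usage example; a tab (`"\t"`) gives
tab-delimited records.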
### File Naming
File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
interface:

```java
public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}
```
The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat` will create file names with the following format:
For example:
By default, the prefix is empty and the extension is `.txt`.
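The following sketch shows one way a `FileNameFormat`-style class can combine per-task context captured in `prepare()`
with the rotation number and timestamp passed to `getName()`. The naming scheme
`{prefix}{componentId}-{taskId}-{rotation}-{timestamp}{extension}` is an assumption made for illustration, as are the
`NameContext` stand-in for `TopologyContext` and the builder methods. The prefix and extension defaults match the ones
described above; the default path is arbitrary.

```java
import java.util.Map;

// Hypothetical stand-in for the parts of TopologyContext a name format needs.
interface NameContext {
    String componentId();
    int taskId();
}

// Illustrative name format; the scheme used in getName() is an assumption.
class FileNameFormatSketch {
    private String path = "/storm"; // arbitrary default for this sketch
    private String prefix = "";
    private String extension = ".txt";
    private String componentId;
    private int taskId;

    FileNameFormatSketch withPath(String path) {
        this.path = path;
        return this;
    }

    FileNameFormatSketch withPrefix(String prefix) {
        this.prefix = prefix;
        return this;
    }

    FileNameFormatSketch withExtension(String extension) {
        this.extension = extension;
        return this;
    }

    // Capture per-task identifiers once, before any file is created.
    void prepare(Map<String, Object> conf, NameContext context) {
        this.componentId = context.componentId();
        this.taskId = context.taskId();
    }

    // Build a file name from the task identity, rotation count and timestamp.
    String getName(long rotation, long timeStamp) {
        return prefix + componentId + "-" + taskId + "-" + rotation + "-" + timeStamp + extension;
    }

    String getPath() {
        return path;
    }
}
```

Including the component and task IDs keeps names from colliding when several parallel bolt tasks write into the same
directory, while the rotation number and timestamp distinguish successive files from one task.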
### Sync Policies
Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:
```java
public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}
```
The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
to perform a sync/flush, after which it will call the `reset()` method.
The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
been processed.
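The call sequence described above (`mark()` for every tuple, then a sync followed by `reset()` when `mark()` returns
`true`) can be sketched without Storm as follows. `CountSyncPolicySketch` is a hypothetical re-implementation of the
count-based idea rather than the library's `CountSyncPolicy` source, and `SyncDriver` is a made-up loop standing in for
the bolt; the tuple is typed as `Object` and the 10-byte record size is arbitrary.

```java
// Same contract as SyncPolicy; the tuple is an Object because this sketch
// does not depend on Storm.
interface SyncPolicySketch {
    boolean mark(Object tuple, long offset);
    void reset();
}

// Count-based policy: requests a sync once `count` tuples have been marked.
class CountSyncPolicySketch implements SyncPolicySketch {
    private final int count;
    private int executeCount = 0;

    CountSyncPolicySketch(int count) {
        this.count = count;
    }

    @Override
    public boolean mark(Object tuple, long offset) {
        executeCount++;
        return executeCount >= count;
    }

    @Override
    public void reset() {
        executeCount = 0;
    }
}

// Hypothetical driver: marks every tuple and, when the policy asks for it,
// counts a sync and resets the policy. Returns the number of syncs.
class SyncDriver {
    static int run(SyncPolicySketch policy, int tuples) {
        int syncs = 0;
        long offset = 0;
        for (int i = 0; i < tuples; i++) {
            offset += 10; // pretend each formatted record is 10 bytes
            if (policy.mark("tuple-" + i, offset)) {
                syncs++; // a real bolt would sync/flush its output here
                policy.reset();
            }
        }
        return syncs;
    }
}
```

With a threshold of 1,000 and 2,500 tuples, the driver syncs twice; the last 500 tuples remain unsynced until the count
reaches 1,000 again.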
### File Rotation Policies
Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an
implementation of the `org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:
```java
public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}
```
The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
data files reach a specific file size:
```java
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
```
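A size-based rotation policy can be sketched the same way. This version assumes the `offset` argument is the current
write position in the open file, accumulates the bytes written since the previous `mark()`, and clears its counters on
`reset()` because a freshly rotated file starts at offset zero. `SizeUnit` is a stand-in for the library's `Units` and
assumes binary multiples (1 MB = 1,048,576 bytes); neither class is the library's actual implementation.

```java
// Stand-in for a Units enum; binary multiples are an assumption of this sketch.
enum SizeUnit {
    KB(1L << 10), MB(1L << 20), GB(1L << 30);

    private final long bytes;

    SizeUnit(long bytes) {
        this.bytes = bytes;
    }

    long bytes() {
        return bytes;
    }
}

// Size-based rotation sketch: requests rotation once the open file holds at
// least maxBytes, judged from the write offsets reported to mark().
class FileSizeRotationSketch {
    private final long maxBytes;
    private long lastOffset = 0;
    private long currentBytesWritten = 0;

    FileSizeRotationSketch(float count, SizeUnit unit) {
        this.maxBytes = (long) (count * unit.bytes());
    }

    boolean mark(Object tuple, long offset) {
        currentBytesWritten += offset - lastOffset; // bytes added since last call
        lastOffset = offset;
        return currentBytesWritten >= maxBytes;
    }

    // Called after rotation; the next file starts again at offset 0.
    void reset() {
        currentBytesWritten = 0;
        lastOffset = 0;
    }
}
```

Writing 12 MB of 1 KB records with a 5 MB limit produces two rotations and leaves 2 MB in the current file.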
### File Rotation Actions
Both the HDFS bolt and the Trident State implementation allow you to register any number of `RotationAction`s.
A `RotationAction` provides a hook for performing some action right after a file is rotated, for example moving the
file to a different location or renaming it.
```java
public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
```
Storm-HDFS includes a simple action that will move a file after rotation:
```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
    private String destination;
    public MoveFileAction withDestination(String destDir){
        destination = destDir;
        return this;
    }
    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        boolean success = fileSystem.rename(filePath, destPath);
    }
}
```
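Because a `RotationAction` only receives a filesystem handle and the rotated file's path, its behavior is easy to try
out locally. The sketch below is a local-filesystem analogue of `MoveFileAction` built on `java.nio.file` instead of
Hadoop's `FileSystem`, plus a hypothetical `RotationHooks` holder that runs any number of registered actions in order.
All three types (`LocalRotationAction`, `LocalMoveFileAction`, `RotationHooks`) are illustrative and not part of
Storm-HDFS.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Local analogue of RotationAction: java.nio.file.Path replaces Hadoop's
// FileSystem/Path pair (an assumption of this sketch).
interface LocalRotationAction {
    void execute(Path filePath) throws IOException;
}

// Moves a rotated file into a destination directory, keeping its file name.
class LocalMoveFileAction implements LocalRotationAction {
    private Path destination;

    LocalMoveFileAction withDestination(String destDir) {
        this.destination = Paths.get(destDir);
        return this;
    }

    @Override
    public void execute(Path filePath) throws IOException {
        Files.createDirectories(destination); // create target dir if missing (a choice of this sketch)
        Path destPath = destination.resolve(filePath.getFileName());
        Files.move(filePath, destPath);
    }
}

// Hypothetical holder that runs registered actions in registration order.
class RotationHooks {
    private final List<LocalRotationAction> actions = new ArrayList<>();

    RotationHooks addRotationAction(LocalRotationAction action) {
        actions.add(action);
        return this;
    }

    // Invoke every registered action on the file that was just rotated.
    void onRotated(Path filePath) throws IOException {
        for (LocalRotationAction action : actions) {
            action.execute(filePath);
        }
    }
}
```

In this sketch every registered action receives the original path, so a moving action should be registered after any
action that still needs the file at its old location.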
If you are using Trident and sequence files, you can do something like this:

```java
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
        .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
        .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
```
## Support for HDFS Sequence Files
The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write Storm data to HDFS sequence files:

```java
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files when they reach 5MB
```
