Apache Flume
What Is Flume?
Apache Flume is a distributed, reliable service for collecting, aggregating, and moving large volumes of streaming log and event data into centralized stores such as HDFS. It was built specifically for the use case of continuously streaming log files from application servers into a Hadoop cluster.
Application Servers (logs, events)
│
▼
Flume Source ──► Channel ──► Sink
(receives data)  (buffers)   (writes to HDFS/HBase/Kafka)
Flume is reliable: events move between source, channel, and sink in transactions, and if the sink is unavailable the channel buffers events until delivery succeeds. A file channel preserves its buffer across agent restarts; a memory channel does not.
Core Concepts
| Component | Role |
|---|---|
| Source | Receives events (Avro, Syslog, Exec, Spooling Directory, Kafka, HTTP) |
| Channel | Buffers events between source and sink (Memory, File, Kafka) |
| Sink | Writes events to destination (HDFS, HBase, Kafka, Logger, Avro) |
| Agent | A JVM process running one or more source-channel-sink pipelines |
| Event | A unit of data: byte payload + optional headers |
| Interceptor | Transforms or filters events in-flight (add timestamp, regex filter, etc.) |
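A minimal agent wiring these pieces together is the classic netcat-to-logger smoke test (the agent name `a1`, component names, and port here are arbitrary):

```
# a1: netcat source -> memory channel -> logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```

With the agent running, lines typed into `nc localhost 44444` should show up as events in the agent's log output.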
Basic Configuration
Flume agents are configured via .properties files. Each agent defines its sources, channels, and sinks:
Example: Tail a log file → HDFS
# flume-hdfs.conf
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1
# Source: exec (run a command and treat stdout as events)
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /var/log/app/application.log
agent1.sources.src1.channels = ch1
# Channel: file-based (survives agent restart)
agent1.channels.ch1.type = file
agent1.channels.ch1.checkpointDir = /var/flume/checkpoint
agent1.channels.ch1.dataDirs = /var/flume/data
# Sink: HDFS
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = ch1
agent1.sinks.sink1.hdfs.path = /flume/logs/%Y/%m/%d/%H
agent1.sinks.sink1.hdfs.filePrefix = app-log
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.fileType = DataStream
# Roll a new file every 5 minutes or at 64 MB, whichever comes first;
# disable rolling by event count. (Java properties files don't support
# inline comments, so keep comments on their own lines.)
agent1.sinks.sink1.hdfs.rollInterval = 300
agent1.sinks.sink1.hdfs.rollSize = 67108864
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.batchSize = 1000
# Needed for the %Y/%m/%d/%H escapes in hdfs.path unless a timestamp
# interceptor puts a timestamp header on each event
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
Example: Spooling directory → HDFS
Watch a directory for new files and ingest them:
agent1.sources.src1.type = spooldir
agent1.sources.src1.spoolDir = /data/incoming
agent1.sources.src1.fileHeader = true
# deletePolicy 'immediate' deletes files after ingestion; the default
# 'never' keeps them, renamed with a .COMPLETED suffix
agent1.sources.src1.deletePolicy = immediate
agent1.sources.src1.channels = ch1
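Note that spooldir expects files to be immutable once placed in the directory; appending to a file after the source has started reading it causes an error. Two optional properties are often useful here (the `.tmp` pattern below is illustrative):

```
# Rename fully ingested files instead of deleting them
# (applies with deletePolicy = never, the default)
agent1.sources.src1.fileSuffix = .COMPLETED
# Skip temporary files still being written by the producer
agent1.sources.src1.ignorePattern = ^.*\.tmp$
```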
Example: Syslog UDP → HDFS
agent1.sources.src1.type = syslogudp
agent1.sources.src1.host = 0.0.0.0
agent1.sources.src1.port = 5140
agent1.sources.src1.channels = ch1
Adding Interceptors
Interceptors run at the source and transform or drop events before they enter the channel:
# Add a timestamp header to every event
agent1.sources.src1.interceptors = ts
agent1.sources.src1.interceptors.ts.type = timestamp
# Add a static header (e.g., hostname)
agent1.sources.src1.interceptors = host
agent1.sources.src1.interceptors.host.type = static
agent1.sources.src1.interceptors.host.key = hostname
agent1.sources.src1.interceptors.host.value = web01.example.com
# Filter out events matching a regex
agent1.sources.src1.interceptors = filter
agent1.sources.src1.interceptors.filter.type = regex_filter
agent1.sources.src1.interceptors.filter.regex = .*DEBUG.*
agent1.sources.src1.interceptors.filter.excludeEvents = true
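Each snippet above configures a single interceptor, and each reassigns the `interceptors` list, so applying them together as written would leave only the last one active. To chain several, name them all in one space-separated list; they run in the order listed:

```
agent1.sources.src1.interceptors = ts host filter
agent1.sources.src1.interceptors.ts.type = timestamp
agent1.sources.src1.interceptors.host.type = static
agent1.sources.src1.interceptors.host.key = hostname
agent1.sources.src1.interceptors.host.value = web01.example.com
agent1.sources.src1.interceptors.filter.type = regex_filter
agent1.sources.src1.interceptors.filter.regex = .*DEBUG.*
agent1.sources.src1.interceptors.filter.excludeEvents = true
```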
Multi-Agent Fan-In (Log Aggregation)
A common pattern: many application servers each run a Flume agent that forwards to a central aggregator agent:
App Server 1 ──► Flume Agent (Exec Source + Avro Sink) ──┐
App Server 2 ──► Flume Agent (Exec Source + Avro Sink) ──┼──► Aggregator Agent (Avro Source + HDFS Sink) ──► HDFS
App Server 3 ──► Flume Agent (Exec Source + Avro Sink) ──┘
Collector agent (on each app server):
collector.sources = src1
collector.channels = ch1
collector.sinks = sink1
collector.sources.src1.type = exec
collector.sources.src1.command = tail -F /var/log/app/app.log
collector.sources.src1.channels = ch1
collector.channels.ch1.type = memory
collector.sinks.sink1.type = avro
collector.sinks.sink1.channel = ch1
collector.sinks.sink1.hostname = aggregator.example.com
collector.sinks.sink1.port = 41414
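The memory channel's default capacity is small; the sizes below are illustrative and should be sized to the expected event rate (and keep in mind that a memory channel loses any buffered events if the agent process dies):

```
# Max events the channel can hold, and max events per transaction
collector.channels.ch1.capacity = 10000
collector.channels.ch1.transactionCapacity = 1000
```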
Aggregator agent (central server):
aggregator.sources = src1
aggregator.channels = ch1
aggregator.sinks = sink1
aggregator.sources.src1.type = avro
aggregator.sources.src1.bind = 0.0.0.0
aggregator.sources.src1.port = 41414
aggregator.sources.src1.channels = ch1
aggregator.channels.ch1.type = file
aggregator.channels.ch1.checkpointDir = /var/flume/checkpoint
aggregator.channels.ch1.dataDirs = /var/flume/data
aggregator.sinks.sink1.type = hdfs
aggregator.sinks.sink1.channel = ch1
aggregator.sinks.sink1.hdfs.path = /logs/%Y/%m/%d
aggregator.sinks.sink1.hdfs.fileType = DataStream
aggregator.sinks.sink1.hdfs.rollInterval = 300
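The aggregator's single HDFS sink is a point of failure. Flume's sink groups can address this; a sketch of a failover processor, assuming a hypothetical second sink `sink2` is configured alongside `sink1`:

```
# The higher-priority sink is used until it fails, then traffic
# fails over to the next; maxpenalty caps the backoff in ms
aggregator.sinks = sink1 sink2
aggregator.sinkgroups = g1
aggregator.sinkgroups.g1.sinks = sink1 sink2
aggregator.sinkgroups.g1.processor.type = failover
aggregator.sinkgroups.g1.processor.priority.sink1 = 10
aggregator.sinkgroups.g1.processor.priority.sink2 = 5
aggregator.sinkgroups.g1.processor.maxpenalty = 10000
```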
Starting an Agent
flume-ng agent \
--name agent1 \
--conf /etc/flume/conf \
--conf-file /etc/flume/conf/flume-hdfs.conf \
-Dflume.root.logger=INFO,console
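An aggregator's Avro source can be exercised without running a full collector agent by using the bundled avro-client, which sends a file's lines as events (host, port, and file path below are examples):

```
flume-ng avro-client \
  --conf /etc/flume/conf \
  --host aggregator.example.com \
  --port 41414 \
  --filename /tmp/sample-events.log
```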
HDFS Sink File Rolling Options
| Property | Description | Recommended |
|---|---|---|
| hdfs.rollInterval | Roll file after N seconds | 300–600 |
| hdfs.rollSize | Roll file after N bytes | 67108864 (64 MB) |
| hdfs.rollCount | Roll file after N events | 0 (disable) |
| hdfs.fileType | DataStream (text) or SequenceFile | DataStream for logs |
Set at least one roll trigger: if all three are 0, files stay open indefinitely and appear only as .tmp files in HDFS.