
Apache Flume

What Is Flume?

Apache Flume is a distributed log and event ingestion service designed to collect, aggregate, and move large volumes of streaming data into stores such as HDFS, HBase, or Kafka. It was built specifically for the use case of continuously streaming log files from application servers into a Hadoop cluster.

Application Servers (logs, events)
                │
                ▼
Flume Source ──► Channel ──► Sink
(receives data)  (buffers)   (writes to HDFS/HBase/Kafka)

Flume is reliable — if the sink is unavailable, the channel buffers events until delivery is possible.
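That buffering contract can be sketched in Python (a simplified model, not Flume's actual API): the sink removes an event from the channel only after the destination has accepted it, so a failed write leaves the event queued for retry.

```python
from collections import deque

channel = deque()  # stands in for a Flume channel

def source_put(event):
    """Source side: append the event to the channel."""
    channel.append(event)

def sink_drain(write):
    """Sink side: peek, attempt delivery, and only then remove.
    A failed write leaves the event buffered for a later retry."""
    while channel:
        event = channel[0]          # peek; do not remove yet
        try:
            write(event)
        except IOError:
            break                   # destination down: keep buffering
        channel.popleft()           # delivery confirmed: drop the event

source_put("line 1")
source_put("line 2")

def hdfs_down(event):
    raise IOError("sink unavailable")

sink_drain(hdfs_down)
print(len(channel))  # 2: nothing was lost while the sink was down
```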

Core Concepts

Component     Role
Source        Receives events (Avro, Syslog, Exec, Spooling Directory, Kafka, HTTP)
Channel       Buffers events between source and sink (Memory, File, Kafka)
Sink          Writes events to destination (HDFS, HBase, Kafka, Logger, Avro)
Agent         A JVM process running one or more source-channel-sink pipelines
Event         A unit of data: byte payload + optional headers
Interceptor   Transforms or filters events in-flight (add timestamp, regex filter, etc.)
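To make the Event row concrete: Flume's HTTP source, with its default JSON handler, accepts a JSON array of events, each a headers map plus a body string. A minimal sketch of building such a batch in Python (the hostname, port, and header values are illustrative placeholders):

```python
import json

# Each Flume event is a byte payload ("body") plus optional
# string-to-string headers.
events = [
    {"headers": {"host": "web01", "severity": "INFO"},
     "body": "user login succeeded"},
    {"headers": {"host": "web01", "severity": "ERROR"},
     "body": "db connection refused"},
]

payload = json.dumps(events)
# POST the payload to an HTTP source, e.g.:
#   curl -X POST -H 'Content-Type: application/json' \
#        -d @events.json http://flume-host:44444
print(payload)
```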

Basic Configuration

Flume agents are configured via .properties files. Each agent defines its sources, channels, and sinks:

Example: Tail a log file → HDFS

# flume-hdfs.conf

agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1

# Source: exec (run a command and treat stdout as events)
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /var/log/app/application.log
agent1.sources.src1.channels = ch1

# Channel: file-based (survives agent restart)
agent1.channels.ch1.type = file
agent1.channels.ch1.checkpointDir = /var/flume/checkpoint
agent1.channels.ch1.dataDirs = /var/flume/data

# Sink: HDFS
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = ch1
agent1.sinks.sink1.hdfs.path = /flume/logs/%Y/%m/%d/%H
agent1.sinks.sink1.hdfs.filePrefix = app-log
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.fileType = DataStream
# Roll file every 5 minutes (comments must be on their own line in .properties files)
agent1.sinks.sink1.hdfs.rollInterval = 300
# Roll at 64 MB
agent1.sinks.sink1.hdfs.rollSize = 67108864
# Disable roll by event count
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.batchSize = 1000
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
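The %Y/%m/%d/%H escapes in hdfs.path are expanded from the event's timestamp header (epoch milliseconds; here supplied by useLocalTimeStamp). A rough Python model of that expansion, simplified to UTC (the real sink honors the agent's time zone settings):

```python
from datetime import datetime, timezone

def expand_hdfs_path(path_template, timestamp_ms):
    """Mimic how the HDFS sink expands date escapes (%Y, %m, %d, %H)
    from an epoch-milliseconds timestamp. Simplified: always UTC."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return (path_template
            .replace("%Y", f"{ts.year:04d}")
            .replace("%m", f"{ts.month:02d}")
            .replace("%d", f"{ts.day:02d}")
            .replace("%H", f"{ts.hour:02d}"))

# An event stamped 2024-03-05 14:00 UTC lands under:
print(expand_hdfs_path("/flume/logs/%Y/%m/%d/%H", 1709647200000))
# → /flume/logs/2024/03/05/14
```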

Example: Spooling directory → HDFS

Watch a directory for new files and ingest them:

agent1.sources.src1.type = spooldir
agent1.sources.src1.spoolDir = /data/incoming
agent1.sources.src1.fileHeader = true
# deletePolicy: 'immediate' removes ingested files, 'never' keeps the originals
agent1.sources.src1.deletePolicy = immediate
agent1.sources.src1.channels = ch1

Example: Syslog UDP → HDFS

agent1.sources.src1.type = syslogudp
agent1.sources.src1.host = 0.0.0.0
agent1.sources.src1.port = 5140
agent1.sources.src1.channels = ch1

Adding Interceptors

Interceptors modify events before they reach the sink:

# Add a timestamp header to every event
# Chain all interceptors in a single space-separated list;
# re-assigning 'interceptors' would overwrite the previous setting
agent1.sources.src1.interceptors = ts host filter

# Add a timestamp header to every event
agent1.sources.src1.interceptors.ts.type = timestamp

# Add a static header (e.g., hostname)
agent1.sources.src1.interceptors.host.type = static
agent1.sources.src1.interceptors.host.key = hostname
agent1.sources.src1.interceptors.host.value = web01.example.com

# Filter out events matching a regex
agent1.sources.src1.interceptors.filter.type = regex_filter
agent1.sources.src1.interceptors.filter.regex = .*DEBUG.*
agent1.sources.src1.interceptors.filter.excludeEvents = true
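A Python sketch of what these interceptors do to each event in flight (a simplified model, not Flume's Java interceptor API; the hostname value is the illustrative one from the config):

```python
import re
import time

DEBUG_RE = re.compile(r".*DEBUG.*")

def apply_interceptors(events):
    """Stamp each event with a timestamp header, tag it with a static
    hostname header, and drop bodies matching the DEBUG regex
    (the excludeEvents = true case)."""
    kept = []
    for body, headers in events:
        headers = dict(headers)
        headers.setdefault("timestamp", str(int(time.time() * 1000)))
        headers["hostname"] = "web01.example.com"
        if DEBUG_RE.match(body):
            continue  # excluded by the regex_filter interceptor
        kept.append((body, headers))
    return kept

result = apply_interceptors([
    ("2024-03-05 INFO service started", {}),
    ("2024-03-05 DEBUG cache miss", {}),
])
print([body for body, _ in result])  # the DEBUG line is filtered out
```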

Multi-Agent Fan-In (Log Aggregation)

A common pattern: many application servers each run a Flume agent that forwards to a central aggregator agent:

App Server 1 ──► Flume Agent (Exec Source + Avro Sink) ──┐
App Server 2 ──► Flume Agent (Exec Source + Avro Sink) ──┼──► Aggregator Agent ──► HDFS
App Server 3 ──► Flume Agent (Exec Source + Avro Sink) ──┘

Collector agent (on each app server):

collector.sources.src1.type = exec
collector.sources.src1.command = tail -F /var/log/app/app.log
collector.sources.src1.channels = ch1

# Memory channel: fast, but buffered events are lost if the agent dies
collector.channels.ch1.type = memory

collector.sinks.sink1.type = avro
collector.sinks.sink1.channel = ch1
collector.sinks.sink1.hostname = aggregator.example.com
collector.sinks.sink1.port = 41414

Aggregator agent (central server):

aggregator.sources.src1.type = avro
aggregator.sources.src1.bind = 0.0.0.0
aggregator.sources.src1.port = 41414
aggregator.sources.src1.channels = ch1

aggregator.channels.ch1.type = file
aggregator.channels.ch1.checkpointDir = /var/flume/checkpoint
aggregator.channels.ch1.dataDirs = /var/flume/data

aggregator.sinks.sink1.type = hdfs
aggregator.sinks.sink1.channel = ch1
aggregator.sinks.sink1.hdfs.path = /logs/%Y/%m/%d
aggregator.sinks.sink1.hdfs.fileType = DataStream
aggregator.sinks.sink1.hdfs.rollInterval = 300

Starting an Agent

flume-ng agent \
--name agent1 \
--conf /etc/flume/conf \
--conf-file /etc/flume/conf/flume-hdfs.conf \
-Dflume.root.logger=INFO,console

HDFS Sink File Rolling Options

Property            Description                          Recommended
hdfs.rollInterval   Roll file after N seconds            300–600
hdfs.rollSize       Roll file after N bytes              67108864 (64 MB)
hdfs.rollCount      Roll file after N events             0 (disable)
hdfs.fileType       DataStream (text) or SequenceFile    DataStream for logs

Set at least one roll trigger — never set all three to 0 or files stay open indefinitely and appear as .tmp in HDFS.
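To reason about which trigger fires first under a given load, a back-of-the-envelope Python helper (the event rate and size below are illustrative assumptions, not measurements):

```python
def first_roll_trigger(event_rate_per_s, avg_event_bytes,
                       roll_interval=300, roll_size=64 * 1024 * 1024,
                       roll_count=0):
    """Estimate which configured trigger rolls the file first.
    A value of 0 disables a trigger, matching the Flume semantics."""
    seconds_until = {}
    if roll_interval:
        seconds_until["rollInterval"] = roll_interval
    if roll_size:
        seconds_until["rollSize"] = roll_size / (event_rate_per_s * avg_event_bytes)
    if roll_count:
        seconds_until["rollCount"] = roll_count / event_rate_per_s
    if not seconds_until:
        return None  # file stays open forever (the .tmp problem above)
    return min(seconds_until, key=seconds_until.get)

# At 500 events/s of ~200-byte lines, 64 MB takes ~11 minutes to fill,
# so the 300-second interval rolls first:
print(first_roll_trigger(500, 200))
# → rollInterval
```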