
1. Overview

  • A comparison of tools that ship log data to a central location and store it there.

2. Apache Flume

  • https://flume.apache.org/
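  • "Flume" means a water channel; the name can be read as gathering logs into one place through a channel.
  • Flume was originally built at Cloudera and is now an Apache top-level project.
  • Flume OG (v0.9.x and earlier) was controlled through a master node. Flume NG (v1.x and later) dropped the master, and each Agent is controlled directly.
  • It is written in Java and is event-based (one log line is one event).
  • It can read data with the tail command and can ingest logs through Syslog, Scribe, and similar inputs.
  • Applications can send logs directly over Thrift (client modules exist for each language).
  • It supports a variety of Sources and Sinks (though fewer than Fluentd) and also allows custom Sources and Sinks.
  • Quoted performance: 70,000 events/sec on a single machine with no data loss.
  • High reliability (events are resent on failure), and HA is easy to set up.
  • Configuration changes take effect without a restart.
  • It supports a variety of input/output formats (logs do not have to be converted to JSON).
  • A single Source can feed multiple Sinks.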

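2.1. Architecture

  • Source

    • Receives events (logs) and hands them on to the Sink through a Channel.

  • Channel

    • Sits between the Source and the Sink to decouple them and to guard against failures.

    • The Source stores events in the Channel; the Sink takes events from the Channel and processes them.

  • Sink

    • Takes the events delivered by the Source from the Channel and writes them to HDFS, or forwards them to an Agent in the next tier or to a database.

    • Handles events according to the configured protocol type.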
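
The Source, Channel, and Sink wiring described above can be sketched as a minimal single-agent configuration. This is an illustrative sketch rather than part of the original setup: the agent name a1, the component names r1, c1, and k1, and the bind address and port are all assumptions. It uses the NetCat source, Memory channel, and Logger sink types.

```properties
# Hypothetical agent "a1": one source, one channel, one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: listens on a TCP port and turns each received line into an event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Channel: buffers events in memory between the source and the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink: logs each event (useful for testing and debugging)
a1.sinks.k1.type = logger

# Wiring: a source can write to several channels, a sink reads from exactly one
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

Note the asymmetry in the last two lines: the source property is the plural `channels`, while the sink property is the singular `channel`.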

 

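| Component | Type | Comment |
|-----------|------|---------|
| Source | Avro | Collects over the Avro protocol (the one used for communication in Hadoop) |
| Source | Thrift | Collects over the Thrift protocol |
| Source | Syslog | Collects over the Syslog protocol (Syslog TCP, Multiport Syslog TCP, Syslog UDP) |
| Source | HTTP | Collects over HTTP |
| Source | JMS | Collects from JMS (pluggable converter supported) |
| Source | NetCat | Collects TCP/IP data |
| Source | Exec | Collects the output of a Linux command |
| Source | Spooling Directory | Collects files newly added to a directory |
| Source | Sequence Generator | Generates events that count up from 0 in steps of 1 |
| Source | Legacy | Collects data from earlier Flume versions (Avro Legacy, Thrift Legacy) |
| Source | Custom | Collects with a custom, self-developed module |
| Channel | Memory | Uses memory (data may be lost) |
| Channel | JDBC | Uses a database |
| Channel | File | Uses files |
| Sink | Avro | Sends over the Avro protocol |
| Sink | Thrift | Sends over the Thrift protocol |
| Sink | IRC | Sends to IRC |
| Sink | ElasticSearch | Stores in ElasticSearch |
| Sink | MorphlineSolr | Stores in Solr |
| Sink | HDFS | Stores in HDFS |
| Sink | HBase | Stores in HBase (HBaseSink, AsyncHBaseSink) |
| Sink | Logger | Logs events for testing or debugging |
| Sink | File Roll | Stores in files |
| Sink | Null | Does nothing (discards events) |
| Sink | Custom | Sends with a custom, self-developed module |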
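
As an illustration of how the types in the table map to configuration, the sketch below combines a Spooling Directory source, a File channel, and a File Roll sink. It is a hedged example, not a configuration from the original post: the agent name a2, the component names, and every directory path are placeholders.

```properties
# Hypothetical agent "a2"; all names and paths are placeholders
a2.sources = spool
a2.channels = fc
a2.sinks = roll

# Spooling Directory source: picks up files newly dropped into spoolDir
a2.sources.spool.type = spooldir
a2.sources.spool.spoolDir = /var/log/flume-spool
a2.sources.spool.channels = fc

# File channel: events are persisted on disk instead of held in memory
a2.channels.fc.type = file
a2.channels.fc.checkpointDir = /var/lib/flume/checkpoint
a2.channels.fc.dataDirs = /var/lib/flume/data

# File Roll sink: writes events to files in a local directory
a2.sinks.roll.type = file_roll
a2.sinks.roll.sink.directory = /var/flume/out
a2.sinks.roll.channel = fc
```

Swapping the channel type between `file` and `memory` is the main durability trade-off: the Memory channel is faster but, as the table notes, can lose data.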

2.2. Configuration

# Example: tail a log file with an exec source and forward it to an Avro collector

# list sources, sinks and channels in the agent
agent1.sources = tail-file
agent1.channels = c1
agent1.sinks = avro-sink

# define the flow
agent1.sources.tail-file.channels = c1
agent1.sinks.avro-sink.channel = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000

# define source (batchTimeout is in milliseconds)
agent1.sources.tail-file.type = exec
agent1.sources.tail-file.command = tail -F /var/log/test.log
agent1.sources.tail-file.batchSize = 100
agent1.sources.tail-file.batchTimeout = 300000

# interceptors: add host and timestamp headers to every event
agent1.sources.tail-file.interceptors = host-interceptor time-interceptor
# hostname interceptor
agent1.sources.tail-file.interceptors.host-interceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.tail-file.interceptors.host-interceptor.preserveExisting = true
agent1.sources.tail-file.interceptors.host-interceptor.useIP = false
# timestamp interceptor
agent1.sources.tail-file.interceptors.time-interceptor.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# define sink
agent1.sinks.avro-sink.type = avro
agent1.sinks.avro-sink.hostname = 10.21.21.10
agent1.sinks.avro-sink.port = 10000
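
The agent above sends events to an Avro endpoint at 10.21.21.10:10000, so something has to listen there. A minimal sketch of the receiving side follows, assuming a second Flume agent runs on that host. The agent name collector, the component names, and the choice of a Logger sink are assumptions made for illustration; a real collector would more likely write to HDFS or another store.

```properties
# Hypothetical collector agent running on 10.21.21.10
collector.sources = avro-in
collector.channels = c1
collector.sinks = log-out

# Avro source: accepts events sent by the Avro sink of the upstream agent
collector.sources.avro-in.type = avro
collector.sources.avro-in.bind = 0.0.0.0
collector.sources.avro-in.port = 10000
collector.sources.avro-in.channels = c1

collector.channels.c1.type = memory
collector.channels.c1.capacity = 1000

# Logger sink stands in for the real destination
collector.sinks.log-out.type = logger
collector.sinks.log-out.channel = c1
```

Assuming the file is saved as conf/collector.conf, it could be started with `flume-ng agent --conf conf --conf-file conf/collector.conf --name collector`. The value passed to `--name` must match the prefix used in the file.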

# Example: HTTP source replicated to two channels

# global
agent3.sources = httpSource
agent3.channels = fileChannel fileChannel2
agent3.sinks = hdfsSink hdfsSink2

# sources
agent3.sources.httpSource.type = http
agent3.sources.httpSource.port = 5140
agent3.sources.httpSource.handler = org.apache.flume.source.http.JSONHandler
agent3.sources.httpSource.channels = fileChannel fileChannel2
# replicating: every event is copied to all listed channels
agent3.sources.httpSource.selector.type = replicating

# channels
agent3.channels.fileChannel.type = file
agent3.channels.fileChannel.checkpointDir = /home/hadoop_ecosystem/data/dfs/flume/checkpoint
agent3.channels.fileChannel.dataDirs = /home/hadoop_ecosystem/data/dfs/flume/data
agent3.channels.fileChannel.capacity = 99999999
agent3.channels.fileChannel.transactionCapacity = 10000000

# despite its name, fileChannel2 is a memory channel
agent3.channels.fileChannel2.type = memory
agent3.channels.fileChannel2.capacity = 99999999
agent3.channels.fileChannel2.transactionCapacity = 10000000

# sinks
agent3.sinks.hdfsSink.type = hdfs
agent3.sinks.hdfsSink.channel = fileChannel
# %{log_type} and %{host} are filled from event headers;
# %Y%m%d needs a timestamp header on each event
agent3.sinks.hdfsSink.hdfs.path = hdfs://weblog-az-001.cafe24.com:9000/user/flume/%{log_type}/tmp
agent3.sinks.hdfsSink.hdfs.fileType = DataStream
# roll files by time only (every 150 seconds), not by event count or size
agent3.sinks.hdfsSink.hdfs.rollCount = 0
agent3.sinks.hdfsSink.hdfs.rollInterval = 150
agent3.sinks.hdfsSink.hdfs.rollSize = 0
agent3.sinks.hdfsSink.hdfs.inUsePrefix = .tmp.
agent3.sinks.hdfsSink.hdfs.filePrefix = %{host}.%Y%m%d
agent3.sinks.hdfsSink.hdfs.batchSize = 1000
agent3.sinks.hdfsSink.hdfs.callTimeout = 30000

# hostname and port are Avro sink properties, so this sink is assumed
# to be an Avro sink that forwards events to another agent
agent3.sinks.hdfsSink2.type = avro
agent3.sinks.hdfsSink2.channel = fileChannel2
agent3.sinks.hdfsSink2.hostname = 10.21.21.10
agent3.sinks.hdfsSink2.port = 10002


...

 

# Example: HBase sink

# Use the AsyncHBaseSink
agent3.sinks.hbaseSink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
# Use the HBaseSink
#agent3.sinks.hbaseSink.type = org.apache.flume.sink.hbase.HBaseSink
agent3.sinks.hbaseSink.channel = fileChannel3
agent3.sinks.hbaseSink.table = transactions
agent3.sinks.hbaseSink.columnFamily = clients
agent3.sinks.hbaseSink.column = charges
agent3.sinks.hbaseSink.batchSize = 5000

# Use the SimpleAsyncHbaseEventSerializer that comes with Flume
agent3.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
# Use the SimpleHbaseEventSerializer that comes with Flume
#agent3.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
agent3.sinks.hbaseSink.serializer.incrementColumn = icol
agent3.channels.fileChannel3.type = memory
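
The HTTP source above uses a replicating selector, so both channels receive every event. Since the HDFS path already depends on a log_type header, a multiplexing selector that routes on the same header is a natural variation. The sketch below is an assumption-laden illustration: the header values access and error are hypothetical, and it presumes that clients set log_type on each event they post.

```properties
# Variation on the httpSource settings: route by header instead of copying
agent3.sources.httpSource.channels = fileChannel fileChannel2
agent3.sources.httpSource.selector.type = multiplexing
# Route on the value of the "log_type" event header
agent3.sources.httpSource.selector.header = log_type
# Hypothetical header values and their target channels
agent3.sources.httpSource.selector.mapping.access = fileChannel
agent3.sources.httpSource.selector.mapping.error = fileChannel2
# Events with a missing or unmapped log_type go here
agent3.sources.httpSource.selector.default = fileChannel
```

With replicating, each sink sees the full stream; with multiplexing, each channel (and therefore each sink) sees only the events mapped to it.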


# Example: failover sink group (one exec source, four Avro sinks)

# list sources, sinks and channels in the agent
agent1.sources = tail-file
agent1.channels = c1
agent1.sinks = avro-sink1 avro-sink2 avro-sink3 avro-sink4

# define the flow
agent1.sources.tail-file.channels = c1
agent1.sinks.avro-sink1.channel = c1
agent1.sinks.avro-sink2.channel = c1
agent1.sinks.avro-sink3.channel = c1
agent1.sinks.avro-sink4.channel = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000

# define source
agent1.sources.tail-file.type = exec
agent1.sources.tail-file.command = tail -F /var/log/test.log
agent1.sources.tail-file.batchSize = 100
agent1.sources.tail-file.batchTimeout = 300000

# interceptors: add host and timestamp headers to every event
agent1.sources.tail-file.interceptors = host-interceptor time-interceptor
# hostname interceptor
agent1.sources.tail-file.interceptors.host-interceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.tail-file.interceptors.host-interceptor.preserveExisting = true
agent1.sources.tail-file.interceptors.host-interceptor.useIP = false
# timestamp interceptor
agent1.sources.tail-file.interceptors.time-interceptor.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# define sinks
agent1.sinks.avro-sink1.type = avro
agent1.sinks.avro-sink1.hostname = 10.21.21.10
agent1.sinks.avro-sink1.port = 10000
agent1.sinks.avro-sink2.type = avro
agent1.sinks.avro-sink2.hostname = 10.21.21.11
agent1.sinks.avro-sink2.port = 10000
agent1.sinks.avro-sink3.type = avro
agent1.sinks.avro-sink3.hostname = 10.21.21.12
agent1.sinks.avro-sink3.port = 10000
agent1.sinks.avro-sink4.type = avro
agent1.sinks.avro-sink4.hostname = 10.21.21.13
agent1.sinks.avro-sink4.port = 10000

# define sink group
# failover: the sink with the largest priority value is used first;
# maxpenalty is the maximum backoff for a failed sink, in milliseconds
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = avro-sink1 avro-sink2 avro-sink3 avro-sink4
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.avro-sink1 = 5
agent1.sinkgroups.g1.processor.priority.avro-sink2 = 10
agent1.sinkgroups.g1.processor.priority.avro-sink3 = 15
agent1.sinkgroups.g1.processor.priority.avro-sink4 = 20
agent1.sinkgroups.g1.processor.maxpenalty = 10000



# Example: load-balancing sink group (one exec source, four Avro sinks)

# list sources, sinks and channels in the agent
agent1.sources = tail-file
agent1.channels = c1
agent1.sinks = avro-sink1 avro-sink2 avro-sink3 avro-sink4

# define the flow
agent1.sources.tail-file.channels = c1
agent1.sinks.avro-sink1.channel = c1
agent1.sinks.avro-sink2.channel = c1
agent1.sinks.avro-sink3.channel = c1
agent1.sinks.avro-sink4.channel = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000

# define source
agent1.sources.tail-file.type = exec
agent1.sources.tail-file.command = tail -F /var/log/test.log
agent1.sources.tail-file.batchSize = 100
agent1.sources.tail-file.batchTimeout = 300000

# interceptors: add host and timestamp headers to every event
agent1.sources.tail-file.interceptors = host-interceptor time-interceptor
# hostname interceptor
agent1.sources.tail-file.interceptors.host-interceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.tail-file.interceptors.host-interceptor.preserveExisting = true
agent1.sources.tail-file.interceptors.host-interceptor.useIP = false
# timestamp interceptor
agent1.sources.tail-file.interceptors.time-interceptor.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# define sinks
agent1.sinks.avro-sink1.type = avro
agent1.sinks.avro-sink1.hostname = 10.21.21.10
agent1.sinks.avro-sink1.port = 10000
agent1.sinks.avro-sink2.type = avro
agent1.sinks.avro-sink2.hostname = 10.21.21.11
agent1.sinks.avro-sink2.port = 10000
agent1.sinks.avro-sink3.type = avro
agent1.sinks.avro-sink3.hostname = 10.21.21.12
agent1.sinks.avro-sink3.port = 10000
agent1.sinks.avro-sink4.type = avro
agent1.sinks.avro-sink4.hostname = 10.21.21.13
agent1.sinks.avro-sink4.port = 10000

# define sink group
# load_balance: events are spread across the sinks;
# backoff = true temporarily skips sinks that have failed
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = avro-sink1 avro-sink2 avro-sink3 avro-sink4
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
# round_robin or random
agent1.sinkgroups.g1.processor.selector = round_robin


# Example: search-and-replace interceptor

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1


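# Example: regex extractor interceptor (capture groups become event headers)

# For an event body starting with "1:2:3", this sets the headers
# one=1, two=2, three=3.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

# Extract a leading "yyyy-MM-dd HH:mm" timestamp from the body and store it
# in the timestamp header as epoch milliseconds.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm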

2.3. Limitations

  • Collecting with tail can lose data: http://h391106.tistory.com/326
  • Custom modules are supported, but they must be written in Java or go through Thrift.
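
One possible way to reduce the risk of data loss with tail is the Taildir source, which records its read position in a file so that it can resume after a restart. The sketch below assumes Flume 1.7.0 or later, where this source is available. The position file path is a placeholder, and whether this fully addresses the case described in the linked post has not been verified here.

```properties
# Hypothetical replacement for the exec/tail source of agent1
agent1.sources = tail-file
agent1.sources.tail-file.type = TAILDIR
# Read offsets are saved here and reused after a restart (placeholder path)
agent1.sources.tail-file.positionFile = /var/lib/flume/taildir_position.json
# One file group "f1" that watches the same log file as the exec example
agent1.sources.tail-file.filegroups = f1
agent1.sources.tail-file.filegroups.f1 = /var/log/test.log
agent1.sources.tail-file.channels = c1
```

The position file only protects the source side. If c1 is a Memory channel, events already read but not yet delivered can still be lost when the agent stops, so a File channel is the safer pairing.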
