티스토리 뷰
1. 개요
- 로그 데이터를 중앙으로 전달하고 저장하는 툴 비교
2. Apache Flume
- https://flume.apache.org/
- Flume은 수로라는 뜻으로 로그를 수로를 통해서 한곳에 모은 다는 뜻으로 풀이 될 수 있다.
- Flume은 과거 Cloudera에서 제작하여, 지금은 Apache 톱레벨 프로젝트가 되었다.
- Flume OG(v0.9.x 이하)의 경우 master 노드를 통해서, 제어되는 방식이었으나, NG(v1.x 이상)로 변경되면서, Agent에서 직접 제어 방식으로 변경되었다.
- JAVA로 만들어졌고, 이벤트 방식으로 제작되었다.(로그 한 줄이 하나의 이벤트)
- tail 명령어로 데이터를 읽어올 수 있고, Syslog, Scribe 등을 통해서 로그를 가져올 수 있다.
- Thrift를 이용해서 직접 어플리케이션에서 로그 저장이 가능하다.(언어별 모듈 존재함)
- 다양한 Source와 Sink 데이터를 지원하고(Fluentd 보단 많지 않음), 커스텀 Source 및 Sink를 지원한다.
- 싱글 머신에서 무손실 70,000 events/sec 성능
- 높은 신뢰성(실패시 재전송), HA 구성이 간편하다.
- 재시작 없이 설정 변경이 된다.
- 다양한 입/출력 포멧 지원(Json으로 변경 하지 않아도 됨)
- 하나의 Source에서 멀티 Sink가 가능하다.
2.1 아키텍처
Source
이벤트(로그)를 입력 받아 입력된 정보를 Sink로 전달
Channel
Source와 Sink의 의존성을 제거하고 장애에 대비하기 위하여, 중간 채널을 제공한다.
Source는 Channel에 이벤트 정보를 저장하고, Sink는 채널로 부터 정보를 전달 받아 처리한다.
Sink
Channel로 부터 Source가 전달한 이벤트 정보를 HDFS에 저장하거나, 다음 tier의 Agent 또는 DB로 전달한다.
지정된 프로토콜 타임에 다른 처리를 진행한다.
|
Type |
Comment |
Source |
Avro |
Avro(하둡에서 통신용으로 사용중인) 프로토콜로 수집 |
Thrift |
Thrift 프로토콜로 수집 |
|
Syslog |
Syslog 프로토콜로 수집(Syslog TCP, Multiport Syslog TCP, Syslog UDP) |
|
HTTP |
HTTP 프로토콜로 수집 |
|
JMS |
JMS 프로토콜로 수집(Pluggable converter 지원) |
|
NetCat |
TCP/IP 데이터 수집 |
|
Exec |
Linux 명령어로 수집 |
|
Spooling Directory |
폴더에 신규로 추가된 파일 수집 |
|
Sequence Generator | 0부터 1씩 증가하는 event 생성 | |
Legacy |
이전 버전의 Flume으로 부터 데이터 수집(Avro Legacy, Thrift legacy) |
|
Custom | 자체 개발 커스텀 모듈로 수집 | |
Channel | Memory | Memroy 사용(데이터 유실 가능성 있음) |
JDBC | DB 사용 | |
File | File 사용 | |
Sink | Avro | Avro 프로토콜로 전송 |
Thrift | Thrift 프로토콜로 전송 | |
IRC | IRC로 전송 | |
ElasticSearch | ElasticSearch에 저장 | |
MorphlineSolr | Solr에 저장 | |
HDFS | HDFS에 저장 | |
HBase | HBase에 저장(HbaseSink, AsynkHBaseSink) | |
Logger | 테스트 또는 디버깅을 위해 로깅 | |
File Role | 파일 저장 | |
Null | 아무 일도 하지 않음 | |
Custom | 자체 개발 커스텀 모듈로 전송 |
2.2. 설정
# global
agent3.sources = httpSource
agent3.channels = fileChannel fileChannel2
agent3.sinks = hdfsSink hdfsSink2
# sources
agent3.sources.httpSource.type = http
agent3.sources.httpSource.port = 5140
agent3.sources.httpSource.handler = org.apache.flume.source.http.JSONHandler
agent3.sources.httpSource.channels = fileChannel fileChannel2
agent3.sources.httpSource.selector.type = replicating
# channels
agent3.channels.fileChannel.type = file
agent3.channels.fileChannel.checkpointDir = /home/hadoop_ecosystem/data/dfs/flume/checkpoint
agent3.channels.fileChannel.dataDirs = /home/hadoop_ecosystem/data/dfs/flume/data
agent3.channels.fileChannel.capacity = 99999999
agent3.channels.fileChannel.transactionCapacity = 10000000
agent3.channels.fileChannel2.type = memory
agent3.channels.fileChannel2.capacity = 99999999
agent3.channels.fileChannel2.transactionCapacity = 10000000
# sinks
agent3.sinks.hdfsSink.type = hdfs
agent3.sinks.hdfsSink.channel = fileChannel
agent3.sinks.hdfsSink.hdfs.path = hdfs://weblog-az-001.cafe24.com:9000/user/flume/%{log_type}/tmp
agent3.sinks.hdfsSink.hdfs.fileType = DataStream
agent3.sinks.hdfsSink.hdfs.rollCount = 0
agent3.sinks.hdfsSink.hdfs.rollInterval = 150
agent3.sinks.hdfsSink.hdfs.rollSize = 0
agent3.sinks.hdfsSink.hdfs.inUsePrefix = .tmp.
agent3.sinks.hdfsSink.hdfs.filePrefix = %{host}.%Y%m%d
agent3.sinks.hdfsSink.hdfs.batchSize = 1000
agent3.sinks.hdfsSink.hdfs.callTimeout = 30000
agent3.sinks.hdfsSink2.type = hdfs
agent3.sinks.hdfsSink2.channel = fileChannel2
agent3.sinks.hdfsSink2.hostname = 10.21.21.10
agent3.sinks.hdfsSink2.port = 10002
...
#Use the AsyncHBaseSink
agnet3.sinks.hbaseSink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
#Use the HBaseSink
#agnet3.sinks.hbaseSink.type = org.apache.flume.sink.hbase.HBaseSink
agnet3.sinks.hbaseSink.channel = fileChannel3
agnet3.sinks.hbaseSink.table = transactions
agnet3.sinks.hbaseSink.columnFamily = clients
agnet3.sinks.hbaseSink.column = charges
agnet3.sinks.hbaseSink.batchSize = 5000
#Use the SimpleAsyncHbaseEventSerializer that comes with Flume
agnet3.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
#Use the SimpleHbaseEventSerializer that comes with Flume
#agnet3.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
agnet3.sinks.hbaseSink.serializer.incrementColumn = icol
agnet3.channels.fileChannel3.type=memory
# list sources, sinks and channels in the agent
agent1.sources = tail-file
agent1.channels = c1
agent1.sinks= avro-sink1 avro-sink2 avro-sink3 avro-sink4
# define the flow
agent1.sources.tail-file.channels = c1
agent1.sinks.avro-sink1.channel = c1
agent1.sinks.avro-sink2.channel = c1
agent1.sinks.avro-sink3.channel = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
# define source and sink
agent1.sources.tail-file.type = exec
agent1.sources.tail-file.command = tail -F /var/log/test.log
agent1.sources.tail-file.batchSize = 100
agent1.sources.tail-file.batchTimeout = 300000
agent1.sources.tail-file.channels = c1
# hostname interceptor
agent1.sources.tail-file.interceptors = host-interceptor time-interceptor
agent1.sources.tail-file.interceptors.host-interceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.tail-file.interceptors.host-interceptor.preserveExisting = true
agent1.sources.tail-file.interceptors.host-interceptor.useIP = false
# timestamp interceptor
agent1.sources.tail-file.interceptors.time-interceptor.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# define sink & sink group
agent1.sinks.avro-sink1.type = avro
agent1.sinks.avro-sink1.hostname = 10.21.21.10
agent1.sinks.avro-sink1.port = 10000
agent1.sinks.avro-sink2.type = avro
agent1.sinks.avro-sink2.hostname = 10.21.21.11
agent1.sinks.avro-sink2.port = 10000
agent1.sinks.avro-sink3.type = avro
agent1.sinks.avro-sink3.hostname = 10.21.21.12
agent1.sinks.avro-sink3.port = 10000
agent1.sinks.avro-sink4.type = avro
agent1.sinks.avro-sink4.hostname = 10.21.21.13
agent1.sinks.avro-sink4.port = 10000
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = avro-sink1 avro-sink2 avro-sink3 avro-sink4
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.avro-sink1 = 5
agent1.sinkgroups.g1.processor.priority.avro-sink2 = 10
agent1.sinkgroups.g1.processor.priority.avro-sink3 = 15
agent1.sinkgroups.g1.processor.priority.avro-sink4 = 20
agent1.sinkgroups.g1.processor.maxpenalty = 10000
# list sources, sinks and channels in the agent
agent1.sources = tail-file
agent1.channels = c1
agent1.sinks= avro-sink1 avro-sink2 avro-sink3 avro-sink4
# define the flow
agent1.sources.tail-file.channels = c1
agent1.sinks.avro-sink1.channel = c1
agent1.sinks.avro-sink2.channel = c1
agent1.sinks.avro-sink3.channel = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
# define source and sink
agent1.sources.tail-file.type = exec
agent1.sources.tail-file.command = tail -F /var/log/test.log
agent1.sources.tail-file.batchSize = 100
agent1.sources.tail-file.batchTimeout = 300000
agent1.sources.tail-file.channels = c1
# hostname interceptor
agent1.sources.tail-file.interceptors = host-interceptor time-interceptor
agent1.sources.tail-file.interceptors.host-interceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.tail-file.interceptors.host-interceptor.preserveExisting = true
agent1.sources.tail-file.interceptors.host-interceptor.useIP = false
# timestamp interceptor
agent1.sources.tail-file.interceptors.time-interceptor.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# define sink & sink group
agent1.sinks.avro-sink1.type = avro
agent1.sinks.avro-sink1.hostname = 10.21.21.10
agent1.sinks.avro-sink1.port = 10000
agent1.sinks.avro-sink2.type = avro
agent1.sinks.avro-sink2.hostname = 10.21.21.11
agent1.sinks.avro-sink2.port = 10000
agent1.sinks.avro-sink3.type = avro
agent1.sinks.avro-sink3.hostname = 10.21.21.12
agent1.sinks.avro-sink3.port = 10000
agent1.sinks.avro-sink4.type = avro
agent1.sinks.avro-sink4.hostname = 10.21.21.13
agent1.sinks.avro-sink4.port = 10000
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = avro-sink1 avro-sink2 avro-sink3 avro-sink4
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
# round_robin or random
agent1.sinkgroups.g1.processor.selector = round_robin
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
2.3. 제약사항
- tail 로 수집시 데이터 누락 가능성 존재: http://h391106.tistory.com/326
- 커스텀 모듈을 지원 하지만, JAVA로 만들거나 Thrift 를 사용해서 처리해야한다.
2.4. 참고 사이트
- 커스텀 : https://questforthought.wordpress.com/2014/01/13/using-flume-interceptor-multiplexing/
- https://gigsda.wordpress.com/2013/04/07/flume-1-3-0-user-guide-%EB%B0%9C%EB%B2%88%EC%97%AD/
- https://flume.apache.org/FlumeUserGuide.html
- https://flume.apache.org/FlumeDeveloperGuide.html
- https://cwiki.apache.org/confluence/display/FLUME/Flume+NG+Syslog+Performance+Test+2012-04-30
- https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase
'빅데이터' 카테고리의 다른 글
대용량 로그 분석 시스템 (0) | 2016.09.12 |
---|---|
로깅을 위한 엘라스틱서치 설정(ElasticSearch for Logging) (0) | 2016.09.02 |
Apache Storm 성능 설정 (0) | 2016.08.09 |
CDH5.8 업데이트 후 Zeppelin 에러 (0) | 2016.08.03 |
Ceph 오브젝트 스토리지 (0) | 2016.07.26 |
- Total
- Today
- Yesterday
- apache
- Ansible
- configuration
- 예제
- limits
- 이슈처리
- Windows
- mysql
- check
- MariaDB
- code
- File
- client
- deview
- Module
- example
- Web
- engineering
- command
- monitoring
- 코드
- 번역
- error
- PowerShell
- 명령어
- httpd
- RESTful
- Linux
- 외부링크
- Python
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |