Zeppelin notebook paragraph for analyzing Apache access logs with PySpark: parse each log line into a Row, register the results as Spark SQL temp tables, and compute a few basic aggregates.

```python
%pyspark
import re

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

# Apache access-log pattern: common/combined format plus two trailing
# custom fields (named ptime and pid below).
APACHE_ACCESS_LOG_PATTERN = r'(\S+) (\S+) (\S+) \[([^\]]*)\] "([^"]*)"(?: (\S+) (\S+) ([^ "]*|"[^"]*") (".*")?)? (\S+) (\S+)'

def parse_apache_log_line(logline):
    """Parse one access-log line into a Row; fail fast on malformed lines."""
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        raise ValueError("Invalid logline: %s" % logline)
    return Row(host=match.group(1),
               identity=match.group(2),
               user=match.group(3),
               time=match.group(4),
               request=match.group(5),
               status=match.group(6),
               size=match.group(7),
               referer=match.group(8),
               agent=match.group(9),
               ptime=match.group(10),
               pid=match.group(11))

# Parse the raw log file into an RDD of Rows and cache it for reuse below.
access_logs = (sc.textFile("hdfs:///access/file.log")
                 .map(parse_apache_log_line)
                 .cache())
print(access_logs.count())

# Expose the parsed logs to Spark SQL.
access_logs_data_frame = sqlContext.createDataFrame(access_logs)
access_logs_data_frame.registerTempTable("access_logs")

# Average response size; "-" means the size field was absent from the line.
sizes = (access_logs
         .filter(lambda row: row.size != "-")
         .map(lambda row: int(row.size))
         .cache())
average_size = sizes.reduce(lambda x, y: x + y) / sizes.count()
print(average_size)

# Request count per HTTP status code.
response_code_to_count_pair_rdd = (access_logs
                                   .map(lambda row: (row.status, 1))
                                   .reduceByKey(lambda x, y: x + y))
response_code_to_count_array = response_code_to_count_pair_rdd.take(100)
print(response_code_to_count_array)

response_code_to_count_row_rdd = response_code_to_count_pair_rdd.map(
    lambda pair: Row(status=pair[0], cnt=pair[1]))
response_code_to_count_data_frame = sqlContext.createDataFrame(response_code_to_count_row_rdd)
response_code_to_count_data_frame.registerTempTable("response_status_counts")

# Alternative: supply an explicit schema instead of inferring it from Rows.
#fields = [StructField('status', StringType(), True), StructField('count', IntegerType(), True)]
#structure = StructType(fields)
#response_status_counts = sqlContext.createDataFrame(response_code_to_count_row_rdd, structure)
#response_status_counts.registerTempTable("response_status_counts")

# Request count per client host.
host_to_count_pair_rdd = (access_logs
                          .map(lambda row: (row.host, 1))
                          .reduceByKey(lambda x, y: x + y))
host_to_count_array = host_to_count_pair_rdd.take(100)
print(host_to_count_array)

host_to_count_row_rdd = host_to_count_pair_rdd.map(
    lambda pair: Row(host=pair[0], cnt=pair[1]))
host_to_count_data_frame = sqlContext.createDataFrame(host_to_count_row_rdd)
host_to_count_data_frame.registerTempTable("host_counts")
```
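Once the temp tables are registered, they can be queried with Spark SQL from a later paragraph in the same session. A minimal sketch, assuming the paragraph above has already run (table and column names taken from the code above):

```python
%pyspark
# Top 10 HTTP status codes by request count.
top_statuses = sqlContext.sql(
    "SELECT status, cnt FROM response_status_counts ORDER BY cnt DESC LIMIT 10")
top_statuses.show()

# Top 10 client hosts by request count.
top_hosts = sqlContext.sql(
    "SELECT host, cnt FROM host_counts ORDER BY cnt DESC LIMIT 10")
top_hosts.show()
```

The same queries could also run in a Zeppelin %sql paragraph, which renders the results as charts instead of plain text.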