Big Data
Analyzing httpd Access Logs with Zeppelin
warpmemory
2016. 10. 27. 14:57
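The Zeppelin note below does everything in a single %pyspark paragraph: it parses an httpd access log from HDFS into Row objects with a regular expression, computes the average response size, and registers the parsed logs plus per-status and per-host request counts as temp tables so they can be queried with %sql.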
%pyspark
import re
from pyspark.sql import Row
from pyspark.sql.types import *

# Combined log format plus two extra trailing fields (service time and pid)
APACHE_ACCESS_LOG_PATTERN = r'(\S+) (\S+) (\S+) \[([^\]]*)\] \"([^\"]*)\"(?: (\S+) (\S+) ([^ \"]*|\"[^\"]*\") (\".*\")?)? (\S+) (\S+)'

def parse_apache_log_line(logline):
    # Turn one access-log line into a Row; fail loudly on lines that do not match
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        raise ValueError("Invalid logline: %s" % logline)
    return Row(host = match.group(1),
               identity = match.group(2),
               user = match.group(3),
               time = match.group(4),
               request = match.group(5),
               status = match.group(6),
               size = match.group(7),
               referer = match.group(8),
               agent = match.group(9),
               ptime = match.group(10),
               pid = match.group(11))

# Load the raw log from HDFS, parse every line, and cache the parsed RDD
access_logs = (sc.textFile("hdfs:///access/file.log")
               .map(parse_apache_log_line)
               .cache())
print access_logs.count()

# Expose the parsed logs to Zeppelin's %sql interpreter
access_logs_data_frame = sqlContext.createDataFrame(access_logs)
access_logs_data_frame.registerTempTable("access_logs")

# Average response size in bytes, skipping "-" (responses with no body)
sizes = (access_logs
         .filter(lambda row: row.size != "-")
         .map(lambda row: int(row.size))
         .cache())
average_size = sizes.reduce(lambda x, y: x + y) / sizes.count()
print average_size

# Requests per HTTP status code
response_code_to_count_pair_rdd = (access_logs
                                   .map(lambda row: (row.status, 1))
                                   .reduceByKey(lambda x, y: x + y))
response_code_to_count_array = response_code_to_count_pair_rdd.take(100)
print response_code_to_count_array

response_code_to_count_row_rdd = response_code_to_count_pair_rdd.map(lambda (x, y): Row(status=x, cnt=y))
response_code_to_count_data_frame = sqlContext.createDataFrame(response_code_to_count_row_rdd)
response_code_to_count_data_frame.registerTempTable("response_status_counts")

# Alternative: build the same DataFrame with an explicit schema
#fields = [StructField('status', StringType(), True), StructField('count', IntegerType(), True)]
#structure = StructType(fields)
#response_status_counts = sqlContext.createDataFrame(response_code_to_count_row_rdd, structure)
#response_status_counts.registerTempTable("response_status_counts")

# Requests per client host
host_to_count_pair_rdd = (access_logs
                          .map(lambda row: (row.host, 1))
                          .reduceByKey(lambda x, y: x + y))
host_to_count_array = host_to_count_pair_rdd.take(100)
print host_to_count_array

host_to_count_row_rdd = host_to_count_pair_rdd.map(lambda (x, y): Row(host=x, cnt=y))
host_to_count_data_frame = sqlContext.createDataFrame(host_to_count_row_rdd)
host_to_count_data_frame.registerTempTable("host_counts")
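As a quick sanity check, the parser can be run on a single line. The sample below is made up (hypothetical host, timestamp, sizes, and timings); the two trailing fields are assumed to come from a combined LogFormat extended with something like %D (service time in microseconds) and %P (child pid), which is what the ptime and pid field names suggest.

%pyspark
# Hypothetical sample line; assumes parse_apache_log_line from the paragraph above is in scope
sample = '127.0.0.1 - frank [10/Oct/2016:13:55:36 +0900] "GET /index.html HTTP/1.1" 200 2326 "http://example.com/" "Mozilla/5.0" 1532 4211'
row = parse_apache_log_line(sample)
print row.host, row.status, row.size, row.ptime, row.pid   # 127.0.0.1 200 2326 1532 4211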
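With the temp tables registered, the aggregates can be pulled up (and charted) from %sql paragraphs in the same note. A minimal sketch, using only the tables and columns registered above:

%sql
SELECT status, cnt FROM response_status_counts ORDER BY cnt DESC

%sql
SELECT host, cnt FROM host_counts ORDER BY cnt DESC LIMIT 20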