티스토리 뷰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
%pyspark
 
import re
from pyspark.sql import Row
from pyspark.sql.types import *
 
APACHE_ACCESS_LOG_PATTERN = '(\S+) (\S+) (\S+) \[([^\]]*)\] \"([^\"]*)\"(?: (\S+) (\S+) ([^ \"]*|\"[^\"]*\") (\".*\")?)? (\S+) (\S+)'
 
def parse_apache_log_line(logline):
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        raise Error("Invalid logline: %s" % (logline))
    return Row(
        host = match.group(1),
        identity = match.group(2),
        user = match.group(3),
        time = match.group(4),
        request = match.group(5),
        status = match.group(6),
        size = match.group(7),
        referer = match.group(8),
        agent = match.group(9),
        ptime = match.group(10),
        pid = match.group(11)
    )
 
access_logs = (
    sc.textFile("hdfs:///access/file.log")
    .map(parse_apache_log_line)
    .cache()
)
 
print access_logs.count()
 
access_logs_data_frame = sqlContext.createDataFrame(access_logs)
access_logs_data_frame.registerTempTable("access_logs")
 
sizes = (
    access_logs
    .filter(lambda row: row.size != "-")
    .map(lambda row: int(row.size))
    .cache()
)
 
average_size = sizes.reduce(lambda x, y: x + y) / sizes.count()
print average_size
 
response_code_to_count_pair_rdd = (
    access_logs
    .map(lambda row: (row.status, 1))
    .reduceByKey(lambda x, y: x + y)
)
 
response_code_to_count_array = response_code_to_count_pair_rdd.take(100)
print response_code_to_count_array
 
response_code_to_count_row_rdd = response_code_to_count_pair_rdd.map(lambda (x, y): Row(status=x, cnt=y))
response_code_to_count_data_frame = sqlContext.createDataFrame(response_code_to_count_row_rdd)
response_code_to_count_data_frame.registerTempTable("response_status_counts")
 
#fields = [StructField('status', StringType(), True), StructField('count', IntegerType(), True)]
#structure = StructType(fields)
#response_status_counts = sqlContext.createDataFrame(response_code_to_count_row_rdd, structure)
#response_status_counts.registerTempTable("response_status_counts")
 
host_to_count_pair_rdd = (
    access_logs
    .map(lambda row: (row.host, 1))
    .reduceByKey(lambda x, y: x + y)
)
 
host_to_count_array = host_to_count_pair_rdd.take(100)
print host_to_count_array
 
host_to_count_row_rdd = host_to_count_pair_rdd.map(lambda (x, y): Row(host=x, cnt=y))
host_to_count_data_frame = sqlContext.createDataFrame(host_to_count_row_rdd)
host_to_count_data_frame.registerTempTable("host_counts")
cs


댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
페이지
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함