本页介绍在 Linux 上用 Python 编写 Hadoop 作业的几种方式。


Python 连接 Hadoop 的方式:

 1. Hadoop 流(Hadoop Streaming)
 1. mrjob
 1. dumbo
 1. hadoopy
 1. pydoop
 1. 其它

=== Hadoop 流 ===

从 http://books.google.com/ngrams 下载数据,在 HDFS 上建立 /ngrams 目录,并把下载的 CSV 文件上传到 /ngrams 目录。

mapper.py:
{{{#!highlight python

#!/usr/bin/env python
"""Hadoop Streaming mapper for the n-gram CSV files stored under /ngrams.

For every well-formed input line it emits one tab-separated record:

    <word_a>\t<word_b>\t<year>\t<count>

where <word_a>, <word_b> are the first and last token of the n-gram in
sorted order.  The expected n-gram size is parsed from the name of the
input file this map task is processing.
"""

import os
import re
import sys


def expected_token_count(input_path):
    """Return n for an input file whose base name contains '<n>gram'.

    Raises IndexError when the base name contains no '<digits>gram' part.
    """
    return int(re.findall(r'([\d]+)gram', os.path.basename(input_path))[0])


def map_line(line, expected_tokens):
    """Convert one raw input line into an output record.

    line            -- tab-separated text: n-gram, year, count, and possibly
                       further columns, which are ignored
    expected_tokens -- number of whitespace-separated tokens the n-gram
                       must contain

    Returns the record without a trailing newline, or None when the line
    has to be skipped.
    """
    # Strip the line terminator first; otherwise a line with exactly three
    # fields would carry '\n' inside the count and emit a blank record.
    data = line.rstrip('\n').split('\t')

    # perform some error checking
    if len(data) < 3:
        return None

    # unpack data
    ngram = data[0].split()
    year = data[1]
    count = data[2]

    # more error checking
    if len(ngram) != expected_tokens:
        return None

    # build key: first and last token, sorted so that (a, b) and (b, a)
    # end up under the same key
    pair = sorted([ngram[0], ngram[expected_tokens - 1]])
    return "%s\t%s\t%s\t%s" % (pair[0], pair[1], year, count)


def main():
    """Read lines from stdin and write the mapped records to stdout."""
    # Hadoop Streaming exports job properties as environment variables
    # with '.' replaced by '_'.  Hadoop 2.x names this property
    # mapreduce.map.input.file; older releases use map.input.file.
    # A KeyError is raised when neither variable is set.
    input_file = (os.environ.get('mapreduce_map_input_file')
                  or os.environ['map_input_file'])
    expected_tokens = expected_token_count(input_file)

    for line in sys.stdin:
        record = map_line(line, expected_tokens)
        if record is not None:
            sys.stdout.write(record + "\n")


if __name__ == '__main__':
    main()


}}}

reducer.py:
{{{#!highlight python

! /usr/bin/env python

import sys

total = 0 prev_key = False for line in sys.stdin: data = line.split('\t') curr_key = '\t'.join(data[:3]) count = int(data[3])

# found a boundary; emit current sum
if prev_key and curr_key != prev_key:
    print >>sys.stdout, "%s\t%i" % (prev_key, total)
    prev_key = curr_key
    total = count
# same key; accumulate sum
    prev_key = curr_key
    total += count

emit last key

if prev_key: print >>sys.stdout, "%s\t%i" % (prev_key, total) }}}

exec:
{{{
../hadoop64/bin/hadoop jar ../hadoop64/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
    -input /ngrams \
    -output /output-streaming \
    -mapper mapper.py \
    -combiner reducer.py \
    -reducer reducer.py \
    -jobconf stream.num.map.output.key.fields=3 \
    -jobconf stream.num.reduce.output.key.fields=3 \
    -jobconf mapred.reduce.tasks=10 \
    -file mapper.py \
    -file reducer.py
}}}

