http://www.oschina.net/translate/a-guide-to-python-frameworks-for-hadoop
Ways to use Python with Hadoop:
 1. Hadoop Streaming
 1. mrjob
 1. dumbo
 1. hadoopy
 1. pydoop
 1. others
=== Hadoop Streaming ===
Hadoop Streaming lets any executable that reads records from stdin and writes records to stdout act as the mapper or reducer. Get the n-gram data from http://books.google.com/ngrams, create an /ngrams directory on HDFS, and upload the CSV files into /ngrams.
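A minimal sketch of the HDFS preparation, assuming the hadoop binary lives at the same relative path used in the job command below and that the downloaded files are local CSVs matching googlebooks-*.csv (a hypothetical glob):
{{{
# create the target directory on HDFS
../hadoop64/bin/hadoop fs -mkdir -p /ngrams

# upload the downloaded n-gram CSVs (local filenames are hypothetical)
../hadoop64/bin/hadoop fs -put googlebooks-*.csv /ngrams

# confirm the files landed
../hadoop64/bin/hadoop fs -ls /ngrams
}}}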
mapper.py:
{{{#!highlight python
#!/usr/bin/env python

import os
import re
import sys

# determine value of n in the current block of ngrams by parsing the filename
input_file = os.environ['map_input_file']
expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])

for line in sys.stdin:
    data = line.split('\t')

    # perform some error checking
    if len(data) < 3:
        continue

    # unpack data
    ngram = data[0].split()
    year = data[1]
    count = data[2]

    # more error checking
    if len(ngram) != expected_tokens:
        continue

    # build key and emit
    pair = sorted([ngram[0], ngram[expected_tokens - 1]])
    print >>sys.stdout, "%s\t%s\t%s\t%s" % (pair[0], pair[1], year, count)
}}}
reducer.py:
{{{#!highlight python
#!/usr/bin/env python

import sys

total = 0
prev_key = False

for line in sys.stdin:
    data = line.split('\t')
    curr_key = '\t'.join(data[:3])
    count = int(data[3])

    # found a boundary; emit current sum
    if prev_key and curr_key != prev_key:
        print >>sys.stdout, "%s\t%i" % (prev_key, total)
        prev_key = curr_key
        total = count

    # same key; accumulate sum
    else:
        prev_key = curr_key
        total += count

# emit last key
if prev_key:
    print >>sys.stdout, "%s\t%i" % (prev_key, total)
}}}
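Because Hadoop Streaming only requires executables that read stdin and write stdout, the scripts can be smoke-tested locally with a shell pipeline before touching the cluster. This sketch assumes a small sample file named sample-2gram.csv (hypothetical) and sets the map_input_file variable that the mapper parses:
{{{
chmod +x mapper.py reducer.py

# simulate one map task, the shuffle sort, and one reduce task;
# a plain lexicographic sort approximates Hadoop's grouping on the 3-field key
map_input_file=sample-2gram.csv ./mapper.py < sample-2gram.csv | sort | ./reducer.py
}}}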
Run the job:
{{{
../hadoop64/bin/hadoop jar ../hadoop64/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
        -input /ngrams \
        -output /output-streaming \
        -mapper mapper.py \
        -combiner reducer.py \
        -reducer reducer.py \
        -jobconf stream.num.map.output.key.fields=3 \
        -jobconf stream.num.reduce.output.key.fields=3 \
        -jobconf mapred.reduce.tasks=10 \
        -file mapper.py \
        -file reducer.py
}}}
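The reducer can double as the combiner here because its sums are associative and it emits records in the same tab-separated key/count format it consumes. When the job finishes, the result can be read straight off HDFS; /output-streaming matches the -output flag above:
{{{
../hadoop64/bin/hadoop fs -cat /output-streaming/part-* | head
}}}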