Eşle-İndirge Mimarisi (Map/Reduce -MR- Architecture)
MR bir veri analiz sürecini iki evreye ayırır. Bunlardan birincisi eşleme evresidir, ki bu evrede eline geçen veri parçasını alan her analiz servisi (bunlardan pek çok var, farklı makinalarda), süreçler kendilerine verilen dosyaya bakarak anahtar-değer ikilileri üretirler. Kelime sayma örneği için bu anahtarlar kelimeler, sayılar ise hep 1. Mesela "A A B" için tek bir eşleyici A: 1, A: 1, B: 1 üretir.
Diğer evre, indirgeme sırasında her servisten alınan anahtarlardaki değerler "birleştirilir" yani daha aza "indirgenir". Fakat dikkat: birleştirme denince tek bir nihai servisin her şeyi tek başına indirgediği zannedilmesin; bu evre de birden fazla makinada işleyebilir.
Anahtarlar birleştirme "birimini" oluştururlar [1]. İndirgeme işlemi ise iki, üç hatta birden fazla girdiyi işleyebilecek türden bir kod olmalıdır. Fakat zaten kelime sayma problemin özünde zaten bu vardır, herşey ayrı ayrı, parça parça toplanabilir, bir İndirgeyici iki eşlenmiş dosya alıyor diyelim, biri A: 1, A: 1, B: 1, diğeri A: 1, B:1; bunları A: 3, B: 2 olarak indirgeyebilir / birleştirebiliriz. Diğer makinalarda başkaları başka veriler almıştır, onlar kendi çaplarında kendi birleştirimlerini yapmaktadırlar. İndirgeme ve yük dağıtım birimi anahtardır, indirgeme safhasında mesela A anahtarı tek bir makinaya gönderilecektir, o anahtarın birleşiminden o makina sorumlu olacaktır.
İşin mekaniğine bakınca oldukça basit gibi gelebilir. Fakat şaşırtıcı kadar çok analiz işlemi üstteki şekilde temsil edilebilmektedir ve sonuç olarak paralelize edilebilmektedir. Tabii işin mekanığı derken sadece anahtarlara ayırmaktan bahsetmiyoruz, analiz işleminin, her ne ise, "azar azar, ayrı ayrı üst üste koya koya sonuca ilerleyebilecek" şekilde tekrar tasarlanması gerekiyor. Kelime toplama, hatta toplamanın niye MR ile doğal bir uyumunun olduğunu görüyoruz herhalde, 2,3,4,5 eklerlerken 2+3=5,4+5=9 ardından 5+9=14 elde edebiliriz, ama diğer bir şekilde 3+4=7,2+5=7 arkasından tekrar 7+7=14 yani aynı sonucu elde edebiliyoruz. Toplama işleminin ruhunda "sıradan bağımsız olmak" var, ve bu bağımsızlık paralelize etmekte işimize yarıyor.
Hadoop ile Patent Verisi İşlemek
75-99 yılları arasında hangi patentin hangi hangi patentlere referans verdiği ve patentler hakkında detaylı verileri Hadoop ile işleyeceğiz. Veriler alttaki bağlantıdan alınabilir, gerekli dosyalar cite7599.txt ve apat6399.txt (baglantilar altta) Bu dosyaları açıp biz diyelim ki /data altına koyduk.
Referans verisine bakarsak,
head -10 /data/cite75_99.txt
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
3858241,3634889
3858242,1515701
3858242,3319261
3858242,3668705
3858242,3707004
Bu veri, hangi patentin hangi diğer patenti kullandığını "tek" patent bazında göstermekte. Detaylı patent verisine bakalım
head -10 /data/apat63_99.txt
"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,
3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,
3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,
3070806,1963,1096,,"US","PA",,1,,2,6,63,,0,,,,,,,,,
3070807,1963,1096,,"US","OH",,1,,623,3,39,,3,,0.4444,,,,,,,
3070808,1963,1096,,"US","IA",,1,,623,3,39,,4,,0.375,,,,,,,
3070809,1963,1096,,"US","AZ",,1,,4,6,65,,0,,,,,,,,,
Şimdi patent detay verisinden bir örneklem (sample) alalım. Daha ufak bir veri kümesiyle çalışmak ilk başta faydalı olabilir, geliştirme test etme sürecini hızlandırır.
chmod a+r /data/apat63_99.txt
head -1 /data/apat63_99.txt > /data/apat63_99_sampled.txt
cat /data/apat63_99.txt | perl -n -e 'print if (rand() < .05)' >> /data/apat63_99_sampled.txt
Hadoop başlatalım
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/stop-all.sh
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/start-all.sh
no jobtracker to stop
localhost: no tasktracker to stop
no namenode to stop
localhost: no datanode to stop
localhost: no secondarynamenode to stop
starting namenode, logging to /home/hduser/Downloads/hadoop-1.0.4/libexec/../logs/hadoop-hduser-namenode-burak-Aspire-S3.out
localhost: starting datanode, logging to /home/hduser/Downloads/hadoop-1.0.4/libexec/../logs/hadoop-hduser-datanode-burak-Aspire-S3.out
localhost: starting secondarynamenode, logging to /home/hduser/Downloads/hadoop-1.0.4/libexec/../logs/hadoop-hduser-secondarynamenode-burak-Aspire-S3.out
starting jobtracker, logging to /home/hduser/Downloads/hadoop-1.0.4/libexec/../logs/hadoop-hduser-jobtracker-burak-Aspire-S3.out
localhost: starting tasktracker, logging to /home/hduser/Downloads/hadoop-1.0.4/libexec/../logs/hadoop-hduser-tasktracker-burak-Aspire-S3.out
/home/hduser/Downloads/hadoop*/bin/hadoop dfs -mkdir /user/hduser/patent
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
dfs -ls /user/hduser/patent
Found 2 items
-rw-r--r-- 1 hduser supergroup ... /user/hduser/patent/apat63_99.txt
-rw-r--r-- 1 hduser supergroup ... /user/hduser/patent/apat63_99_sampled.txt
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
dfs -copyFromLocal /data/apat63_99_sampled.txt \
/user/hduser/patent/apat63_99_sampled.txt
copyFromLocal: Target /user/hduser/patent/apat63_99_sampled.txt already exists
Amacımız patent verisindeki ülke (country) kodunu kullanarak her ülke basına ortalama ne kadar patent üretildiğini hesaplamak. Eşleme-İndirgeme (Map-Reduce) döngüsünde eşleme kısmını yapacak program aşağıda.
#!/usr/bin/python
import os,sys
os.environ['MPLCONFIGDIR']='/tmp'
import pandas as pd
data = pd.read_csv(sys.stdin,sep=",",index_col=0,usecols=[0,4,8])
df = data[pd.notnull(data.ix[:,0]) \& pd.notnull(data.ix[:,1])].ix[:,0:2]
df.to_csv(sys.stdout,sep="\t",index=False,header=False)
İndirgeyici yazmadan önce programımızı iki şekilde test edelim. Bu şekillerden birisi hiç indirgeyici olmadan, ikincisi IdentityReducer denen kendisine geçilen veriyi olduğu gibi dışarı atan (ama yine de ortada bir indirgeyici olduğu için sonradan bazı işlemlerin yine de yapıldığı) şeklinde. Bu iki kullanım Hadoop kodlarında hata bulma / temizleme için faydalı olabiliyor.
cp mapper.py /tmp/
chmod a+r /tmp/mapper.py
chmod a+x /tmp/mapper.py
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
dfs -rmr /user/hduser/output
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
jar \$HOME/Downloads/hadoop*/contrib/streaming/hadoop-*streaming*.jar\
-input patent/apat63_99_sampled.txt -output output \
-mapper /tmp/mapper.py -numReduceTasks 0
Deleted hdfs://localhost:54310/user/hduser/output
packageJobJar: [/app/hadoop/tmp/hadoop-unjar2555196345671652661/] [] /tmp/streamjob5013687273729997973.jar tmpDir=null
13/02/24 16:30:26 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/02/24 16:30:26 WARN snappy.LoadSnappy: Snappy native library not loaded
13/02/24 16:30:26 INFO mapred.FileInputFormat: Total input paths to process : 1
13/02/24 16:30:27 INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
13/02/24 16:30:27 INFO streaming.StreamJob: Running job: job_201302241611_0012
13/02/24 16:30:27 INFO streaming.StreamJob: To kill this job, run:
13/02/24 16:30:27 INFO streaming.StreamJob: /home/hduser/Downloads/hadoop-1.0.4/libexec/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201302241611_0012
13/02/24 16:30:27 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201302241611_0012
13/02/24 16:30:28 INFO streaming.StreamJob: map 0% reduce 0%
13/02/24 16:30:43 INFO streaming.StreamJob: map 100% reduce 0%
13/02/24 16:30:49 INFO streaming.StreamJob: map 100% reduce 100%
13/02/24 16:30:49 INFO streaming.StreamJob: Job complete: job_201302241611_0012
13/02/24 16:30:49 INFO streaming.StreamJob: Output: output
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop dfs -copyToLocal output /tmp/
head -20 /tmp/output/part-00000
AD 14.0
AE 15.4
AG 13.25
AI 10.0
AM 18.0
AN 9.625
AR 9.188990825688073
AT 10.683988393563704
AU 12.291563832174107
AW 15.5
AZ 11.0
BB 11.0
BE 11.945544554455445
BG 4.9899497487437188
BH 6.5
BM 10.076923076923077
BN 9.0
BO 11.75
BR 9.358426966292134
BS 15.778846153846153
Üstteki sonuçta görüyoruz ki anahtarlar üretilmiş, ama çıktılar anahtara göre sıralanmamışlar. Hatta üstteki sıra girdi sırasıyla tıpatıp aynı. Şimdi IdentityReducer üzerinden.
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
dfs -rmr /user/hduser/output
!ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
jar /home/hduser/Downloads/hadoop*/contrib/streaming/hadoop-*streaming*.jar \
-input patent/apat63_99_sampled.txt -output output \
-mapper /tmp/mapper.py -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-numReduceTasks 1
Deleted hdfs://localhost:54310/user/hduser/output
packageJobJar: [/app/hadoop/tmp/hadoop-unjar2314287838929839696/] [] /tmp/streamjob5231815242060775825.jar tmpDir=null
13/02/24 18:03:14 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/02/24 18:03:14 WARN snappy.LoadSnappy: Snappy native library not loaded
13/02/24 18:03:14 INFO mapred.FileInputFormat: Total input paths to process : 1
13/02/24 18:03:14 INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
13/02/24 18:03:14 INFO streaming.StreamJob: Running job: job_201302241759_0004
13/02/24 18:03:14 INFO streaming.StreamJob: To kill this job, run:
13/02/24 18:03:14 INFO streaming.StreamJob: /home/hduser/Downloads/hadoop-1.0.4/libexec/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201302241759_0004
13/02/24 18:03:14 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201302241759_0004
13/02/24 18:03:15 INFO streaming.StreamJob: map 0% reduce 0%
13/02/24 18:03:28 INFO streaming.StreamJob: map 50% reduce 0%
13/02/24 18:03:31 INFO streaming.StreamJob: map 100% reduce 0%
13/02/24 18:03:40 INFO streaming.StreamJob: map 100% reduce 100%
13/02/24 18:03:46 INFO streaming.StreamJob: Job complete: job_201302241759_0004
13/02/24 18:03:46 INFO streaming.StreamJob: Output: output
ssh localhost -l hduser rm -rf /tmp/output
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
dfs -copyToLocal output /tmp/
head -10 /tmp/output/part-00000
FR 12.0
US 5.0
US 1.0
US 4.0
US 4.0
US 21.0
US 4.0
US 8.0
US 7.0
US 11.0
Üstteki sonuçta anahtarların sıralanmış olduğunu görüyoruz.
Şimdi bir indirgeyici (reducer) ekleyelim. Bu indirgeyici ülke bazındaki veriler üzerinen ortalama alacak.
#!/usr/bin/python
import os,sys
os.environ['MPLCONFIGDIR']='/tmp'
import pandas as pd
data = pd.read_csv(sys.stdin,sep="\t",names=['country','count'])
grouped = data.groupby('country').mean()
grouped.to_csv(sys.stdout,sep="\t",header=False)
Eğer indirgeyiciyi direk işletirsek (Hadoop dışından)
cat /tmp/output/part-00000 | ./reducer.py | tail -10
SE 9.0021739130434781
SG 14.0
SU 6.4136125654450264
SV 6.5
TR 8.0
TW 6.2037037037037033
US 10.964136780650541
VE 9.3333333333333339
YU 5.75
ZA 11.170212765957446
Bu kodu hduser'in bulabileceği bir yere koyuyoruz.
cp reducer.py /tmp/
chmod a+r /tmp/reducer.py
chmod a+x /tmp/reducer.py
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
dfs -rmr /user/hduser/output
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
jar \$HOME/Downloads/hadoop*/contrib/streaming/hadoop-*streaming*.jar\
-input patent/apat63_99_sampled.txt -output output \
-mapper /tmp/mapper.py -reducer /tmp/reducer.py -numReduceTasks 1
Deleted hdfs://localhost:54310/user/hduser/output
packageJobJar: [/app/hadoop/tmp/hadoop-unjar3358838375062006941/] [] /tmp/streamjob3628714134396896316.jar tmpDir=null
13/02/24 20:33:10 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/02/24 20:33:10 WARN snappy.LoadSnappy: Snappy native library not loaded
13/02/24 20:33:10 INFO mapred.FileInputFormat: Total input paths to process : 1
13/02/24 20:33:10 INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
13/02/24 20:33:10 INFO streaming.StreamJob: Running job: job_201302241759_0005
13/02/24 20:33:10 INFO streaming.StreamJob: To kill this job, run:
13/02/24 20:33:10 INFO streaming.StreamJob: /home/hduser/Downloads/hadoop-1.0.4/libexec/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201302241759_0005
13/02/24 20:33:10 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201302241759_0005
13/02/24 20:33:11 INFO streaming.StreamJob: map 0% reduce 0%
13/02/24 20:33:23 INFO streaming.StreamJob: map 50% reduce 0%
13/02/24 20:33:26 INFO streaming.StreamJob: map 100% reduce 0%
13/02/24 20:33:35 INFO streaming.StreamJob: map 100% reduce 100%
13/02/24 20:33:42 INFO streaming.StreamJob: Job complete: job_201302241759_0005
13/02/24 20:33:42 INFO streaming.StreamJob: Output: output
ssh localhost -l hduser rm -rf /tmp/output
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
dfs -copyToLocal output /tmp/
Ve sonuç altta olduğu gibi
head -10 /tmp/output/part-00000
AE 12.0
AG 24.0
AN 15.0
AR 10.359999999999999
AT 11.077306733167083
AU 12.789029535864978
BE 11.442748091603054
BG 4.5
BM 8.0
BO 25.0
Tabii bu sonuçlar bir örneklem üzerinden alındı. Tüm veriyi işlemek için
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop dfs \
-copyFromLocal /data/apat63_99.txt /user/hduser/patent/apat63_99.txt
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop \
dfs -rmr /user/hduser/output
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop jar \
/home/hduser/Downloads/hadoop*/contrib/streaming/hadoop-*streaming*.jar \
-input patent/apat63_99.txt -output output -mapper /tmp/mapper.py \
-reducer /tmp/reducer.py -numReduceTasks 1
Deleted hdfs://localhost:54310/user/hduser/output
packageJobJar: [/app/hadoop/tmp/hadoop-unjar938213738183646678/] [] /tmp/streamjob7433279407208888397.jar tmpDir=null
13/02/24 23:50:12 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/02/24 23:50:12 WARN snappy.LoadSnappy: Snappy native library not loaded
13/02/24 23:50:12 INFO mapred.FileInputFormat: Total input paths to process : 1
13/02/24 23:50:12 INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
13/02/24 23:50:12 INFO streaming.StreamJob: Running job: job_201302241759_0006
13/02/24 23:50:12 INFO streaming.StreamJob: To kill this job, run:
13/02/24 23:50:12 INFO streaming.StreamJob: /home/hduser/Downloads/hadoop-1.0.4/libexec/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201302241759_0006
13/02/24 23:50:12 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201302241759_0006
13/02/24 23:50:13 INFO streaming.StreamJob: map 0% reduce 0%
13/02/24 23:50:28 INFO streaming.StreamJob: map 50% reduce 0%
13/02/24 23:50:40 INFO streaming.StreamJob: map 75% reduce 8%
13/02/24 23:50:52 INFO streaming.StreamJob: map 100% reduce 8%
13/02/24 23:50:55 INFO streaming.StreamJob: map 100% reduce 25%
13/02/24 23:51:04 INFO streaming.StreamJob: map 100% reduce 100%
13/02/24 23:51:17 INFO streaming.StreamJob: Job complete: job_201302241759_0006
13/02/24 23:51:17 INFO streaming.StreamJob: Output: output
ssh localhost -l hduser rm -rf /tmp/output
ssh localhost -l hduser /home/hduser/Downloads/hadoop*/bin/hadoop dfs \
-copyToLocal output /tmp/
head -7 /tmp/output/part-00000
AD 14.0
AE 15.4
AG 13.25
AI 10.0
AM 18.0
AN 9.625
AR 9.188990825688073
www.nber.org/patents/
Referans, Kaynaklar
Hadoop - Ilk Ornek (Python), Esleme-Indirgeme
Yukarı