Hadoop öğrenme (4) - mapreduce için bazı hususlar
Mapreduce ile ilgili ayrıntılara biraz dikkat
Mapreduce programı paketlenmişse ve çalıştırmak için liux'a yerleştirilmişse,
Command java -cp xxx.jar ana sınıf adı
Bir hata rapor edilirse bu, ilgili bağımlı jar paketinin eksik olduğu anlamına gelir
Hadoop jar xxx.jar sınıf adı komutunu kullanın çünkü küme makinesinde istemci ana yöntemini başlatmak için hadoop jar xx.jar mr.wc.JobSubmitter komutunu kullandığınızda, hadoop jar komutu makinedeki hadoop kurulum dizinindeki kavanozu değiştirecektir. Paket ve yapılandırma dosyaları, çalışma zamanında sınıf yoluna eklenir
Daha sonra, istemcimizin ana yöntemindeki yeni Configuration () ifadesi, yapılandırma dosyasını sınıf yoluna yükleyecek ve doğal olarak
fs.defaultFS ve mapreduce.framework.name ve iplik.resourcemanager.hostname bu parametrelerin konfigürasyonu
Yerel hadoop ile ilgili tüm kavanoz paketlerine referans verilecektir
Mapreduce'un ayrıca çalıştırılması gereken yerel bir işi vardır, yani ipliğe gönderilmesi gerekmez ve bağımsız modda çalışabilir ve birden fazla iplik ile simüle edilebilir.
Yani, Linux veya Windows altında bir iş gönderirseniz, varsayılan olarak çalıştırılmak üzere yerelde gönderilecektir.
Eğer iplik varsayılan olarak linux üzerinde çalışacaksa, yapılandırma dosyası hadoop / etc / mapred-site.xml dosyasını yazmanız gerekir.
mapreduce.framework.name
iplik
Anahtar, değer çifti, eğer bu kendi sınıfınızsa, bu sınıf Writable'ı uygulamalı ve aynı zamanda, serileştirmek istediğiniz verileri ikiliye dönüştürmeli ve sonra onu rewrite yöntemi wirte parametresinin DataOutput'una ve başka bir readFields rewrite yöntemine koymalıdır. Seriyi kaldırma için kullanılır,
Seri durumdan çıkarılırken, bu sınıfın parametresiz yapım yöntemi kullanılarak bir nesne oluşturulacağını ve daha sonra nesnenin readFields yöntemi aracılığıyla geri yükleneceğini unutmayın.
DataOutput aynı zamanda bir tür akıştır, ancak hadoop tarafından kapsüllenir.Kendiniz kullandığınızda, bir FileOutputStream nesnesi eklemeniz gerekir.
DataOutput bir dize yazdığında, writeUTF ("dize") kullanılır.Bu şekilde kodlandığında, dizenin uzunluğu dizenin önüne eklenir. Bu, karakter kodlama sorunundan kaynaklanır. Hadoop ayrıştırır O zaman, dizenin ne kadar uzun olduğunu görmek için ilk önce ilk iki baytı okuyacaktır, aksi takdirde write (string.getBytes ()) kullanılıyorsa, dizenin kaç bayta sahip olduğunu bilmez.
Azaltma aşamasında, hdfs'ye bir nesne yazarsanız, dizenin toString yöntemi çağrılır ve bu sınıfın toString yöntemini geçersiz kılabilirsiniz.
Örneğin, aşağıdaki sınıf hadoop'ta serileştirilebilir
paket mapreduce2;
import java.io.DataInput;
import java.io.DataOutput;
java.io.IOException ithalatı;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
org.apache.hadoop.io.Writable içe aktarımı;
import org.apache.hadoop.util.Waitable;
public class FlowBean, Yazılabilir {
private int up; // Yukarı akış trafiği private int down; // Aşağı akış trafiği private int sum; // Toplam trafik private String phone; // Telefon numarası public FlowBean (int up, int down, String phone) { this.up = yukarı; this.down = aşağı; this.sum = yukarı + aşağı; this.phone = telefon; } public int getUp () { geri dönmek; } public void setUp (int up) { this.up = yukarı; } public int getDown () { aşağı dönüş; } public void setDown (int down) { this.down = aşağı; } public int getSum () { getiri toplamı; } public void setSum (int sum) { this.sum = toplam; } public String getPhone () { telefona dönüş; } public void setPhone (String phone) { this.phone = telefon; } @Override public void readFields (DataInput di) IOException { // Buradaki okuma sırasının yazma sırası ile aynı olduğuna dikkat edin this.up = di.readInt (); this.down = di.readInt (); this.sum = this.up + this.down; this.phone = di.readUTF (); } @Override public void write (DataOutput Do), IOException { Do.writeInt (this.up); Do.writeInt (this.down); Do.writeInt (this.sum); Do.writeUTF (this.phone); } @Override public String toString () { "telefon numarası" + this.phone + "toplam trafik" + this.sum döndür; }}
Tüm azaltma görevleri çalıştırıldığında, bir temizleme yöntemi çağrılacaktır.
Uygulama alıştırması: Bir sayfadaki toplam n veriyi sayın
1.Çözüm: Yalnızca bir indirgeme görevini kullanın, azaltma görevi aşamasında temizleme yöntemini kullanın, doğrudan hdf'lere koymayın, ancak bir Ağaç Haritası'nda saklayın
İndirgeme görevi sona erdikten sonra, temizlemede, Ağaç Haritasındaki ilk beşi HDFS'ye çıkarın;
paket cn.edu360.mr.page.topn;
public class PageCount, Comparable {
özel Dize sayfası;
özel int sayısı;
public void set (Dize sayfası, int sayısı) {
this.page = sayfa;
this.count = sayım;
}
public String getPage () {
dönüş sayfası;
}
public void setPage (Dize sayfası) {
this.page = sayfa;
}
public int getCount () {
dönüş sayısı;
}
public void setCount (int count) {
this.count = sayım;
}
@Override
public int CompareTo (PageCount o) {
return o.getCount () - this.count == 0? this.page.compareTo (o.getPage ()): o.getCount () - this.count;
}
}
harita sınıfı
java.io.IOException'ı içe aktarın;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
org.apache.hadoop.io.Text içe aktarın;
import org.apache.hadoop.mapreduce.Mapper;
public class PageTopnMapper, Mapper'ı genişletir {
@Override korumalı boşluk haritası (LongWritable anahtarı, Metin değeri, Bağlam bağlamı) IOException, InterruptedException { Dize satırı = value.toString (); Dize bölme = line.split (""); context.write (yeni Metin (bölünmüş), yeni IntWritable (1)); }}
sınıfı azaltmak
paket cn.edu360.mr.page.topn;
java.io.IOException ithalatı;
import java.util.Map.Entry;
java.util.Set'i içe aktarın;
java.util.TreeMap'i içe aktar;
org.apache.hadoop.conf.Configuration'ı içe aktarın;
import org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.Text içe aktarın;
import org.apache.hadoop.mapreduce.Reducer;
public class PageTopnReducer Reducer'ı genişletir {
Ağaç Haritası < PageCount, Nesne > treeMap = yeni TreeMap < > (); @Override korumalı boşluk azaltma (Metin tuşu, Tekrarlanabilir < IntWritable > değerler, Redüktör < Metin, Yazılabilir, Metin, Yazılabilir > .Context bağlamı) IOException, InterruptedException { int count = 0; for (IntWritable değer: değerler) { sayı + = değer.get (); } PageCount pageCount = new PageCount (); pageCount.set (key.toString (), sayım); treeMap.put (pageCount, null); } @Override korumalı boşluk temizleme (Bağlam bağlamı) IOException, InterruptedException { Yapılandırma conf = context.getConfiguration ();// Yapılandırmayı temizlemede alabilirsiniz, ilk birkaç veri parçasını almak için ondan okuyabilirsiniz
int topn = conf.getInt ("top.n", 5); Ayarlamak < Giriş < PageCount, Nesne > > entrySet = treeMap.entrySet (); int i = 0; giriş için < PageCount, Nesne > entry: entrySet) { context.write (yeni Metin (entry.getKey (). getPage ()), yeni IntWritable (entry.getKey (). getCount ())); i ++; eğer (i == topn) return; } }}
Sonra jobSubmit sınıfı, Yapılandırmayı ayarlamak için buna dikkat edin, birkaç yöntem var
Birincisi, yapılandırma dosyasını yüklemek
Yapılandırma conf = new Configuration (); conf.addResource ("xx-oo.xml");Ardından xx-oo.xml dosyasına yazın
< Emlak > < isim > top.n < / isim > < değer > 6 < / değer > < /Emlak >İkinci yol
// Doğrudan ayarlayarak
conf.setInt ("top.n", 3); // Java ana programına doğrudan aktarılan parametreleri iletin conf.setInt ("top.n", Tamsayı.parseInt (args));Üçüncü yol, yapılandırma dosyası parametrelerini elde etmektir
Özellikler props = yeni Özellikler ();
props.load (JobSubmitter.class.getClassLoader (). getResourceAsStream ("topn.properties")); conf.setInt ("top.n", Integer.parseInt (props.getProperty ("top.n")));Ardından topn.properties içindeki parametreleri yapılandırın
top.n = 5
Varsayılan olarak makinede simüle edilen alt gönderme sınıfı
org.apache.hadoop.conf.Configuration'ı içe aktarın;
org.apache.hadoop.fs.Path'i içe aktar;
import org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.Text içe aktarın;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JobSubmitter {
public static void main (String args) Exception {atar / ** * Sınıf yolu altındaki * -site.xml dosyasını yükleyerek parametreleri ayrıştırın * / Yapılandırma conf = new Configuration (); conf.addResource ("xx-oo.xml"); / ** * Parametreleri koda göre ayarlayın * / //conf.setInt("top.n ", 3); //conf.setInt("top.n ", Integer.parseInt (args)); / ** * Özellik yapılandırma dosyası aracılığıyla parametreleri alın * / / * Özellikler props = yeni Özellikler (); props.load (JobSubmitter.class.getClassLoader (). getResourceAsStream ("topn.properties")); conf.setInt ("top.n", Integer.parseInt (props.getProperty ("top.n"))); * / Job job = Job.getInstance (conf); job.setJarByClass (JobSubmitter.class); job.setMapperClass (PageTopnMapper.class); job.setReducerClass (PageTopnReducer.class); job.setMapOutputKeyClass (Text.class); job.setMapOutputValueClass (IntWritable.class); job.setOutputKeyClass (Text.class); job.setOutputValueClass (IntWritable.class); FileInputFormat.setInputPaths (iş, yeni Yol ("F: \ mrdata \ url \ input")); FileOutputFormat.setOutputPath (iş, yeni Yol ("F: \ mrdata \ url \ output")); job.waitForCompletion (true); }}
Ek Java bilgi puanı ekleri
Ağaç haritası, yerleştirilen şeyler otomatik olarak sıralanacaktır
Treemap için iki özel yöntem, ilki bir Karşılaştırıcıda geçirmektir
public class TreeMapTest {
public static void main (String args) { Ağaç Haritası < FlowBean, Dize > tm1 = yeni TreeMap < > (yeni Karşılaştırıcı < FlowBean > () { @Override public int karşılaştırma (FlowBean o1, FlowBean o2) { // İki kategorinin toplam trafiği aynıysa, telefon numaraları karşılaştırılacaktır eğer (o2.getAmountFlow () - o1.getAmountFlow () == 0) { o1.getPhone (). CompareTo (o2.getPhone ()); } // Trafik farklıysa, en küçükten en büyüğe sıralayın return o2.getAmountFlow () - o1.getAmountFlow (); } }); FlowBean b1 = new FlowBean ("1367788", 500, 300); FlowBean b2 = new FlowBean ("1367766", 400, 200); FlowBean b3 = new FlowBean ("1367755", 600, 400); FlowBean b4 = yeni FlowBean ("1367744", 300, 500); tm1.put (b1, null); tm1.put (b2, null); tm1.put (b3, null); tm1.put (b4, null); // ağaç kümesi geçişi Ayarlamak < Giriş < FlowBean, Dize > > entrySet = tm1.entrySet (); giriş için < FlowBean, Dize > entry: entrySet) { System.out.println (entry.getKey () + "\ t" + entry.getValue ()); } }}
İkincisi, bu sınıfta Karşılaştırılabilir bir arayüz uygulamaktır
paket cn.edu360.mr.page.topn;
public class PageCount, Comparable {
özel Dize sayfası; özel int sayısı; public void set (Dize sayfası, int sayısı) { this.page = sayfa; this.count = sayım; } public String getPage () { dönüş sayfası; } public void setPage (Dize sayfası) { this.page = sayfa; } public int getCount () { dönüş sayısı; } public void setCount (int count) { this.count = sayım; } @Override public int CompareTo (PageCount o) { return o.getCount () - this.count == 0? this.page.compareTo (o.getPage ()): o.getCount () - this.count; }} Büyük veri geliştirme ve yüksek maaşlar için eksiksiz bir gerekli kaynak seti [ücretsiz erişim]
Oracle'ın kıdemli teknik direktörü, büyük verilerin geliştirilmesine tam olarak yardımcı olmak için uzun yıllar boyunca eksiksiz bir müfredat sistemi [büyük veri ve yapay zeka geliştirme için görülmesi gereken bir] oluşturdu Sıfır temel + giriş + iyileştirme + proje = yüksek maaş !
Söylenecek son şey, yukarıdaki öğreticinin nasıl alınacağıdır!
Nasıl alınır:
Binlerce yıldır değişmeyen hala eski kurallar
1. Yorum yazıları, kelime sınırı yoktur, tek kelime yeterlidir!
2. Xiaobian hayranı olun!
3. Özel Mesaj Editörü: "Büyük Veri Geliştirme Eğitimi"!