Akış bilgi işlem fırtınası temel bileşenleri, özellikleri ve vakaları

Fırtınaya giriş

Storm, ücretsiz ve açık kaynak kodlu, gerçek zamanlı bir bilgi işlem sistemidir. Storm'u kullanarak sınırsız veri akışını güvenilir bir şekilde kolayca işleyebilir. Büyük verileri toplu işleyen Hadoop gibi, Storm da verileri gerçek zamanlı olarak işleyebilir. Fırtına basittir ve herhangi bir programlama dilini kullanabilir.

Fırtına çekirdek bileşenleri

1. Nimbus

Kaynak tahsisi ve görev planlamasından sorumlu olan fırtına ustasına eşdeğer, sıradan bir fırtına kümesinde yalnızca bir nimbus vardır (JD, nimbus'ın aniden asılı kalmasını önlemek için seçimler gibi kavramlar ekleyen bir nimbus kümesidir)

2. Yönetici

Fırtına kölesine eşdeğer, Nimbus tarafından atanan görevleri almaktan, tüm çalışanları yönetmek ve başlatmaktan sorumlu

3. İşçi

Çalışan, bir Topoloji programına karşılık gelen bir JVM sürecidir, birden fazla Yürütücü olabilir

4. yürütücü

Yürütücü, varsayılan olarak bir göreve karşılık gelen bir iş parçacığıdır veya birden fazla göreve karşılık gelecek şekilde ayarlanabilir

5. Görev

Bir görev bir örnektir (spot / cıvata), yeni cıvatalar kadar çok görev vardır, görev fırtınada hesaplama için en küçük işletim birimidir

6. Topoloji

Topolojik yapı, bir hesaplama görevi senaryosu, bir topolojik yapıya karşılık gelir ve ağız ve cıvata arasındaki doğrudan ilişki topolojik yapıda bildirilir

7. Çıkış

Topolojideki veri kaynağıdır.Çok sayıda cıvataya veri gönderebilir. Spout, güvenilir bir veri kaynağı veya güvenilmez bir veri kaynağı olarak tanımlanabilir

8. Cıvata

Gerçek veri işleme bölümünde, bir cıvata birden fazla cıvataya gönderilebilir ve birden fazla cıvata da bir cıvataya gönderilebilir.

9. Bileşen

Hem Spout hem de Bolt bir Bileşendir ve Storm, IComponent adlı genel bir arabirimi tanımlar

Soy ağacı şu şekildedir: Yeşil kısım en sık kullanılan ve nispeten basit olan kısımdır. Kırmızı kısım işlerle ilgilidir

Emzik ve cıvata arasındaki ilişki:

Genel topoloji yapısı:

Storm, kümedeki birden fazla düğümü koordine etmek için zookeeper kullanıyor, ancak mesajları göndermek için zookeeper kullanmıyor

zookeeper bunu izleyebilir

Hem Nimbus hem de Süpervizör vatansızdır ve kalp atışları hayvanat bahçesi görevlisi tarafından koordine edilir

fırtına avantajları

1. Kullanımı basit ve kullanımı kolay

2. Ölçeklenebilir, çalışan topolojilerin paralelliğini ayarlayabilir

3. Hataya dayanıklı ve güvenilir. Bir çalışan düğümü çalışmadığında, Storm başka bir düğümü yeniden başlatmayı dener ve Nimbus ve Süpervizörler durumsuzdur ve yeniden başlatma onu etkilemez.

4. Veri kaybı yok, Storm'un soyut bileşenleri verilerin en az bir kez işlenmesini sağlar, mesaj kuyruk sistemi başarısız olsa bile mesajın işlenmesini sağlayabilir.

5. Birden çok programlama dilini destekleyen Storm, topolojileri tanımlamak ve sunmak için Thrift kullanır.Thrift herhangi bir programlama dili tarafından kullanılabileceğinden, topolojiler herhangi bir programlama dili tarafından da tanımlanabilir ve kullanılabilir.

6. Kurulumu ve çalıştırması kolay

7. Yüksek performans, düşük gecikme

Fırtına girişi durumu (tek bir sayının gerçek zamanlı istatistikleri)

İlk içe aktarma maven bağımlılıkları

1 < bağımlılık > 2 < Grup kimliği > org.apache.storm < /Grup kimliği > 3 < artifactId > fırtına çekirdeği < / artifactId > 4 < versiyon > 1.0.4 < / version > 5 < /bağımlılık > 1. İlk önce bir Spout yazın ve veri kaynağını belirleyin. Gerçek uygulamalarda genellikle kafka gibi mesajlara erişmek içindir. Giriş durumu bunun yerine rastgele dizeler kullanır / ** * Tuple veri akışını arka uca iletin * @ yazar ruhu * * / public class PunSpout, BaseRichSpout'u genişletir { // BaseRichSpout, ISpout arabiriminin ve IComponent arabiriminin basit bir uygulamasıdır. Arabirim, kullanılmayan yöntemler için varsayılan uygulamalar sağlar özel SpoutOutputCollector toplayıcı; private String cümleleri = { "benim adım ruh", "Ben bir erkeğim", "bir köpeğim var", "köpeğimde pireler var", "kız arkadaşım güzel" }; özel int indeksi = 0; / ** * Open () yöntemi ISpout arayüzünde tanımlanır ve Spout bileşeni başlatıldığında çağrılır. * open () üç parametreyi kabul eder: Storm yapılandırmasını içeren bir Harita, topolojideki bileşenler hakkında bilgi sağlayan bir TopologyContext nesnesi ve SpoutOutputCollector nesnesi, tuple'ları yaymak için bir yöntem sağlar. * Bu örnekte, başlatma işlemi gerçekleştirmemize gerek yoktur, bunu bir SpoutOutputCollector örnek değişkeninde saklamamız yeterlidir. * / @Override public void open (Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Otomatik oluşturulan yöntem saplaması this.collector = toplayıcı; } / ** * NextTuple () yöntemi, herhangi bir Spout uygulamasının temelidir. * Storm bu yöntemi çıktı toplayıcıya tuple göndermek için çağırır. * Burada sadece mevcut dizinin cümlesini gönderiyoruz ve bir sonraki cümleyi göndermeye hazırlanmak için indeksi artırıyoruz. * / @Override public void nextTuple () { //collector.emit(new Values ("merhaba dünya bu bir testtir")); // TODO Otomatik oluşturulan yöntem saplaması this.collector.emit (yeni Değerler (cümleler)); indeks ++; eğer (dizin > = cümleler.length) { indeks = 0; } Utils.sleep (1000); } / ** * declareOutputFields, IComponent arayüzünde tanımlanmıştır, tüm Storm bileşenleri (ağız ve cıvata) bu arayüzü uygulamalıdır * Storm akış bileşenlerinin bu veri akışlarını göndereceğini ve her akışın demetinin içereceği alanları anlatmak için kullanılır * / @Override public void declareOutputFields (OutputFieldsDeclarer deklaratörü) { // TODO Otomatik oluşturulan yöntem saplaması declarer.declare (new Fields ("cümle")); // Bileşene cümle alanını içeren veri akışını göndermesini söyle } } 2. İlk cıvatayı yazın, Spout'tan geçen Tuple'ı tek bir cıvataya bölün ve bir döngüde sonraki cıvataya gönderin / ** * Kelimeleri bölmek için cümle ağzı tarafından yayılan tuple akışına abone olun * @ yazar ruhu * * / public class SplitSentenceBolt, BaseRichBolt'u genişletir { // BaseRichBolt, IComponent ve IBolt arayüzünün uygulamasıdır // Bu sınıfı devralın, bu örneğin umursamadığı yöntemleri uygulamanıza gerek yoktur özel OutputCollector toplayıcı; / ** * Prepar () yöntemi, ISpout'un open () yöntemine benzer. * Bu yöntem, leke başlatıldığında çağrılır ve veritabanı bağlantıları gibi cıvata tarafından kullanılan kaynakları hazırlamak için kullanılabilir. * Bu örnek, CümleSpout sınıfı ile aynıdır SplitSentenceBolt sınıfı çok fazla ek başlatma gerektirmez. * Dolayısıyla, hazırla () yöntemi yalnızca OutputCollector nesnesine bir başvuru kaydeder. * / @Override public void hazırlığı (Map stormConf, TopologyContext context, OutputCollector toplayıcı) { // TODO Otomatik oluşturulan yöntem saplaması this.collector = toplayıcı; } / ** * SplitSentenceBolt'un temel işlevi, IBolt arayüzünde tanımlanan IBolt sınıfındaki execute () yöntemini tanımlamaktır. * Bu yöntem, Bolt akıştan abone olunan bir demeti her aldığında çağrılır. * Bu örnekte, alınan demetteki "cümle" değerini bulun, * Ve değeri tek tek kelimelere bölün ve ardından kelimeye göre yeni demetler gönderin. * / @Override public void execute (Tuple girişi) { // TODO Otomatik oluşturulan yöntem saplaması String cümle = input.getStringByField ("cümle"); Dize kelimeleri = cümle.split (""); for (Dize kelimesi: kelimeler) { this.collector.emit (new Values (word)); // Verileri bir sonraki cıvataya yayınla } } / ** * PlitSentenceBolt sınıfı, her biri bir alan ("kelime") içeren bir tuple akışını tanımlar. * / @Override public void declareOutputFields (OutputFieldsDeclarer deklaratörü) { // TODO Otomatik oluşturulan yöntem saplaması declarer.declare (yeni Alanlar ("kelime")); } } 3. Bir yandan başka bir cıvata yazın, önceki cıvatadan tek geçişi alın, diğer yandan, aynı tek seferin kaç kez olduğunu kaydedin ve mevcut sonucu sonraki cıvataya geçirin / ** * Bölünmüş cümle cıvatasının çıktı akışına abone olun, kelime sayısını gerçekleştirin ve mevcut sayımı bir sonraki cıvataya gönderin * @ yazar ruhu * * / genel sınıf WordCountBolt, BaseRichBolt'u genişletir { özel OutputCollector toplayıcı; // Kelimeleri ve karşılık gelen sayıları saklayın özel HashMap < Dize, Uzun > counts = null; // Not: serileştirilemez nesnelerin hazırlanırken somutlaştırılması gerekir / ** * Çoğu örnek değişkeni genellikle hazırda () somutlaştırılır. Bu tasarım deseni, topolojinin dağıtım yöntemi tarafından belirlenir * Çünkü topoloji konuşlandırıldığında, bileşen ağzı ve cıvata ağda gönderilen serileştirilmiş örnek değişkenleridir. * Spout veya bolt, serileştirmeden önce başlatılan (örneğin, yapıcıda oluşturulan) herhangi bir serileştirilemez örnek değişkenine sahipse * NotSerializableException atılacak ve topoloji yayınlanmayacaktır. * Bu örnekte, HashMap serileştirilebilir olduğundan, yapıcıda güvenle başlatılabilir. * Ancak, yapıcıdaki temel veri türlerini ve serileştirilebilir nesneleri kopyalamak ve örneklemek genellikle en iyisidir * Prepar () yönteminde, serileştirilemez nesne somutlaştırılır. * / @Override public void hazırlığı (Map stormConf, TopologyContext context, OutputCollector toplayıcı) { // TODO Otomatik oluşturulan yöntem saplaması this.collector = toplayıcı; this.counts = yeni HashMap < Dize, Uzun > (); } / ** * Execute () yönteminde, alınan kelimelerin sayısına bakarız (eğer mevcut değilse, 0 olarak başlatırız) * Ardından sayımı artırın ve kaydedin ve yeni bir sözcük ve geçerli sayımdan oluşan iki demet gönderin. * Yayın olarak emisyon sayımı, topolojinin diğer cıvatalarının abone olmasına ve ek işlemler gerçekleştirmesine izin verir. * / @Override public void execute (Tuple girişi) { // TODO Otomatik oluşturulan yöntem saplaması Dize kelimesi = input.getStringByField ("kelime"); Uzun sayı = this.counts.get (kelime); if (count == null) { count = 0L; // Mevcut değilse, 0 olarak başlat } count ++; // Sayıyı artır this.counts.put (kelime, sayı); // mağaza sayısı this.collector.emit (yeni Değerler (kelime, sayı)); } / ** * * / @Override public void declareOutputFields (OutputFieldsDeclarer deklaratörü) { // TODO Otomatik oluşturulan yöntem saplaması // Tuple'ın kelimeyi ve karşılık gelen sayımı içerdiği bir çıktı akışı bildirin ve onu geriye doğru gönderin // Daha fazla işlem için diğer cıvatalar bu veri akışına abone olabilir declarer.declare (yeni Alanlar ("kelime", "sayı")); } } 4. Başka bir cıvata yazın, son cıvatadan tek istatistiksel sonucu alın ve konsola yazdırın. Asıl son adım genellikle veri sonuçlarını HBase veya Redis gibi ilişkisel olmayan bir veritabanında depolar / ** * Bir rapor oluşturun * @ yazar ruhu * * / public class ReportBolt, BaseRichBolt'u genişletir { özel HashMap < Dize, Uzun > counts = null; // Kelimeleri ve karşılık gelen sayıları kaydet @Override public void hazırlığı (Map stormConf, TopologyContext context, OutputCollector toplayıcı) { // TODO Otomatik oluşturulan yöntem saplaması this.counts = yeni HashMap < Dize, Uzun > (); } @Override public void execute (Tuple girişi) { // TODO Otomatik oluşturulan yöntem saplaması Dize kelimesi = input.getStringByField ("kelime"); Uzun sayım = input.getLongByField ("count"); this.counts.put (kelime, sayım); // Gerçek zamanlı çıktı System.out.println ("Sonuç:" + this.counts); } @Override public void declareOutputFields (OutputFieldsDeclarer deklaratörü) { // TODO Otomatik oluşturulan yöntem saplaması // Bu son cıvata, veri akışını iletmeye gerek yok, burada tanımlamaya gerek yok } / ** * Temizleme IBolt arayüzünde tanımlanmıştır * Fırtına, bir cıvatayı sonlandırmadan önce bu yöntemi çağıracak * Bu örnekte, topoloji kapatıldığında son sayım sonucunu çıkarmak için cleanup () yöntemini kullanıyoruz * Normal koşullar altında cleanup () yöntemi, açık dosya tutamaçları veya veritabanı bağlantıları gibi cıvatanın işgal ettiği kaynakları serbest bırakmak için kullanılır * Ancak Storm topolojisi bir küme üzerinde çalışırken, IBolt.cleanup () yönteminin yürütülmesi garanti edilemez (bu geliştirme modudur, bunu üretim ortamında yapmayın). * / @Override public void cleanup () { System.out.println ("---------- NİHAİ SAYILAR -----------"); Dizi Listesi < Dize > anahtarlar = new ArrayList < Dize > (); keys.addAll (this.counts.keySet ()); Collections.sort (anahtarlar); for (Dize anahtarı: anahtarlar) { System.out.println (anahtar + ":" + this.counts.get (anahtar)); } System.out.println ("----------------------------"); } } 5. Topolojiyi yazın, önceki dört adımın Spout ve Bolt'unu bir topolojide birleştirin ve sonuçları görmek için doğrudan ana yöntemi çalıştırın. Bu, Storm'un yerel modudur. Gönderilen yöntemi biraz değiştirirseniz, bir küme moduna dönüştürebilirsiniz. , Aslında bir küme modudur, bu kodlar bir jar paketi olarak Nimbus'a gönderilir ve kümede çalıştırılır. / ** * Kelime sayma topolojisinin farkına varın * * / public class Uygulaması { private static final String SENTENCE_SPOUT_ID = "cümle-uç"; private static final String SPLIT_BOLT_ID = "split-bolt"; private static final String COUNT_BOLT_ID = "count-bolt"; private static final String REPORT_BOLT_ID = "report-bolt"; private static final String TOPOLOGY_NAME = "kelime sayımı-topolojisi"; public static void main (String argümanları) // İstisna atar { //System.out.println ("Merhaba Dünya!"); // Emzik ve cıvatayı örnekleyin CümleSpout spout = new CümleSpout (); SplitSentenceBolt splitBolt = new SplitSentenceBolt (); WordCountBolt countBolt = new WordCountBolt (); ReportBolt reportBolt = new ReportBolt (); TopologyBuilder oluşturucu = new TopologyBuilder (); // Bir TopologyBuilder örneği oluşturuldu // TopologyBuilder, topoloji bileşenleri arasındaki veri akışını tanımlamak için bir akış tarzı API sağlar //builder.setSpout(SENTENCE_SPOUT_ID, spout); // bir cümle spout kaydedin // Varsayılan olarak biri olmak üzere iki Yürütücü (iş parçacığı) ayarlayın builder.setSpout (SENTENCE_SPOUT_ID, emzik, 2); // CümleSpout - > SplitSentenceBolt // Bir cıvatayı kaydedin ve cümle tarafından yayılan veri akışına abone olun. ShuffleGrouping yöntemi, Storm'a, CümleSpout tarafından yayılan tupleları SplitSentenceBolt örneklerine rastgele ve eşit olarak dağıtmasını söyler //builder.setBolt(SPLIT_BOLT_ID, splitBolt) .shuffleGrouping (SENTENCE_SPOUT_ID); // SplitSentenceBolt kelime ayırıcı, 4 Görev, 2 Yürütme (iş parçacığı) kümeleri builder.setBolt (SPLIT_BOLT_ID, splitBolt, 2) .setNumTasks (4) .shuffleGrouping (SENTENCE_SPOUT_ID); // SplitSentenceBolt - > WordCountBolt // alanlarGruplama, belirli verileri içeren demetleri özel cıvata örneklerine yönlendirir // Burada FieldsGrouping () yöntemi, aynı "word" alanına sahip tüm tuupleların aynı WordCountBolt örneğine yönlendirilmesini sağlar //builder.setBolt(COUNT_BOLT_ID, countBolt) .fieldsGrouping (SPLIT_BOLT_ID, new Fields ("word")); // WordCountBolt kelime sayacı kümesi 4 Yürütücü (thread) builder.setBolt (COUNT_BOLT_ID, countBolt, 4) .fieldsGrouping (SPLIT_BOLT_ID, yeni Alanlar ("kelime")); // WordCountBolt - > ReportBolt // globalGrouping, WordCountBolt tarafından yayılan tüm tupleları tek ReportBolt'a yönlendirmektir builder.setBolt (REPORT_BOLT_ID, reportBolt) .globalGrouping (COUNT_BOLT_ID); Config config = new Config (); // Config sınıfı bir HashMap'dir < Dize, Nesne > Çalışma zamanında topolojinin davranışını yapılandırmak için bir alt sınıfı // Çalışan sayısını belirleyin //config.setNumWorkers(2); LocalCluster kümesi = new LocalCluster (); // Yerel gönderim cluster.submitTopology (TOPOLOGY_NAME, config, builder.createTopology ()); Utils.sleep (10000); cluster.killTopology (TOPOLOGY_NAME); cluster.shutdown (); } }

Fırtınanın diğer akış bilgi işlem çerçeveleriyle karşılaştırılması

1.Spark Akışı

İşlemden önce, zaman aralığına göre önceden toplu işlere bölünür.

Spark'ın sürekli veri akışı için soyutlamasına DStream (DiscretizedStream) denir,

Bir DStream, bir mikro gruplama RDD'dir (Esnek Dağıtılmış Veri Kümesi),

RDD, rastgele işlev ve kayan pencere veri dönüştürme olmak üzere iki şekilde paralel olarak çalışabilen dağıtılmış bir veri kümesidir.

2.Flink

Veri akışı ve toplu veri için dağıtılmış işleme motoru

Yerel akış işleme sistemi,

İşlenecek ana senaryo veri akışıdır, toplu veri sadece veri akışı için son derece özel bir durumdur

Flink, tüm görevleri akış olarak ele alacak

3. Fırtına

Yerel akış işleme sistemi, milisaniye işlem yapabilir

Yorumu okumak, iletmek ve takip etmek için tıklayın, takipte birçok bilgi noktası olacak, sizinle paylaşın, ayrıca bana özel mesaj da gönderebilirsiniz!

Düşünemeyeceğin hiçbir şey yok, sadece sen yapamazsın.

Hive SQL ayrıştırma işleminin ayrıntılı açıklaması
önceki
Apple HomePod, BAT'a meydan okumak için Çin'e girdi. Akıllı hoparlörler savaşında kazanma şansı nedir?
Sonraki
Yazın en kapsamlı DIY satın alma rehberi olan Xiaobai, okuduktan sonra yardım istemeyecek
Yasak Şehir için 80.000 bilet satıldı Kar yağıyor Yasak Şehir'i nasıl çekebilirim? "Saraya giremeyen" insanlar, arkadaş çevrelerini en üst düzeye çıkaran fotoğraflar da çekebilirler!
Apple HomePod, BAT'a meydan okumak için Çin'e girdi. Akıllı hoparlörler savaşında kazanma şansı nedir?
Programcılar, mükemmel yüksek maaşlı mimarlara nasıl yükseltilebilir?
Çin Yeni Yılı'nda eve giderken birçok insan güldü ve ağladı ...
Hangzhou Körfezi Köprüsü: Nehirden okyanusa sıçramayı gerçekleştirmek için kılıcı bileme on yıl
OnePlus 5T, Android 9.0 güncellemesini başlattı, resmi itme hidrojen OS 9.0.1
Yaygın web sunucusu mimarileri nelerdir?
Pekin Festivali'nden sonra Ruixue'ye hoş geldiniz! Caochang'ın dört hutonundaki kırmızı fenerler "kar şapkaları" takıyor
Aylık 29 + 300 dakika kira + sınırsız data, bu Telekom paketi tükendi
Endüstri 4.0 denizaşırı birleşme ve satın almalar Zhejiang'ın çift "ejderhası" başladı
Pekin'in üzerine yine kar yağıyor! Pekin'liler bir yana, bu 15 yer en şaşırtıcı yerler.
To Top