Önceki makalede, Ignite ve Spark adlı iki teknolojiyi tanıttık ve iki teknolojiyi kapsamlı ve derinlemesine bir şekilde karşılaştırdık. Analizden sonra bir sonuç çıkarabiliriz: her ikisi de çok güçlü, ancak çok farklılar ve farklı konumlandırmalara sahipler, dolayısıyla farklı uygulama alanları olacak.
Bununla birlikte, bu iki teknoloji tamamlayıcı da olabilir, öyleyse tamamlayıcı uygulanabilir senaryoları nelerdir? Esas olarak şu yönlerden: Spark'ta SQL'in daha yavaş olduğunu düşünüyorsanız, Ignite, Spark uygulamalarını kendi yolunda daha da hızlandırmak için çözümler sunar.Bu konuda çok fazla çözüm yoktur ve geliştirilmesi tavsiye edilir. Yazar, diğerinin veri ve durum paylaşımı olduğunu düşünmektedir, tabi ki bu alanda pek çok çözüm vardır ve Ignite kullanmak gerekli değildir.
Ignite, doğal olarak Spark için destek sağlar, bu makale esas olarak Neden karşı o nasıl Ignite ve Spark'ı entegre edin.
1. Ignite'ı Spark ile entegre edin
Bu iki teknolojiyi entegre etmek, Spark uygulamalarına birkaç bariz fayda sağlayacaktır:
Aşağıdaki şekil, bu iki teknolojinin nasıl entegre edileceğini gösterir ve önemli avantajları vurgular:
Bu figür sayesinde, Ignite'ın tüm Spark uygulamasındaki konumunu ve rolünü genel mimari perspektifinden görebilirsiniz.
Ignite'ın Spark için desteği esas olarak iki yönden yansıtılır, biri Ignite RDD ve diğeri Ignite DataFrame'dir. Bu makale önce Ignite RDD'ye, ardından Ignite DataFrame'e odaklanacaktır.
2. Ignite RDD
Ignite, IgniteRDD adlı bir SparkRDD uygulaması sağlar. Bu uygulama, bellekteki Spark işleri genelinde herhangi bir veriyi ve durumu paylaşabilir. IgniteRDD, Ignite'ta aynı bellek verilerinin paylaşılan ve değişken bir görünümünü sağlar; Farklı Spark işleri, çalışan düğümleri veya uygulamaları, aksine, yerel SparkRDD, Spark işleri veya uygulamaları arasında paylaşılamaz.
Ignite dağıtılmış önbelleğin bir görünümü olarak, IgniteRDD, Spark işi yürütme sürecinde, Spark çalışan düğümünde veya kendi kümesinde konuşlandırılabilir. Bu nedenle, önceden yapılandırılmış dağıtım modeline göre, durum paylaşımı yalnızca bir Spark uygulamasının yaşam döngüsü içinde (yerleşik mod) veya Spark uygulamasının dışında (bağımsız mod) olabilir.
Ignite, Spark uygulamalarının SQL performansını artırmasına da yardımcı olabilir. SparkSQL zengin SQL sözdizimini desteklese de, dizinleri uygulamaz. Sonuçlar açısından, genellikle küçük bir veri kümesinde bile, Spark sorguları birkaç dakika sürebilir çünkü tam bir tablo taraması gereklidir. Ignite kullanılıyorsa, Spark kullanıcıları, binlerce kez performans iyileştirmesi sağlayabilen birincil ve ikincil dizinleri yapılandırabilir.
2.1. IgniteRDD örneği
Aşağıda, bazı kodlar aracılığıyla IgniteRDD'nin faydaları ve birkaç uygulama oluşturma yöntemi gösterilmektedir.
Ignite RDD'ye, diller arası gereksinimleri olan ekipler için uygun olan birden çok dilde erişilebilir.Aşağıdaki kod, iki basit Scala uygulamasını ve iki Java uygulamasını içerir. Ek olarak, uygulamalar iki farklı ortamdan çalıştırılacaktır: Terminalden Scala uygulamaları ve IDE aracılığıyla Java uygulamaları. Ek olarak, bazı SQL sorguları Java uygulamasında çalıştırılacaktır.
Scala uygulamaları için, verileri IgniteRDD'ye yazmak için bir uygulama kullanılırken, başka bir uygulama kısmi filtreleme gerçekleştirecek ve sonuç kümesini döndürecektir. Kodu bir jar dosyasına oluşturmak ve programı terminal penceresinde çalıştırmak için Maven'i kullanın. Ayrıntılı kod aşağıda verilmiştir:
RDDWriter nesnesi Uygulamayı genişletir { val conf = new SparkConf (). setAppName ("RDDWriter") val sc = new SparkContext (conf) val ic = new IgniteContext (sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD = ic.fromCache ("sharedRDD") sharedRDD.savePairs (sc.parallelize (1 ila 1000, 10) .map (i = > (i, i))) ic.close (doğru) sc.stop () } RDDReader nesnesi Uygulamayı genişletir { val conf = new SparkConf (). setAppName ("RDDReader") val sc = new SparkContext (conf) val ic = new IgniteContext (sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD = ic.fromCache ("sharedRDD") val greatThanFiveHundred = paylaşılanRDD.filter (_._ 2 > 500) println ("Sayı" + daha büyükThanFiveHundred.count ()) ic.close (doğru) sc.stop () }Bu Scala RDDWriter'da önce uygulama adını içeren SparkConf oluşturulur, ardından bu konfigürasyona göre SparkContext oluşturulur ve son olarak bu SparkContext'e göre bir IgniteContext oluşturulur. IgniteContext'i oluşturmanın birçok yolu vardır. Bu örnekte, example-shared-rdd.xml adlı bir XML dosyası kullanılmaktadır. Bu dosya Ignite dağıtımıyla birleştirilecek ve ardından gereksinimlere göre önceden yapılandırılacaktır. Açıkçası, yolu (Ignite ana dizini) kendi ortamınıza göre değiştirmeniz, ardından IgniteRDD tarafından tutulan tamsayı değeri demetini belirtmeniz ve son olarak, 1'den 1000'e kadar tamsayı değerini IgniteRDD'ye depolamanız ve değer depolaması 10 paralel işlem kullanır.
Bu Scala RDDReader'da, başlatma ve yapılandırma Scala RDDWriter ile aynıdır.Aynı XML yapılandırma dosyası kullanılacak. Uygulama kısmi filtreleme gerçekleştirecek ve ardından 500'den büyük kaç değerin saklandığına odaklanacak ve sonunda yanıt çıktı olarak alınacaktır.
IgniteContext ve IgniteRDD hakkında daha fazla bilgi için Ignite belgelerine bakın.
Jar dosyasını oluşturmak için aşağıdaki maven komutunu kullanabilirsiniz:
mvn temiz yüklemeArdından, Java koduna bir göz atın. Önce IgniteRDD'ye birden çok kayıt yazmak için bir Java uygulaması yazın, ardından başka bir uygulama kısmi filtreleme gerçekleştirecek ve sonuç kümesini döndürecektir. RDDWriter'ın kod ayrıntıları aşağıdadır:
genel sınıf RDDWriter { public static void main (String args) { SparkConf sparkConf = new SparkConf (). SetAppName ("RDDWriter"). SetMaster ("yerel"). Set ("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext (sparkConf); Logger.getRootLogger (). SetLevel (Level.OFF); Logger.getLogger ("org.apache.ignite"). SetLevel (Level.OFF); JavaIgniteContext < Tamsayı, Tamsayı > igniteContext = yeni JavaIgniteContext < Tamsayı, Tamsayı > ( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD < Tamsayı, Tamsayı > sharedRDD = igniteContext. < Tamsayı, Tamsayı > fromCache ("sharedRDD"); Liste < Tamsayı > data = new ArrayList < > (20); for (int i = 1001; i < = 1020; i ++) { data.add (i); } JavaRDD < Tamsayı > javaRDD = sparkContext. < Tamsayı > paralelleştirme (veri); sharedRDD.savePairs (javaRDD. < Tamsayı, Tamsayı > mapToPair (yeni PairFunction < Tamsayı, Tamsayı, Tamsayı > () { genel Tuple2 < Tamsayı, Tamsayı > call (Tamsayı değeri) İstisna {atar yeni Tuple2'yi iade et < Tamsayı, Tamsayı > (değer, değer); } })); igniteContext.close (true); sparkContext.close (); } }Bu Java RDDWriter'da, ilk olarak uygulama adını ve yürütücü sayısını içeren bir SparkConf oluşturulur, ardından bu yapılandırmaya göre bir SparkContext oluşturulur ve son olarak, bu SparkContext'e dayalı olarak bir IgniteContext oluşturulur. Son olarak, IgniteRDD'ye 20 değer daha eklendi.
Bu Java RDDReader'da, başlatma ve yapılandırma Java RDDWriter ile aynıdır ve aynı XML yapılandırma dosyası kullanılacaktır. Uygulama kısmi filtreleme gerçekleştirecek ve ardından 500'den büyük kaç değerin depolandığına odaklanacaktır. Yanıt, sonunda verilecektir. Aşağıdaki Java RDDReader'dır. Kod:
genel sınıf RDDReader { public static void main (String args) { SparkConf sparkConf = new SparkConf (). SetAppName ("RDDReader"). SetMaster ("yerel"). Set ("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext (sparkConf); Logger.getRootLogger (). SetLevel (Level.OFF); Logger.getLogger ("org.apache.ignite"). SetLevel (Level.OFF); JavaIgniteContext < Tamsayı, Tamsayı > igniteContext = yeni JavaIgniteContext < Tamsayı, Tamsayı > ( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD < Tamsayı, Tamsayı > sharedRDD = igniteContext. < Tamsayı, Tamsayı > fromCache ("sharedRDD"); JavaPairRDD < Tamsayı, Tamsayı > greatThanFiveHundred = sharedRDD.filter (yeni İşlev < Tuple2 < Tamsayı, Tamsayı > , Boole > () { genel Boole çağrısı (Tuple2 < Tamsayı, Tamsayı > tuple) Exception { dönüş tuple._2 () > 500; } }); System.out.println ("Sayı" + daha büyükThanFiveHundred.count ()); System.out.println (" > > > Ignite Shared RDD üzerinden SQL sorgusu yürütülüyor ... "); Veri kümesi df = sharedRDD.sql ("Tamsayı'dan _val'i seçin, burada _val > 10 ve _val < 100 sınır 10 "); df.show (); igniteContext.close (true); sparkContext.close (); } }Şimdi kodu test edebilirsiniz.
2.2. Uygulamayı çalıştırın
İlk terminal penceresinde, Spark'ın ana düğümünü aşağıdaki gibi başlatın:
$ SPARK_HOME / sbin / start-master.shİkinci terminal penceresinde, Spark işçi düğümünü aşağıdaki gibi başlatın:
$ SPARK_HOME / bin / spark-class org.apache.spark.deploy.worker.Worker spark: // ip: portIP adresini ve bağlantı noktası numarasını (ip: bağlantı noktası) ortamınıza göre değiştirin.
Üçüncü terminal penceresinde, aşağıdaki gibi bir Ignite düğümü başlatın:
$ IGNITE_HOME / bin / ignite.sh örnekleri / config / spark / example-shared-rdd.xmlDaha önce tartışılan örnek paylaşılan rdd.xml dosyası burada kullanılır.
Dördüncü terminal penceresinde, RDDWriter uygulamasının Scala sürümünü aşağıdaki gibi çalıştırabilirsiniz:
$ SPARK_HOME / bin / spark-submit --class "com.gridgain.RDDWriter" --master spark: // ip: port "/path_to_jar_file/ignite-spark-scala-1.0.jar"IP adresini ve bağlantı noktasını (ip: bağlantı noktası) ve jar dosyasının yolunu (/ path_to_jar_file) kendi ortamınıza göre değiştirin.
Aşağıdaki çıktıyı üretecek:
Sayım 500Bu, istenen çıktıdır.
Daha sonra, Ignite düğümü çalışırken ve IgniteRDD diğer uygulamalar için kullanılabilir durumdayken, Spark'ın ana düğümünü ve çalışan düğümlerini öldürün.Ardından, Java uygulaması aracılığıyla IgniteRDD'ye erişmek için IDE'yi kullanacağım.
RDDWriter'ın Java sürümünü çalıştırmak, IgniteRDD'de önceden depolanan kayıtların listesini genişletir. RDDReader'ın Java sürümünü çalıştırarak bunu test edebilirsiniz. Aşağıdaki çıktıyı üretir:
Sayı 520Bu aynı zamanda istenen çıktıdır.
Son olarak, SQL sorgusu IgniteRDD'de bir SELECT ifadesini çalıştıracak ve 10 ile 100 arasında değişen ilk 10 değeri döndürecektir. Çıktı aşağıdaki gibidir:
Sonuç doğrudur.
3. IgniteDataframes
Spark'ın DataFrame API'si, verileri açıklamak için desen kavramını sunar Spark, desenleri yönetir ve verileri tablolar biçiminde düzenler.
DataFrame, adlandırılmış sütunlarda düzenlenen dağıtılmış bir veri kümesidir. Kavramsal olarak DataFrame, ilişkisel bir veritabanındaki bir tabloya eşdeğerdir ve Spark'ın verimli sorgu yürütme planları oluşturmak için Catalyst sorgu iyileştiricisini kullanmasına izin verir. RDD, küme düğümleri arasında bölümlenmiş öğelerden oluşan bir koleksiyondur.
Ignite, Spark'ın bellek deposu olarak Ignite kullanıldığında DataFrame'leri genişletir, geliştirmeyi basitleştirir ve veri erişim süresini iyileştirir. Avantajları şunları içerir:
3.1. IgniteDataframes örneği
Ignite ve Spark'ı bazı kodlar aracılığıyla Ignite DataFrames aracılığıyla entegre etmeyi ve birkaç küçük program oluşturmayı öğrenelim.
Toplam iki küçük Java uygulaması yazılacak ve daha sonra IDE'de çalıştırılacak ve bu Java uygulamalarında bazı SQL sorguları yürütülecektir.
Bir Java uygulaması, JSON dosyasından bazı verileri okuyacak ve ardından Ignite'ta depolanan bir DataFrame oluşturacaktır. Bu JSON dosyası Ignite sürümünde sağlanmıştır. Başka bir Java uygulaması, Ignite DataFrame'deki verileri okuyacak ve ardından gerçekleştirmek için SQL'i kullanacaktır. Sormak.
İşte uygulamayı yazmak için gereken kod:
public class DFWriter { private static final String CONFIG = "config / example-ignite.xml"; public static void main (String args) { Ignite ignite = Ignition.start (CONFIG); SparkSession spark = SparkSession.builder (). AppName ("DFWriter"). Master ("yerel"). Config ("spark.executor.instances", "2"). GetOrCreate (); Logger.getRootLogger (). SetLevel (Level.OFF); Logger.getLogger ("org.apache.ignite"). SetLevel (Level.OFF); Veri kümesi < Kürek çekmek > peopleDF = spark.read (). json ( solutionIgnitePath ("kaynaklar / insanlar.json"). getAbsolutePath ()); System.out.println ("JSON dosyası içeriği:"); peopleDF.show (); System.out.println ("Tutuşturmak için DataFrame Yazılıyor."); peopleDF.write (). format (IgniteDataFrameSettings.FORMAT_IGNITE ()). seçenek (IgniteDataFrameSettings.OPTION_CONFIG_FILE (), CONFIG) .option (IgniteDataFrameSettings.OPTION_TABLE (), "ARIYABİLİR_KURUMU). seçenek (IgniteYFrameSettings.OPTION_CREATE) ) .option (IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS (), "şablon = çoğaltılmış"). save (); System.out.println ("Bitti!"); Ateşleme. Dur (yanlış); } }DFWriter'da, önce uygulama adını içeren SparkSession oluşturulur ve ardından JSON dosyasını okumak ve dosya içeriğini çıkarmak için spark.read (). Json () kullanılır.Bir sonraki adım, verileri Ignite deposuna yazmaktır. İşte DFReader'ın kodu:
public class DFReader { private static final String CONFIG = "config / example-ignite.xml"; public static void main (String args) { Ignite ignite = Ignition.start (CONFIG); SparkSession spark = SparkSession.builder (). AppName ("DFReader"). Master ("yerel"). Config ("spark.executor.instances", "2"). GetOrCreate (); Logger.getRootLogger (). SetLevel (Level.OFF); Logger.getLogger ("org.apache.ignite"). SetLevel (Level.OFF); System.out.println ("Ignite tablosundan veri okunuyor."); Veri kümesi < Kürek çekmek > peopleDF = spark.read (). format (IgniteDataFrameSettings.FORMAT_IGNITE ()). seçenek (IgniteDataFrameSettings.OPTION_CONFIG_FILE (), CONFIG) .option (IgniteDataFrameSettings.OPTION_TABLE (), "insanlar"). load (); peopleDF.createOrReplaceTempView ("insanlar"); Veri kümesi < Kürek çekmek > sqlDF = spark.sql ("SEÇİN * KİMLİĞİ NEREDEN > 0 AND id < 6 "); sqlDF.show (); System.out.println ("Bitti!"); Ateşleme. Dur (yanlış); } }DFReader'da, başlatma ve yapılandırma DFWriter ile aynıdır. Bu uygulama bazı filtreleme yapacaktır. Gerekli olan tüm kimlikleri bulmaktır > 0 ve < 6 kişi, ardından sonucu çıktı.
IDE'de, bir Ignite düğümü aşağıdaki kodla başlatılabilir:
public class ExampleNodeStartup { public static void main (String argümanları) IgniteException {atar Ignition.start ("config / örnek-ignite.xml"); } }Bu noktada kodu test edebilirsiniz.
3.2. Uygulamayı çalıştırın
Önce IDE'de bir Ignite düğümü başlatın, ardından DFWriter uygulamasını çalıştırın, çıktı aşağıdaki gibidir:
Yukarıdaki sonucu JSON dosyasının içeriğiyle karşılaştırırsanız, ikisinin tutarlı olduğunu gösterecektir ve bu da beklenen sonuçtur.
Daha sonra DFReader'ı çalıştırın, çıktı aşağıdaki gibidir:
Bu aynı zamanda istenen çıktıdır.
4. Özet
Bu makale sayesinde, Ignite ve Spark'ın entegrasyonunun çok basit olduğunu göreceksiniz ve IgniteRDD'ye birden çok programlama dili kullanarak birden çok ortamdan nasıl kolayca erişebileceğinizi de görebilirsiniz. Veriler IgniteRDD'ye okunabilir ve yazılabilir ve Spark kapatılsa bile, Ignite aracılığıyla korunabilir ve ayrıca Ignite aracılığıyla DataFrame'i okurken ve yazarken de gördüm. Okuyucular bunu kolayca deneyebilir.
Bu örneklerin kaynak kodunu istiyorsanız buradan indirebilirsiniz.
Mimar Li Yujue, mimari tasarım ve teknoloji Ar-Ge ekibi yönetimi, topluluk teknolojisi çevirmeni ve yazarı ve açık kaynak teknolojisi katılımcısı konusunda kapsamlı deneyime sahiptir. Apache Ignite teknik Çince belge tercümesinin yazarı, Ignite teknolojisinin Çin'de tanıtım / teknik destek / danışmanlık çalışmalarıyla uzun süredir uğraşmaktadır.
Bu makale, yazarın katkıda bulunduğu bir makaledir. Katkılara açığız.
Katkı içeriği gereksinimleri
Gönderme yöntemi
önemli
Tıklayın " daha fazlasını anla "Orijinal makaleyi okuyun.