Spark Streaming Stream Processing Project 7-Spark Streaming Actual Combat 2

Durum 4: Spark Streaming gerçek savaşında pencere işlevlerinin kullanımı:

Pencere mekanizması: Bir zaman periyodu içindeki veri işleme düzenli olarak gerçekleştirilir ve zaman periyodu çakışabilir.Aşağıdaki şekle bakın:

pencere uzunluğu: pencerenin uzunluğu, yukarıdaki şekildeki pencerenin uzunluğu 3'tür;

pencere aralığı: pencere aralığı, yukarıdaki şekildeki pencere aralığı 2'dir.

Bu iki parametre, daha önce bahsedilen parti boyutu ile ilgilidir. Pencereyi kullanarak, belirli bir aralıktaki verileri ne sıklıkla hesaplayabiliriz, örneğin, önceki 10 dakikadaki verileri her 10 saniyede bir hesaplayabiliriz (veriler kesinlikle örtüşecektir).

/ Her 10 saniyede bir son 30 saniyelik veriyi azaltın val windowedWordCounts = pairs.reduceByKeyAndWindow ((a: Int, b: Int) = > (a + b), Saniye (30), Saniye (10))

Örnek 5: Gerçek Kıvılcım Akışı savaşında kara liste filtreleme:

Gereksinimler: Belirtilen verileri veri akışından filtreleyin

Aşağıdaki erişim günlüğü (DStream) varsa:

20180808, zhangsan

20180808, lisi

20180808, wangwu

Kara liste tablosu (RDD):

Zhangsan

lisi

Kara listedeki adları filtreledikten sonra, yalnızca 20180808, wangwu çıktı

Fikirler:

İlk olarak, erişim günlüğünde (DStream) aşağıdaki işlemleri yapın:

== > (zhangsan: 20180808, zhangsan) (lisi: 20180808, lisi) (wangwu: 20180808, wangwu) anahtar / değer olarak yazılır, burada anahtar ad ve değer orijinal erişim günlüğüdür

Ardından kara listede (RDD) benzer işlemleri yapın:

== > (zhansan: doğru) (lisi: doğru)

Sonra kullanırız sol yönden katılım , Aşağıdaki sonuçları alın:

(Zhangsan:]

(Lisi:]

(Wangwu:]

Son olarak, yalnızca doğru olan ikincisini filtrelememiz gerekir ve gerisi istediğimiz sonuçtur.

org.apache.spark.SparkConf dosyasını içe aktar org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @yazar YuZhansheng * @desc kara liste filtreleme * @create 2019-02-2015:33 * / nesne TransformApp { def main (değiştirgeler: Dizi): Birim = { val sparkConf = new SparkConf (). setMaster ("yerel"). setAppName ("TransformApp") // StreamingContext'in oluşturulması iki parametre gerektirir: SparkConf ve toplu iş aralığı val ssc = new StreamingContext (sparkConf, Seconds (5)) // Bir kara liste oluşturun val blacks = Liste ("zhangsan", "lisi") val blacksRDD = ssc.sparkContext.parallelize (siyahlar) .map (x = > (x, doğru)) val satırları = ssc.socketTextStream ("localhost", 6789) val clicklog = lines.map (x = > (x.split (",") (1), x)). dönüşümü (rdd = > { rdd.leftOuterJoin (blacksRDD) .filter (x = > x._2._2.getOrElse (yanlış)! = doğru) .map (x = > x._2._1) }) clicklog.print () ssc.start () ssc.awaitTermination () } }

Test: Verileri göndermek için istemcide nc -lk 6789 komutunu kullanın ve doğrulama için konsol tarafından yazdırılan sonucu görün.

Örnek 6: Spark Streaming, kelime frekansı istatistikleri işlemini tamamlamak için SparkSQL'i entegre eder

Önce bağımlılıkları ekleyin

< ! - SparkSQL bağımlılığı-- > < bağımlılık > < Grup kimliği > org.apache.spark < /Grup kimliği > < artifactId > spark-sql_2.11 < / artifactId > < versiyon > $ {spark.version} < / version > < /bağımlılık >

java.beans.Transient'i içe aktar org.apache.spark.SparkConf dosyasını içe aktar org.apache.spark.rdd.RDD dosyasını içe aktar org.apache.spark.sql.SparkSession'ı içe aktar org.apache.spark.streaming'i içe aktarın. {Seconds, StreamingContext, Time} / ** * @yazar YuZhansheng * @desc Spark Streaming, kelime frekansı istatistikleri işlemini tamamlamak için SparkSQL'i entegre eder * @create 2019-02-2016:03 * / object SqlNetworkWordCount { def main (değiştirgeler: Dizi): Birim = { val sparkConf = new SparkConf (). setMaster ("yerel"). setAppName ("SqlNetworkWordCount") // StreamingContext'in oluşturulması iki parametre gerektirir: SparkConf ve toplu iş aralığı val ssc = new StreamingContext (sparkConf, Seconds (5)) val satırları = ssc.socketTextStream ("localhost", 6789) val sözcükler = lines.flatMap (_. bölme ("")) words.foreachRDD {(rdd: RDD, zaman: Zaman) = > val spark = SparkSessionSingleton.getInstance (rdd.sparkContext.getConf) import spark.implicits._ val wordsDataFrame = rdd.map (w = > Kayıt (w)). ToDF () wordsDataFrame.createOrReplaceTempView ("kelimeler") val wordCountsDataFrame = spark.sql ("kelime seç, kelime gruplarından toplam olarak sayı (*)") println (s "======= $ zaman =======") wordCountsDataFrame.show () } ssc.start () ssc.awaitTermination () } durum sınıfı Kaydı (kelime: Dize) object SparkSessionSingleton { @Transient özel var örneği: SparkSession = _ def getInstance (sparkConf: SparkConf): SparkSession = { eğer (örnek == null) { örnek = SparkSession .builder () .config (sparkConf) .getOrCreate () } örnek } } }

Test: Verileri göndermek için istemcide nc -lk 6789 komutunu kullanın ve doğrulama için konsol tarafından yazdırılan sonucu görün.

+ ---- + ----- +

| kelime | toplam |

+ ---- + ----- +

| ds | 2 |

| e | 1 |

| d | 1 |

| a | 2 |

| olarak | 1 |

+ ---- + ----- +

(Bu vaka, çalışırken daha fazla günlük yazdıracaktır.)

Küçük bir ipucu | Yuvarlanan pantolon da incelenmeye değer bir konudur, cidden anladınız mı?
önceki
Arkadaş Anları | Bu demiryolu geçiş işaretlerini tanıyın, istasyondan ayrılıp transfer ettiğinizde yakalanmayacaksınız!
Sonraki
"Crazy Aliens" şovun ilk gününde 400 milyon gişe rekorunu kırdı, Huang Bo ve Shen Teng peri grubu tarafından övüldü
Gionee W919, Sanayi ve Bilgi Teknolojileri Bakanlığı'nda tanıtıldı, Samsungun rakibi olabilir mi?
Takım savaşının hasatçısı Cengiz Han, kolayca beş kişiyi öldürür!
Okuduktan sonra panikledim! Koyu Yeşil henüz yayınlanmadı, YEEZY BOOST 350 V2 iki yeni renk yayınladı!
2018'de NLP alanı sıcak olmaya devam ediyor ve üreticiler iniş senaryoları bulmayı umuyor
Range Rover ailesinin yeni üyesi Land Rover Starburst resmi haritası planlanandan önce açıklandı
Spark Akışı Akışı İşleme Projesi 5-Spark Akışı ile Başlarken
"Dolaşan Dünya", Bahar Şenliği'nin en güçlü ağızdan ağza filmi oldu
Yaşamın tadının ölümden daha iyi olduğunu deneyimlemek istiyorsanız, Xiahou Dun bunu sizin için yapmaya hazır
Nihai Starbucks'a pazarlama eğlencesi: Sadece bir kedinin pençe kupası değil, aklınıza gelmeyecek kadar çok patlama var
Satın alamıyorsanız, lütfen kontrol edin! Yeni sıcak marka M + RC NOIR'in yeni serisi çıktı!
Yeni Volkswagen Metway / Kelway 351.800 fiyatla satışa sunuluyor
To Top