Spark Streaming Stream Processing Projesi 9-Spark Streaming, Kafka savaşını entegre ediyor

Durum 1-Alıcıya Göre Entegrasyon

Zookeeper'ı başlatın

Kafka'yı başlatın ve Kafka'yı dört makinede başlatmak için kabuk betiği aşağıdaki gibidir:

brokers = "hadoop0 hadoop1 hadoop2 hadoop3"

kafka_home = "/ yumuşak / kafka"

$ brokerler için

yapmak

echo "kafka $ {i} tarihinde başlatılıyor ..."

ssh $ {i} "kaynak / etc / profile; nohup sh $ {kafka_home} /bin/kafka-server-start.sh $ {kafka_home} /config/server.properties > / dev / null 2 > 1 "

eğer>; sonra

echo "$ {i} adresinde kafka'yı başlat tamam!"

fi

bitti

echo kafka kafka başladı!

çıkış 0

Bir konu oluşturun: ./kafka-topics.sh --create --zookeeper localhost: 2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic

Oluşturmanın başarılı olup olmadığını kontrol edin: ./kafka-topics.sh --list --zookeeper localhost: 2181

Bu konu için normal üretim ve tüketim verilerinin mevcut olup olmadığını konsol aracılığıyla test edin:

Kafka yapımcısı: ./kafka-console-producer.sh --broker-list 192.168.25.128:9092 --topic kafka_streaming_topic

Bir makineyi değiştirmek bir tüketici yaratır:

kafka tüketici: ./kafka-console-consumer.sh --zookeeper localhost: 2181 --topic kafka_streaming_topic

Test mevcuttur.

Artık Spark Streaming uygulamaları geliştirebilirsiniz.

İlk olarak bağımlılıkları ekleyin:

< bağımlılık > < Grup kimliği > org.apache.spark < /Grup kimliği > < artifactId > spark-streaming-kafka-0-8_2.11 < / artifactId > < versiyon > $ {spark.version} < / version > < /bağımlılık >

program:

org.apache.spark.SparkConf dosyasını içe aktar import org.apache.spark.streaming.kafka.KafkaUtils org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @yazar YuZhansheng * @desc Spark Streaming, Alıcı entegrasyonuna dayalı kafka yöntem 1'e bağlanır * @create 2019-02-2311:28 * / object KafkaReceiverWordCount { def main (değiştirgeler: Dizi): Birim = { // Parametreler IDEA üzerinden aktarılır eğer (args.length! = 4) { System.err.println ("Kullanım: KafkaReceiverWordCount < zkQuorum > < grup > < konular > < numThreads > ") } val Array (zkQuorum, grup, konular, numThreads) = args val sparkConf = new SparkConf (). setAppName ("KafkaReceiverWordCount"). setMaster ("yerel") val ssc = new StreamingContext (sparkConf, Seconds (5)) val topicMap = konular.split (","). harita ((_, numThreads.toInt)). toMap // TODO ..SparkStreaming, Kafka'ya bağlanır // val kafkaStream = KafkaUtils.createStream (streamingContext ,,,) val mesajlar = KafkaUtils.createStream (ssc, zkQuorum, group, topicMap) messages.map (_._ 2) .flatMap (_. bölme ("")). harita ((_, 1)). indirgemeByKey (_ + _). print () ssc.start () ssc.awaitTermination () } }

IDEA aracılığıyla parametreleri geçirin:

Ölçek:

Böylece yerel ortam testi geçti!

Ortak hata ayıklama ve test için sunucuya yüklerken, setAppName ("KafkaReceiverWordCount"). SetMaster ("local") yorumlanmalıdır. Yüklemeyi paketlemek için maven'i kullanın; komutu gönderin:

spark-submit \

--class com.xidian.spark.KafkaReceiverWordCount \

- yönetici yerel \

--name KafkaReceiverWordCount \

--packages org.apache.spark: spark-streaming-kafka-0-8_2.11: 2.2.0 \

/root/Project/spark-1.0.jar hadoop0: 2181 test kafka_streaming_topic 1

Durum 2-Doğrudan tabanlı entegrasyon (Alıcı Yok)

Spark 1.3, daha iyi uçtan-uca garantiler sağlamak için bu yeni alıcısız doğrudan yöntemi tanıttı. Bu yöntem, alıcıyı veri almak için kullanmaz, ancak her konu bölümündeki en son ofseti elde etmek için Periyodik olarak Kafka'yı sorgular ve her partide buna göre işlenecek ofset aralığını tanımlar. Veri işleyen bir işe başlarken, Kafka'nın basit tüketici API'si, Kafka'dan tanımlanmış ofset aralığını okumak için kullanılır (dosya sisteminden bir dosyayı okumaya benzer). Bu özelliğin Scala ve Java API'sinin Spark 1.3'ünde ve Python API'sinin Spark 1.4'ünde sunulduğunu unutmayın.

Alıcı tabanlı yöntemle (Yöntem 1) karşılaştırıldığında, bu yöntemin aşağıdaki avantajları vardır.

Paralelliği basitleştirin: Birden fazla giriş Kafka akışı oluşturup bunları birleştirmeye gerek yoktur. DirectStream kullanarak, Spark akışı mümkün olduğunca çok RDD bölümü oluşturacaktır çünkü kullanılacak Kafka bölümleri vardır ve bu bölümler verileri Kafka'dan paralel olarak okuyacaktır. Dolayısıyla, Kafka ve RDD bölümleri arasında anlaşılması ve ayarlanması daha kolay olan bire bir eşleştirme var.

etkililik : İlk yöntemde, sıfır veri kaybının ön yazma günlüğünde depolanması gerekir, bu da verileri daha fazla kopyalayacaktır. Bu aslında verimsizdir, çünkü veriler Kafka tarafından bir kez ve İleriye Yazma günlüğü tarafından ikinci kez etkili bir şekilde çoğaltılır. İkinci yöntem, alıcı olmadığı için bu sorunu ortadan kaldırır, bu nedenle önceden günlükleri yazmaya gerek yoktur. Yeterli Kafka rezervasyonunuz olduğu sürece Kafka'dan mesajları kurtarabilirsiniz.

Bir zamanlar kesin anlambilim : İlk yöntem, Zookeeper'da tüketim ofsetini depolamak için Kafka'nın gelişmiş API'sini kullanır. Bu, Kafka verilerini kullanmanın geleneksel yoludur. Bu yöntem (ön yazma günlüğü ile birleştirildiğinde) sıfır veri kaybını (yani en az bir kez anlambilim) sağlayabilse de, bazı kayıtlar bazı hata durumlarında iki kez kullanılabilir. Bunun nedeni, Spark akışı tarafından güvenilir bir şekilde alınan veriler ile Zookeeper tarafından izlenen ofset arasında bir tutarsızlık olmasıdır. Bu nedenle, ikinci yöntemde, Zookeeper kullanmayan basit bir Kafka API kullanıyoruz. Ofset, kontrol noktası içindeki Spark akışı aracılığıyla izlenir. Bu, Spark akışı ile Zookeeper / Kafka arasındaki tutarsızlığı ortadan kaldırır, böylece hataya rağmen, Spark akışı her kaydı bir kez etkili bir şekilde alabilir. Sonucun çıktısı için tek seferlik kesin anlambilim elde etmek için, verileri harici veri deposuna kaydeden çıktı işlemi idempotent veya sonucu ve ofseti kaydeden atomik bir işlem olmalıdır (daha fazla bilgi için lütfen ana programlama kılavuzuna bakın Çıktı işlemlerinin semantiği).

Lütfen bu yöntemin bir dezavantajının Zookeeper'daki ofseti güncellememesidir, bu nedenle Zookeeper'ı temel alan Kafka izleme aracı ilerlemeyi görüntülemeyecektir. Ancak, her partide bu yöntemle işlenen ofsete erişebilir ve Zookeeper'ı kendiniz güncelleyebilirsiniz.

Not: Durum 1: Alıcı tabanlı entegrasyon hem eski sürümde (0.8. *. * - 0.10. *. *) Hem de yeni sürümde (0.10. *. * Ve üzeri) kullanılabilir, ancak yerel kafka sürümü 0.11.0.1'dir, bu nedenle Pom dosyasını değiştirmeniz gerekiyor:

< bağımlılık > < Grup kimliği > org.apache.spark < /Grup kimliği > < artifactId > spark-streaming-kafka-0-10_2.11 < / artifactId > < versiyon > $ {spark.version} < / version > < /bağımlılık >

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.serialization.StringDeserializer

org.apache.spark.SparkConf dosyasını içe aktar

import org.apache.spark.streaming.dstream. {DStream, InputDStream}

import org.apache.spark.streaming.kafka010.KafkaUtils

org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın

import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/ **

* @yazar YuZhansheng

* Kafka 2-Direct tabanlı entegrasyon ile @desc Spark Streaming bağlantı istasyonu

* @create 2019-02-2311:28

* /

nesne KafkaDirectWordCount {

def main (değiştirgeler: Dizi): Birim = {

val sparkConf = new SparkConf (). setAppName ("KafkaDirectWordCount"). setMaster ("yerel")

val ssc = new StreamingContext (sparkConf, Seconds (5))

val konular: String = "kafka_streaming_topic"

val topicarr = konular.split (",")

val brokers = "hadoop0: 9092, hadoop1: 9092, hadoop2: 9092, hadoop3: 9092"

val kafkaParams: Harita = Harita (

"bootstrap.servers" - > komisyoncu,

"key.deserializer" - > sınıfı,

"value.deserializer" - > sınıfı,

"Grup kimliği"- > "Ölçek",

"auto.offset.reset" - > "En son",

"enable.auto.commit" - > (false: java.lang.Boolean)

)

val kafka_streamDStream: InputDStream = KafkaUtils.createDirectStream (

ssc, PreferConsistent,

Abone ol (topicarr, kafkaParams))

val resDStream: DStream = kafka_streamDStream.map (satır = >

(line.offset (), line.partition (), line.value ())).

flatMap (t = > {t._3.split ("") .map (kelime = > (t._1, t._2, kelime)))).

harita (k = > ((k._1, k._2, k._3), 1)). düşürmekByKey (_ + _)

resDStream.print ()

ssc.start ()

ssc.awaitTermination ()

}

}

Ev mobilyaları ve sanat koleksiyonunun mükemmel kombinasyonu olan Unbox Industries x Filter017 yumuşak bebekler sadece satışta mevcuttur
önceki
100 günlük üniversiteye giriş sınavı ve reşit olma töreni yeminleri, bu aile mektupları onları ağlatıyor
Sonraki
Jackie Chanın gişe rekorları kıran "Dedektif Pu Songling" in ağızdan ağza bir sürprizi var, modern kardeşi Liu Yuning "Sevgiyi ifade etmeyi tercih ederim" diyor
Huang Zhongun taret çarpması, kuleyi parçaladı ve çifte baskı uyguladı
Spark Streaming Stream İşleme Projesi 8-Spark Streaming ve Flume Entegrasyonu
Enfes performans ve detaylar, harika sahnelenen "seri plan" "Kusursuz Hükümet" "Gerçek Güç" izleyiciler tarafından övgüyle karşılandı
Yingzheng dövüştüğünde, tank çıtır çıtırdı!
Dubai Prime Hastanesi'ne HDL Buspro akıllı kontrol sistemi uygulandı
Küçük bir ipucu | Yuvarlanan pantolon da incelenmeye değer bir konudur, cidden anladınız mı?
Spark Streaming Stream Processing Project 7-Spark Streaming Actual Combat 2
Arkadaş Anları | Bu demiryolu geçiş işaretlerini tanıyın, istasyondan ayrılıp transfer ettiğinizde yakalanmayacaksınız!
"Crazy Aliens" şovun ilk gününde 400 milyon gişe rekorunu kırdı, Huang Bo ve Shen Teng peri grubu tarafından övüldü
Gionee W919, Sanayi ve Bilgi Teknolojileri Bakanlığı'nda tanıtıldı, Samsungun rakibi olabilir mi?
Takım savaşının hasatçısı Cengiz Han, kolayca beş kişiyi öldürür!
To Top