Spark Streaming Stream İşleme Projesi 8-Spark Streaming ve Flume Entegrasyonu

Önce bağımlılıkları ekleyin

< ! - SparkStreaming, Flume'un bağımlılıklarını entegre eder - > < bağımlılık > < Grup kimliği > org.apache.spark < /Grup kimliği > < artifactId > spark-streaming-flume_2.11 < / artifactId > < versiyon > $ {spark.version} < / version > < /bağımlılık >

Entegrasyon yöntemi 1: İtin

Flume yapılandırma dosyası flume_push_streaming.conf:

simple-agent.sources = netcat-source

simple-agent.sinks = avro-havuz

simple-agent.channels = bellek-kanalı

simple-agent.sources.netcat-source.type = netcat

simple-agent.sources.netcat-source.bind = hadoop0

simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro

simple-agent.sinks.avro-sink.hostname = hadoop0

simple-agent.sinks.avro-sink.port = 41414

simple-agent.channels.memory-channel.type = bellek

simple-agent.sources.netcat-source.channels = bellek-kanalı

simple-agent.sinks.avro-sink.channel = bellek-kanalı

Kod:

org.apache.spark.SparkConf dosyasını içe aktar import org.apache.spark.streaming.flume.FlumeUtils org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @ yazar YuZhansheng * @ desc Spark Streaming ve Flume-push yönteminin entegrasyonu * @ oluştur 2019-02-2110:04 * / object FlumePushWordCount { def main (değiştirgeler: Dizi): Birim = { val sparkConf = new SparkConf (). setMaster ("yerel"). setAppName ("FlumePushWordCount") val ssc = new StreamingContext (sparkConf, Seconds (5)) //TODO... Flume'u entegre etmek için SparkStreaming nasıl kullanılır val flumeStream = FlumeUtils.createStream (ssc, "hadoop0", 41414) flumeStream.map (x = > new String (x.event.getBody.array ()). trim) .flatMap (_. split ("")). map ((_, 1)). lessByKey (_ + _). print () ssc.start () ssc.awaitTermination () } }

Önce SparkStreaming programını çalıştırın ve ardından Flume'u başlatın

Flume komutunu başlatın: flume-ng aracısı --name basit-aracı --conf $ FLUME_HOME / conf --conf-file $ FLUME_HOME / conf / flume_push_streaming.conf -Dflume.root.logger = INFO, konsol

Test doğrulaması: telnet hadoop044444

Geliştirilmiş sürüm: Ana bilgisayar adı ve bağlantı noktası numarası, programın genişletilmesine elverişli olmayan programda ölü olarak yazılmıştır, bu nedenle SparkStreaming kodunu yükseltin

org.apache.spark.SparkConf dosyasını içe aktar import org.apache.spark.streaming.flume.FlumeUtils org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @yazar YuZhansheng * @desc Spark Streaming ve Flume-push yönteminin entegrasyonu * @create 2019-02-2110:04 * / object FlumePushWordCount { def main (değiştirgeler: Dizi): Birim = { // Yargı parametreleri eğer (args.length! = 2) { System.err.println ("Kullanım: FlumePushWordCount < ana bilgisayar adı > < Liman > ") System.exit (1) } // Ana bilgisayar adını ve bağlantı noktası numarasını parametreler aracılığıyla iletin val Array (ana bilgisayar adı, bağlantı noktası) = değiştirgeler val sparkConf = new SparkConf (). setMaster ("yerel"). setAppName ("FlumePushWordCount") val ssc = new StreamingContext (sparkConf, Seconds (5)) //TODO... Flume'u entegre etmek için SparkStreaming nasıl kullanılır val flumeStream = FlumeUtils.createStream (ssc, hostname, port.toInt) flumeStream.map (x = > new String (x.event.getBody.array ()). trim) .flatMap (_. split ("")). map ((_, 1)). lessByKey (_ + _). print () ssc.start () ssc.awaitTermination () } }

IDEA'da parametre bilgileri nasıl iletilir?

Entegrasyon yöntemi 2: Çekin

İki bağımlılık daha ekleyin:

< bağımlılık > < Grup kimliği > org.apache.spark < /Grup kimliği > < artifactId > spark-streaming-kanal-havuz_2.11 < / artifactId > < versiyon > $ {spark.version} < / version > < /bağımlılık > < bağımlılık > < Grup kimliği > org.apache.commons < /Grup kimliği > < artifactId > commons-lang3 < / artifactId > < versiyon > 3.5 < / version > < /bağımlılık >

Flume yapılandırma dosyası flume_pull_streaming.conf:

simple-agent.sources = netcat-source

simple-agent.sinks = spark-sink

simple-agent.channels = bellek-kanalı

simple-agent.sources.netcat-source.type = netcat

simple-agent.sources.netcat-source.bind = hadoop0

simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink

simple-agent.sinks.spark-sink.hostname = hadoop0

simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = bellek

simple-agent.sources.netcat-source.channels = bellek-kanalı

simple-agent.sinks.spark-sink.channel = bellek kanalı

Kod:

org.apache.spark.SparkConf dosyasını içe aktar import org.apache.spark.streaming.flume.FlumeUtils org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @yazar YuZhansheng * @desc Spark Streaming ve Flume-pull yönteminin entegrasyonu * @create 2019-02-2110:04 * / object FlumePullWordCount { def main (değiştirgeler: Dizi): Birim = { // Yargı parametreleri eğer (args.length! = 2) { System.err.println ("Kullanım: FlumePushWordCount < ana bilgisayar adı > < Liman > ") System.exit (1) } // Ana bilgisayar adını ve bağlantı noktası numarasını parametreler aracılığıyla iletin val Array (ana bilgisayar adı, bağlantı noktası) = değiştirgeler val sparkConf = new SparkConf (). setMaster ("yerel"). setAppName ("FlumePullWordCount") val ssc = new StreamingContext (sparkConf, Seconds (5)) //TODO... Flume'u entegre etmek için SparkStreaming nasıl kullanılır val flumeStream = FlumeUtils.createPollingStream (ssc, hostname, port.toInt) flumeStream.map (x = > new String (x.event.getBody.array ()). trim) .flatMap (_. split ("")). map ((_, 1)). lessByKey (_ + _). print () ssc.start () ssc.awaitTermination () } }

Önce Flume'u, ardından SparkStreaming'i başlatın

flume-ng aracısı --name basit-aracı --conf $ FLUME_HOME / conf --conf-dosyası $ FLUME_HOME / conf / flume_pull_streaming.conf -Dflume.root.logger = INFO, konsol

İstisna: java.lang.IllegalStateException: begin () işlem AÇIK olduğunda çağrılır!

Jar paketi, flume klasörünün altındaki lib dizininde çakışıyor ve scala-kitaplığı sürümü sorunu jar paketi çakışmasına neden oluyor.Maven deposunda yerel olarak yüklenmiş Scala sürümüyle tutarlı bir jar bulun ve yerel ortamım gibi orijinali değiştirin Scala-library-2.11.8.jar mı

Orijinalin ne olduğunu bilmiyorum. Bu durumu test etmek için netcat komutunu kullandığımda aşağıdaki sorunlar her zaman ortaya çıktı:

# telnet localhost 44444

Deneniyor :: 1 ...

telnet: adrese bağlan :: 1: Bağlantı reddedildi

127.0.0.1 deneniyor ...

telnet: 127.0.0.1 adresine bağlan: Bağlantı reddedildi

Belirli bir dosyanın artışını izlemek için kanal yapılandırma dosyasını değiştirmekten ve veri kaynağı kaynağını değiştirmekten başka seçeneğim yok. Yapılandırma içeriği aşağıdaki gibidir:

simple-agent.sources = netcat-source

simple-agent.sinks = spark-sink

simple-agent.channels = bellek-kanalı

simple-agent.sources.netcat-source.type = exec

simple-agent.sources.netcat-source.command = tail -F /soft/flume1.6/data/data.log

simple-agent.sources.netcat-source.shell = / bin / sh -c

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink

simple-agent.sinks.spark-sink.hostname = hadoop0

simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = bellek

simple-agent.sources.netcat-source.channels = bellek-kanalı

simple-agent.sinks.spark-sink.channel = bellek kanalı

SparkStreaming programı değişmeden kalır.İlk olarak, kanalı başlatın ve ardından spark programını çalıştırın, data.log dosyasına veri ekleyin ve konsol çıktısını doğru şekilde görün.

Huang Zhongun taret çarpması, kuleyi parçaladı ve çifte baskı uyguladı
önceki
Enfes performans ve detaylar, harika sahnelenen "seri plan" "Kusursuz Hükümet" "Gerçek Güç" izleyiciler tarafından övgüyle karşılandı
Sonraki
Yingzheng dövüştüğünde, tank çıtır çıtırdı!
Dubai Prime Hastanesi'ne HDL Buspro akıllı kontrol sistemi uygulandı
Küçük bir ipucu | Yuvarlanan pantolon da incelenmeye değer bir konudur, cidden anladınız mı?
Spark Streaming Stream Processing Project 7-Spark Streaming Actual Combat 2
Arkadaş Anları | Bu demiryolu geçiş işaretlerini tanıyın, istasyondan ayrılıp transfer ettiğinizde yakalanmayacaksınız!
"Crazy Aliens" şovun ilk gününde 400 milyon gişe rekorunu kırdı, Huang Bo ve Shen Teng peri grubu tarafından övüldü
Gionee W919, Sanayi ve Bilgi Teknolojileri Bakanlığı'nda tanıtıldı, Samsungun rakibi olabilir mi?
Takım savaşının hasatçısı Cengiz Han, kolayca beş kişiyi öldürür!
Okuduktan sonra panikledim! Koyu Yeşil henüz yayınlanmadı, YEEZY BOOST 350 V2 iki yeni renk yayınladı!
2018'de NLP alanı sıcak olmaya devam ediyor ve üreticiler iniş senaryoları bulmayı umuyor
Range Rover ailesinin yeni üyesi Land Rover Starburst resmi haritası planlanandan önce açıklandı
Spark Akışı Akışı İşleme Projesi 5-Spark Akışı ile Başlarken
To Top