Önce bağımlılıkları ekleyin
< ! - SparkStreaming, Flume'un bağımlılıklarını entegre eder - > < bağımlılık > < Grup kimliği > org.apache.spark < /Grup kimliği > < artifactId > spark-streaming-flume_2.11 < / artifactId > < versiyon > $ {spark.version} < / version > < /bağımlılık >Entegrasyon yöntemi 1: İtin
Flume yapılandırma dosyası flume_push_streaming.conf:
simple-agent.sources = netcat-source
simple-agent.sinks = avro-havuz
simple-agent.channels = bellek-kanalı
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop0
simple-agent.sources.netcat-source.port = 44444
simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = hadoop0
simple-agent.sinks.avro-sink.port = 41414
simple-agent.channels.memory-channel.type = bellek
simple-agent.sources.netcat-source.channels = bellek-kanalı
simple-agent.sinks.avro-sink.channel = bellek-kanalı
Kod:
org.apache.spark.SparkConf dosyasını içe aktar import org.apache.spark.streaming.flume.FlumeUtils org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @ yazar YuZhansheng * @ desc Spark Streaming ve Flume-push yönteminin entegrasyonu * @ oluştur 2019-02-2110:04 * / object FlumePushWordCount { def main (değiştirgeler: Dizi): Birim = { val sparkConf = new SparkConf (). setMaster ("yerel"). setAppName ("FlumePushWordCount") val ssc = new StreamingContext (sparkConf, Seconds (5)) //TODO... Flume'u entegre etmek için SparkStreaming nasıl kullanılır val flumeStream = FlumeUtils.createStream (ssc, "hadoop0", 41414) flumeStream.map (x = > new String (x.event.getBody.array ()). trim) .flatMap (_. split ("")). map ((_, 1)). lessByKey (_ + _). print () ssc.start () ssc.awaitTermination () } }Önce SparkStreaming programını çalıştırın ve ardından Flume'u başlatın
Flume komutunu başlatın: flume-ng aracısı --name basit-aracı --conf $ FLUME_HOME / conf --conf-file $ FLUME_HOME / conf / flume_push_streaming.conf -Dflume.root.logger = INFO, konsol
Test doğrulaması: telnet hadoop044444
Geliştirilmiş sürüm: Ana bilgisayar adı ve bağlantı noktası numarası, programın genişletilmesine elverişli olmayan programda ölü olarak yazılmıştır, bu nedenle SparkStreaming kodunu yükseltin
org.apache.spark.SparkConf dosyasını içe aktar import org.apache.spark.streaming.flume.FlumeUtils org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @yazar YuZhansheng * @desc Spark Streaming ve Flume-push yönteminin entegrasyonu * @create 2019-02-2110:04 * / object FlumePushWordCount { def main (değiştirgeler: Dizi): Birim = { // Yargı parametreleri eğer (args.length! = 2) { System.err.println ("Kullanım: FlumePushWordCount < ana bilgisayar adı > < Liman > ") System.exit (1) } // Ana bilgisayar adını ve bağlantı noktası numarasını parametreler aracılığıyla iletin val Array (ana bilgisayar adı, bağlantı noktası) = değiştirgeler val sparkConf = new SparkConf (). setMaster ("yerel"). setAppName ("FlumePushWordCount") val ssc = new StreamingContext (sparkConf, Seconds (5)) //TODO... Flume'u entegre etmek için SparkStreaming nasıl kullanılır val flumeStream = FlumeUtils.createStream (ssc, hostname, port.toInt) flumeStream.map (x = > new String (x.event.getBody.array ()). trim) .flatMap (_. split ("")). map ((_, 1)). lessByKey (_ + _). print () ssc.start () ssc.awaitTermination () } }IDEA'da parametre bilgileri nasıl iletilir?
Entegrasyon yöntemi 2: Çekin
İki bağımlılık daha ekleyin:
< bağımlılık > < Grup kimliği > org.apache.spark < /Grup kimliği > < artifactId > spark-streaming-kanal-havuz_2.11 < / artifactId > < versiyon > $ {spark.version} < / version > < /bağımlılık > < bağımlılık > < Grup kimliği > org.apache.commons < /Grup kimliği > < artifactId > commons-lang3 < / artifactId > < versiyon > 3.5 < / version > < /bağımlılık >Flume yapılandırma dosyası flume_pull_streaming.conf:
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = bellek-kanalı
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop0
simple-agent.sources.netcat-source.port = 44444
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop0
simple-agent.sinks.spark-sink.port = 41414
simple-agent.channels.memory-channel.type = bellek
simple-agent.sources.netcat-source.channels = bellek-kanalı
simple-agent.sinks.spark-sink.channel = bellek kanalı
Kod:
org.apache.spark.SparkConf dosyasını içe aktar import org.apache.spark.streaming.flume.FlumeUtils org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @yazar YuZhansheng * @desc Spark Streaming ve Flume-pull yönteminin entegrasyonu * @create 2019-02-2110:04 * / object FlumePullWordCount { def main (değiştirgeler: Dizi): Birim = { // Yargı parametreleri eğer (args.length! = 2) { System.err.println ("Kullanım: FlumePushWordCount < ana bilgisayar adı > < Liman > ") System.exit (1) } // Ana bilgisayar adını ve bağlantı noktası numarasını parametreler aracılığıyla iletin val Array (ana bilgisayar adı, bağlantı noktası) = değiştirgeler val sparkConf = new SparkConf (). setMaster ("yerel"). setAppName ("FlumePullWordCount") val ssc = new StreamingContext (sparkConf, Seconds (5)) //TODO... Flume'u entegre etmek için SparkStreaming nasıl kullanılır val flumeStream = FlumeUtils.createPollingStream (ssc, hostname, port.toInt) flumeStream.map (x = > new String (x.event.getBody.array ()). trim) .flatMap (_. split ("")). map ((_, 1)). lessByKey (_ + _). print () ssc.start () ssc.awaitTermination () } }Önce Flume'u, ardından SparkStreaming'i başlatın
flume-ng aracısı --name basit-aracı --conf $ FLUME_HOME / conf --conf-dosyası $ FLUME_HOME / conf / flume_pull_streaming.conf -Dflume.root.logger = INFO, konsol
İstisna: java.lang.IllegalStateException: begin () işlem AÇIK olduğunda çağrılır!
Jar paketi, flume klasörünün altındaki lib dizininde çakışıyor ve scala-kitaplığı sürümü sorunu jar paketi çakışmasına neden oluyor.Maven deposunda yerel olarak yüklenmiş Scala sürümüyle tutarlı bir jar bulun ve yerel ortamım gibi orijinali değiştirin Scala-library-2.11.8.jar mı
Orijinalin ne olduğunu bilmiyorum. Bu durumu test etmek için netcat komutunu kullandığımda aşağıdaki sorunlar her zaman ortaya çıktı:
# telnet localhost 44444
Deneniyor :: 1 ...
telnet: adrese bağlan :: 1: Bağlantı reddedildi
127.0.0.1 deneniyor ...
telnet: 127.0.0.1 adresine bağlan: Bağlantı reddedildi
Belirli bir dosyanın artışını izlemek için kanal yapılandırma dosyasını değiştirmekten ve veri kaynağı kaynağını değiştirmekten başka seçeneğim yok. Yapılandırma içeriği aşağıdaki gibidir:
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = bellek-kanalı
simple-agent.sources.netcat-source.type = exec
simple-agent.sources.netcat-source.command = tail -F /soft/flume1.6/data/data.log
simple-agent.sources.netcat-source.shell = / bin / sh -c
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop0
simple-agent.sinks.spark-sink.port = 41414
simple-agent.channels.memory-channel.type = bellek
simple-agent.sources.netcat-source.channels = bellek-kanalı
simple-agent.sinks.spark-sink.channel = bellek kanalı
SparkStreaming programı değişmeden kalır.İlk olarak, kanalı başlatın ve ardından spark programını çalıştırın, data.log dosyasına veri ekleyin ve konsol çıktısını doğru şekilde görün.