Kıvılcım Akışı Akışı İşleme Projesi 11-Kapsamlı Gerçek Savaş

1. Gerçek zamanlı veriler oluşturmak üzere gerçek dünya web sitelerini simüle etmek için Python komut dosyalarını kullanın:

# kodlama = UTF-8 rastgele içe aktar ithalat zamanı url_paths = ip_slices = http_referers = search_keyword = status_codes = def sample_url (): random.sample (url_paths, 1) döndür def sample_ip (): dilim = rasgele. örnek (ip_ dilim, 4) return ".". join () def sample_referer (): rastgele ise üniform (0,1) > 0.2: dönüş "-" refer_str = random.sample (http_referers, 1) query_str = random.sample (search_keyword, 1) return refer_str.format (sorgu = sorgu_dizesi) def sample_status_code (): random.sample döndür (durum_kodları, 1) def generate_log (count = 10): time_str = time.strftime ("% Y-% m-% d% H:% M:% S", time.localtime ()) f = open ("/ root / DataSet / access.log", "w +") sayarken > = 1: query_log = "{ip} \ t {local_time} \ t \" GET / {url} HTTP / 1.1 \ "\ t {status_code} \ t {referer}". format (url = sample_url (), ip = sample_ip () , referer = sample_referer (), status_code = sample_status_code (), local_time = time_str) sorgu_ günlüğünü yazdır f.write (sorgu_ günlüğü + "\ n") count = count-1 __name__ == '__ main__' ise: create_log (100)

Her seferinde manuel olarak gerçekleştirilmesi gerektiğinde, gerçek üretim durumu ile uyumlu değildir. crontab ifadesi , Python betiğinin her dakika bir veri yığını oluşturmasına izin verin.

Linux * * * * * * ------ | | | | | | | | | | | + yıl | | | | + ----- haftanın günü (0-7) (Pazar = 0 veya 7) | | | + ---------- ay (1-12) | | + --------------- ayın günü (1-31) | + -------------------- saat (0-23) + ------------------------- dk (0-59) Java (Bahar) * * * * * * * ------- | | | | | | | | | | | | | + yıl | | | | | + ----- haftanın günü (0-7) (Pazar = 0 veya 7) | | | | + ---------- ay (1-12) | | | + --------------- ayın günü (1-31) | | + -------------------- saat (0-23) | + ------------------------- dk (0-59) + ------------------------------ saniye (0-59)

Her dakika yürütülen crontab ifadesi: * / 1 * * * *

İçinde bir cümle bulunan log_generator.sh komut dosyası yazın: python /root/Project/generate_log.py,

Bu betiğe yürütülebilir izinler verin: chmod u + x log_generator.sh

Günlükleri oluşturmak üzere Python programını çalıştırmak için log_generator.sh betiğini çalıştırın.

Her dakika log_generator.sh betiğini çalıştıran crontab ifadesi şu şekilde yazılır: * / 1 * * * * /root/Project/log_generator.sh

İlk önce şu komutu kullanıyoruz: crontab -e Ve sonra koy * / 1 * * * * /root/Project/log_generator.sh içine kopyalayın, kaydedin ve çıkın, çalıştırabilirsiniz!

2. Python günlük oluşturucunun günlük çıktısını Flume'a bağlayın

Flume yapılandırma dosyasını streaming_project.conf yazın:

exec-memory-logger.sources = exec-source

exec-memory-logger.sinks = logger-sink

exec-memory-logger.channels = bellek-kanalı

exec-memory-logger.sources.exec-source.type = exec

exec-memory-logger.sources.exec-source.command = tail -F /root/DataSet/access.log

exec-memory-logger.sources.exec-source.shell = / bin / sh -c

exec-memory-logger.channels.memory-channel.type = bellek

exec-memory-logger.sinks.logger-sink.type = günlükçü

exec-memory-logger.sources.exec-source.channels = bellek-kanalı

exec-memory-logger.sinks.logger-sink.channel = bellek-kanalı

Başlat komutu: flume-ng aracısı -n exec-memory-logger -c /soft/flume1.6/conf/ -f /soft/flume1.6/conf/streaming_project.conf -Dflume.root.logger = INFO, konsol

Konsolu inceleyin ve bağlantının başarılı olduğunu belirten aşağıdaki çıktıyı yazdırın:

2019-02-2611: 08: 06,713 (SinkRunner-PollingRunner-DefaultSinkProcessor) Olay: {üstbilgiler: {} gövde: 3133322E 33302E 38372E 363309323031132.30.87.63.201}

2019-02-2611: 08: 06,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) Olay: {üstbilgiler: {} gövde: 33302E 3133322E 3136372E 373209323030.132.167.72.20}

2019-02-2611: 08: 06,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) Olay: {üstbilgiler: {} gövde: 3132342E 3135362E 36332E 3535093230124.156.63.55.20}

3. Kanal havuzu verilerini kafka'ya dönüştürmek için Flume yapılandırma dosyasını değiştirin:

streaming_project2.conf

exec-memory-logger.sources = exec-source

exec-memory-logger.sinks = kafka-sink

exec-memory-logger.channels = bellek-kanalı

exec-memory-logger.sources.exec-source.type = exec

exec-memory-logger.sources.exec-source.command = tail -F /root/DataSet/access.log

exec-memory-logger.sources.exec-source.shell = / bin / sh -c

exec-memory-logger.channels.memory-channel.type = bellek

exec-memory-logger.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink

exec-memory-logger.sinks.kafka-sink.brokerList = hadoop0: 9092, hadoop1: 9092, hadoop2: 9092, hadoop3: 9092

exec-memory-logger.sinks.kafka-sink.topic = streamingtopic

exec-memory-logger.sinks.kafka-sink.batchSize = 5

exec-memory-logger.sinks.kafka-sink.requiredAcks = 1

exec-memory-logger.sources.exec-source.channels = bellek-kanalı

exec-memory-logger.sinks.kafka-sink.channel = bellek-kanalı

Başka bir makinede bir kafka tüketicisi başlatın: ./kafka-console-consumer.sh --zookeeper hadoop0: 2181 --topic streamingtopic ve ardından flume: flume-ng agent -n exec-memory-logger -c / soft / flume1.6 / conf / -f /soft/flume1.6/conf/streaming_project2.conf -Dflume.root.logger = INFO, konsol

Entegrasyonun başarılı olduğunu belirtmek için kafka tüketicisinin aşağıdaki verileri yazdırdığını gözlemleyin:

187.168.10.1672019-02-2611:24:01 "GET /class/112.html HTTP / 1.1" -200

156.10.124.292019-02-2611:24:01 "GET /class/131.html HTTP / 1.1" -500

63.46.29.1872019-02-2611:24:01 "GET /class/145.html HTTP / 1.1" -200

87.10.124.1672019-02-2611:24:01 "GET /class/112.html HTTP / 1.1" -500

63.29.72.1322019-02-2611:24:01 "GET /class/112.html HTTP / 1.1" -500

30.63.124.982019-02-2611:24:01 "GET /class/128.html HTTP / 1.1" https://search.yahoo.com/search?p=Hadoop temelleri 200

4. Bir sonraki adım, SparkStreaming ve Kafka'nın kenetlenmesidir

FlumeKafkaSparkStreaming'in tüm hattını açın ve kayıt numarası istatistiklerini tamamlamak için Spark uygulamasında kafka verilerini alın.

SparkStreaming uygulamasını yazın:

org.apache.spark.SparkConf dosyasını içe aktar import org.apache.spark.streaming.kafka.KafkaUtils org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @yazar YuZhansheng * @desc, Kafka'daki verileri işlemek için SparkStreaming kullanır * @create 2019-02-2611:40 * / object ImoocStatStreamingApp { def main (değiştirgeler: Dizi): Birim = { // Parametre sayısının 4 olup olmadığına karar verin, 4 değilse işlemden çıkın eğer (args.length! = 4) { println ("Kullanım: ImoocStatStreamingApp < zkQuorum > < grup > < konular > < numThreads > ") System.exit (1) } val Array (zkQuorum, groupId, konular, numThreads) = değiştirgeler val sparkConf = new SparkConf (). setAppName ("ImoocStatStreamingApp"). setMaster ("yerel") val ssc = new StreamingContext (sparkConf, Seconds (60)) val topicMap = konular.split (","). harita ((_, numThreads.toInt)). toMap val mesajlar = KafkaUtils.createStream (ssc, zkQuorum, groupId, topicMap) // Test adımı 1: Veri alımını test edin messages.map (_._ 2) .count (). yazdır ssc.start () ssc.awaitTermination () } }

Test, parametreler IDEA'ya girilir:

Konsol, entegrasyonun başarılı olduğunu gösteren aşağıdaki bilgileri yazdırır:

-------------------------------------------

Zaman: 1551165180000 ms

-------------------------------------------

100

5. Veri temizleme: ihtiyacımız olan alan bilgilerini orijinal günlükten çıkarın

Yeni bir takım sınıfı DataUtils oluşturun:

java.util.Date'i içe aktar org.apache.commons.lang3.time.FastDateFormat dosyasını içe aktar / ** * @yazar YuZhansheng * @desc Tarih ve saat araçları * @create 2019-02-2615:22 * / object DateUtils { val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance ("yyyy-AA-gg SS: dd: ss") val TARGE_FORMAT = FastDateFormat.getInstance ("yyyyMMddHHmmss") def getTime (zaman: Dize) = { YYYYMMDDHHMMSS_FORMAT.parse (zaman) .getTime } def parseToMinute (time: String) = { TARGE_FORMAT.format (yeni Tarih (getTime (saat))) } def main (değiştirgeler: Dizi): Birim = { println (parseToMinute ("2019-02-2615:22:01")) } }

ClickLog için bir etki alanı nesnesi oluşturun:

/ ** * @yazar YuZhansheng * @desc Temizlik sonrası günlük bilgileri * @param günlük erişim ip adresi * @param günlük erişim süresi * @Param günlüğü tarafından erişilen gerçek kurs numarası * @param günlüğü erişim durum kodu * @param günlük erişim referansı * @create 2019-02-2615:43 * / durum sınıfı ClickLog (ip: String, time: String, courseId: Int, statusCode: Int, referer: String)

Veri temizliğini sağlamak için aşağıdaki programı kırmızı yazı tipiyle yukarıdaki SparkStreaming programına ekleyin:

com.xidian.spark.project.domain.ClickLog dosyasını içe aktarın import com.xidian.spark.project.utils.DateUtils // Test adımı 1: Veri alımını test edin //messages.map(_._2).count().print // İkinci adımı test edin: veri temizleme val günlükleri = mesajlar.map (_._ 2) val cleanData = logs.map (satır = > { val infos = line.split ("\ t") // infos (2) = "/ sınıf/112.html HTTP / 1.1 GET // url = /class/112.html val url = infos (2) .split ("") (1) var courseId = 0 // / class ile başlayan dersin numarasını alın eğer (url.startsWith ("/ class")) { val courseIdHTML = url.split ("/") (2) courseId = courseIdHTML.substring (0, courseIdHTML.lastIndexOf (".")). toInt } ClickLog (infos (0), DateUtils.parseToMinute (infos (1)), courseId, infos (3) .toInt, infos (4)) }). filtre (clicklog = > clicklog.courseId! = 0) cleanData.print ()

Programı çalıştırın ve konsolun, veri temizleme işlevinin gerçekleştirildiğini gösteren aşağıdaki bilgileri verdiğini gözlemleyin:

-------------------------------------------

Zaman: 1551169320000 ms

-------------------------------------------

ClickLog (187.156.167.30,20190226162102,130,404, -)

ClickLog (72.87.46.124,20190226162102,128,200, https: //www.sogou.com/web? Query = Hadoop ile ilgili temel bilgiler)

ClickLog (30.168.46.187,20190226162102,146,500, -)

ClickLog (72.143.168.63,20190226162102,128,200, -)

ClickLog (168.30.132.98,20190226162102,145,404, -)

ClickLog (132.29.143.10,20190226162102,146,200, -)

..........

6. Talep: Bugüne kadar pratik kurslara (/ sınıf ile başlayan kurslar) yapılan ziyaretlerin sayısını sayın

Analiz: Bu gereksinimi yerine getirmek için, istatistiksel sonuçlarımızı depolamak için bir veritabanı kullanmamız ve istatistiksel sonuçları veritabanına yazmak için SparkStreaming kullanmamız gerekir.

yyyyMMdd kurs kimliği, veritabanındaki istatistiksel sonuçları görüntüler.

İstatistiksel sonuçların depolanması için hangi veritabanını seçmeliyim?

RDBMS: MySQL, Oracle ...

NoSQL: HBase, Redis ...

HBase'yi seçiyoruz! Açıklama yok. O (_) O

Önce HBase'i başlatın: ./start-hbase.sh

HBase Shell'i başlatın: ./hbase shell

Tüm tabloları görüntülemek için list komutunu kullanın.

Ancak aşağıdaki hata oluştu:

hbase (ana): 004: 0 * liste

TABLO

HATA: org.apache.hadoop.hbase.PleaseHoldException: Master başlatılıyor

org.apache.hadoop.hbase.master.HMaster.checkInitialized (HMaster.java:2293) adresinde

org.apache.hadoop.hbase.master.MasterRpcServices.getTableNames (MasterRpcServices.java:900) adresinde

org.apache.hadoop.hbase.protobuf.generated.MasterProtos $ MasterService $ 2.callBlockingMethod (MasterProtos.java:55650) adresinde

org.apache.hadoop.hbase.ipc.RpcServer.call adresinde (RpcServer.java:2180)

org.apache.hadoop.hbase.ipc.CallRunner.run adresinde (CallRunner.java:112)

org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop'ta (RpcExecutor.java:133)

org.apache.hadoop.hbase.ipc.RpcExecutor adresinde $ 1.run (RpcExecutor.java:108)

java.lang.Thread.run'da (Thread.java:748)

İnternette birçok çözüm bulundu, örneğin:

Hbase-site.xml dosyasındaki hbase.rootdir dosyasını aşağıdaki içeriğe değiştirin:

< Emlak > < isim > hbase.rootdir < / isim > < değer > hdfs: // redhat6: 9000 / hbase < / değer > < /Emlak >

Ama sorunumu çözmedi! Yorum alanında bir yönteme bakın:

Her düğümün saatinin senkronize edilip edilmediğini kontrol etmek için tarih girin. Senkronize edilmemişse (kendi problemlerimin% 80'i temelde senkronize edilmemiştir), tarih -s "2019-02-2614:18" girebilirsiniz (saat kendiniz ayarlanmıştır) ) Her küme düğümündeki zaman eşitlemesinden sonra hbase'i yeniden başlatın, sorun çözüldü! !

Bir HBase tablosu oluşturun: create'imooc_course_clickcount ',' info '

Satır anahtarı tasarımı: day_courseid

Sonraki adım, HBase'i çalıştırmak için Scala'yı uygulamaktır:

Önce bir varlık sınıfı tanımlayın:

/ ** * @yazar YuZhansheng * @desc gerçek savaş kursu ziyaretleri * day_course: HBase'deki satır anahtarına karşılık gelir, format: 20190227_1 * click_count: 20190227_1 gününde kursa yapılan ziyaretlerin sayısına karşılık gelir * @create 2019-02-2619:19 * / case class CourseClickCount (day_course: Dize, click_count: Uzun)

Veritabanına erişim ayrıca bir dao katmanı, yeni bir paket, yeni bir nesne, CourseClickCountDAO kurulmasını gerektirir.

Ayrıca bir Java sınıfı olan ve Java paketine yerleştirilmesi gereken bir araç sınıfı olan HBaseUtils.java oluşturmak da gereklidir.Bu sınıf HBase üzerindeki işlemleri özel olarak gerçekleştirir. İki sınıf aşağıdaki gibidir:

org.apache.hadoop.conf.Configuration dosyasını içe aktarın; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; java.io.IOException'ı içe aktarın; / ** * @yazar YuZhansheng * @desc HBase işletim aracı sınıfı, Java araç sınıfının tekli modda kapsüllenmesi önerilir * @create 2019-02-2710:11 * / public class HBaseUtils { HBaseAdmin admin = boş; Yapılandırma yapılandırması = boş; // Tekli mod, özel yapım yöntemi gerektirir özel HBaseUtils () { konfigürasyon = yeni Konfigürasyon (); configuration.set ("hbase.zookeeper.quorum", "hadoop0, hadoop1, hadoop2, hadoop3"); configuration.set ("hbase.rootdir", "hdfs: // hadoop0: 9000 / hbase"); Deneyin { admin = new HBaseAdmin (konfigürasyon); } catch (IOException e) { e.printStackTrace (); } } // Tembel tekli mod private static HBaseUtils örneği = null; genel statik senkronize HBaseUtils getInstance () { if (null == örnek) { örnek = new HBaseUtils (); } dönüş örneği; } // Tablo adına göre HTable örneğini edinin public HTable getTable (String tableName) { HTable tablo = boş; Deneyin { tablo = yeni HTable (konfigürasyon, tabloAdı); } catch (IOException e) { e.printStackTrace (); } dönüş tablosu; } / ** * HBase tablosuna bir kayıt ekleyin * @param tableName HBase tablo adı * @param rowkey HBase tablosu satır anahtarı * @param cf columnfamily of HBase table * @param column HBase tablo sütunu * @param değeri HBase tablosuna yazılan değer * / public void put (String tableName, String satır anahtarı, String cf, String sütunu, String değeri) { HTable table = getTable (tableName); Put put = new Put (Bytes.toBytes (satır anahtarı)); put.add (Bytes.toBytes (cf), Bytes.toBytes (sütun), Bytes.toBytes (değer)); Deneyin { table.put (koymak); } catch (IOException e) { e.printStackTrace (); } } // Verileri test edin, kullanırken bu ana işlevi yorumlayın public static void main (String args) { // HTable table = HBaseUtils.getInstance (). GetTable ("imooc_course_clickcount"); //System.out.println(table.getName (). GetNameAsString ()); String tableName = "imooc_course_clickcount"; Dize satır anahtarı = "20190111_88"; Dize cf = "bilgi"; Dize sütunu = "click_count"; Dize değeri = "2"; HBaseUtils.getInstance (). Put (tableName, rowkey, cf, column, value); } }

paket com.xidian.spark.project.dao com.xidian.spark.project.domain.CourseClickCount'u içe aktar ithal com.xidian.spark.project.utils.HBaseUtils org.apache.hadoop.hbase.client.Get dosyasını içe aktar import org.apache.hadoop.hbase.util.Bytes scala.collection.mutable.ListBuffer'ı içe aktar / ** * @yazar YuZhansheng * @desc Gerçek savaş parkuru isabetleri için veri erişim katmanı * @create 2019-02-279:51 * / object CourseClickCountDAO { val tableName = "imooc_course_clickcount" val cf = "bilgi" val qualifer = "click_count" // Verileri HBase'e kaydedin def save (liste: ListBuffer): Birim = { val tablo = HBaseUtils.getInstance (). getTable (tableName) için (ele < -liste){ table.incrementColumnValue (Bytes.toBytes (ele.day_course), Bytes.toBytes (cf), Bytes.toBytes (qualifer), ele.click_count ) } } // Satır anahtarına göre sorgu değeri def count (day_course: String): Uzun = { val tablo = HBaseUtils.getInstance (). getTable (tableName) val get = new Get (Bytes.toBytes (day_course)) değer değeri = table.get (get) .getValue (cf.getBytes, qualifer.getBytes) eğer (değer == null) { 0l }Başka{ Bytes.toLong (değer) } } // Programın mevcut olup olmadığını test edin def main (değiştirgeler: Dizi): Birim = { val listesi = yeni ListBuffer list.append (CourseClickCount ("20190227_8"; 8)) list.append (CourseClickCount ("20190227_9"; 18)) list.append (CourseClickCount ("20190227_1"; 12)) kaydet (liste) } }

Test: CourseClickCountDAO'nun ana işlevini (test işlevi) çalıştırın, konsol çıktısını görüntülemek için HBase kabuk konsolundaki scan'imooc_course_clickcount 'komutunu kullanın:

hbase (ana): 005: 0 > scan'imooc_course_clickcount ' SATIR SÜTUN + HÜCRE 20190211_88 sütun = bilgi: click_count, zaman damgası = 1551235301383, değer = 920190227_1 sütun = bilgi: click_count, zaman damgası = 1551236147161, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x0C 20190227_8 sütun = bilgi: click_count, zaman damgası = 1551236147119, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x0820190227_9 sütun = bilgi: click_count, zaman damgası = 1551236147150, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x120,0650 saniyede 4 satır

Veriler başarıyla eklendi.

Son adım, gereksinimleri tamamlamak için SparkStreaming programını tamamlamaktır: şimdiye kadarki gerçek muharebe kurslarına (/ class ile başlayan kurslar) yapılan ziyaretlerin sayısını sayın ve bunları HBase veritabanına yazın. Aşağıdaki programı ImoocStatStreamingApp'e ekleyin, günlükleri oluşturmak için crontab'ı başlatın, Flume'u başlatın, kafka, HBase'i başlatın, programı çalıştırın ve HBase'deki veri artışını gözlemleyin.

//cleanData.print () // Test Adım Üç: Bugüne kadar pratik kurslara yapılan ziyaretlerin sayısını (/ class ile başlayarak) sayın cleanData.map (x = > { // HBase satır anahtarı tasarımı: 20190226_8 (x.time.substring (0,8) + "_" + x.courseId, 1) }). lessByKey (_ + _). foreachRDD (rdd = > { rdd.foreachPartition (partitionRecords = > { val listesi = yeni ListBuffer partitionRecords.foreach (çift = > { list.append (CourseClickCount (pair._1, pair._2)) }) CourseClickCountDAO.save (liste) }) })

hbase (ana): 006: 0 > scan'imooc_course_clickcount ' SATIR SÜTUN + HÜCRE 20190211_88 sütun = bilgi: click_count, zaman damgası = 1551235301383, değer = 920190226_112 sütun = bilgi: click_count, zaman damgası = 1551238136748, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x0020190226_128 sütun = bilgi: click_count, zaman damgası = 1551238136477, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1820190226_130 sütun = bilgi: click_count, zaman damgası = 1551238136755, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1420190226_131 sütun = bilgi: click_count, zaman damgası = 1551238136488, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1C 20190226_145 sütun = bilgi: click_count, zaman damgası = 1551238136767, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1920190226_146 sütun = bilgi: click_count, zaman damgası = 1551238136512, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1620190227_1 sütun = bilgi: click_count, zaman damgası = 1551236147161, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x0C

Talep yerine getirildi.

6. Talep: Arama motorlarından gelen pratik kursların tıklanma sayısını sayın

Yeni bir HBase tablosu oluşturun: create'imooc_course_search_clickcount ',' info '

Rowkey tasarımı: İşletmemizin ihtiyaçlarına göre şu şekilde tasarlanmıştır: 20190227 + arama + 1

Ardından, CourseSearchClickCount adlı yeni bir varlık sınıfı oluşturun:

/ ** * Arama motorundan gelen gerçek savaş kursu hitlerinin varlık kategorisi * @param day_search_course * @param click_count * / case class CourseSearchClickCount (day_search_course: Dize, click_count: Uzun)

Veri erişim katmanı DAO:

com.xidian.spark.project.domain.CourseSearchClickCount'u içe aktar ithal com.xidian.spark.project.utils.HBaseUtils org.apache.hadoop.hbase.client.Get dosyasını içe aktar import org.apache.hadoop.hbase.util.Bytes scala.collection.mutable.ListBuffer'ı içe aktar / ** * Arama motorları-veri erişim katmanından gerçek savaş kurslarının hitleri * / object CourseSearchClickCountDAO { val tableName = "imooc_course_search_clickcount" val cf = "bilgi" val qualifer = "click_count" / ** * Verileri HBase'e kaydedin * @param list CourseSearchClickCount koleksiyonu * / def save (liste: ListBuffer): Birim = { val tablo = HBaseUtils.getInstance (). getTable (tableName) için (ele < -liste) { table.incrementColumnValue (Bytes.toBytes (ele.day_search_course), Bytes.toBytes (cf), Bytes.toBytes (qualifer), ele.click_count) } } / ** * Satır anahtarına göre sorgu değeri * / def count (day_search_course: String): Long = { val tablo = HBaseUtils.getInstance (). getTable (tableName) val get = new Get (Bytes.toBytes (day_search_course)) değer değeri = table.get (get) .getValue (cf.getBytes, qualifer.getBytes) eğer (değer == null) { 0L }Başka{ Bytes.toLong (değer) } } // // def main (değiştirgeler: Dizi): Birim = { // // // Kullanılabilirliği test edin // değer listesi = yeni ListBuffer // list.append (CourseSearchClickCount ("20190227_www.baidu.com_8", 8)) // list.append (CourseSearchClickCount ("20190227_cn.bing.com_9", 9)) // // kaydet (liste) // // println (count ("20190227_www.baidu.com_8") + ":" + count ("20190227_cn.bing.com_9")) //} }

Son adım, SparkStreaming programını değiştirmek ve istatistiksel işlevi tamamlamak için aşağıdaki programı eklemektir:

// Test Adımı 4: Bugünden bugüne arama motorundan pratik kurslara yapılan ziyaretlerin sayısını sayın cleanData.map (x = > { // Dönüştürme: https://www.sogou.com/web?query=Spark SQL gerçek savaş == > https: /www.sogou.com/web? query = Spark SQL savaşı val referer = x.referer.replaceAll ("//", "/") val bölmeler = referer.split ("/") var host = "" if (splits.length > 2) { ana bilgisayar = bölmeler (1) } (host, x.courseId, x.time) }). filtre (_._ 1! = "") .map (x = > { (x._3.substring (0,8) + "_" + x._1 + "_" + x._2, 1) }). lessByKey (_ + _). foreachRDD (rdd = > { rdd.foreachPartition (partitionRecords = > { val listesi = yeni ListBuffer partitionRecords.foreach (çift = > { list.append (CourseSearchClickCount (pair._1, pair._2)) }) CourseSearchClickCountDAO.save (liste) }) })
Land Rover'ın beşinci nesil keşfi, 798.000 RMB ila 1108.000 RMB arası satış için piyasaya sürüldü
önceki
IMAX Çin, Bahar Festivali'nin ilk gününde gişe rekoru kırdı
Sonraki
Dongfeng Honda, Mayıs ayında yeni bir CR-V / hibrit versiyonunu piyasaya sürmeyi planlıyor
Spark Streaming Stream Processing Project 10-Log Generator Geliştirme Tamamlandı Log Çıktısı
Angel, siyah sırayı hiç görmemiş olmalısın
Gerçek güç gerçekten birbirine bağlıdır, Roewe RX3, şehirli slash gençliği için dikkatle tasarlanmıştır
Derinlemesine yorum: "2019 Çin Akıllı Ev Geliştirme Teknik Raporu"
Ev mobilyaları ve sanat koleksiyonunun mükemmel kombinasyonu olan Unbox Industries x Filter017 yumuşak bebekler sadece satışta mevcuttur
Spark Streaming Stream Processing Projesi 9-Spark Streaming, Kafka savaşını entegre ediyor
100 günlük üniversiteye giriş sınavı ve reşit olma töreni yeminleri, bu aile mektupları onları ağlatıyor
Jackie Chanın gişe rekorları kıran "Dedektif Pu Songling" in ağızdan ağza bir sürprizi var, modern kardeşi Liu Yuning "Sevgiyi ifade etmeyi tercih ederim" diyor
Huang Zhongun taret çarpması, kuleyi parçaladı ve çifte baskı uyguladı
Spark Streaming Stream İşleme Projesi 8-Spark Streaming ve Flume Entegrasyonu
Enfes performans ve detaylar, harika sahnelenen "seri plan" "Kusursuz Hükümet" "Gerçek Güç" izleyiciler tarafından övgüyle karşılandı
To Top