1. Gerçek zamanlı veriler oluşturmak üzere gerçek dünya web sitelerini simüle etmek için Python komut dosyalarını kullanın:
# kodlama = UTF-8 rastgele içe aktar ithalat zamanı url_paths = ip_slices = http_referers = search_keyword = status_codes = def sample_url (): random.sample (url_paths, 1) döndür def sample_ip (): dilim = rasgele. örnek (ip_ dilim, 4) return ".". join () def sample_referer (): rastgele ise üniform (0,1) > 0.2: dönüş "-" refer_str = random.sample (http_referers, 1) query_str = random.sample (search_keyword, 1) return refer_str.format (sorgu = sorgu_dizesi) def sample_status_code (): random.sample döndür (durum_kodları, 1) def generate_log (count = 10): time_str = time.strftime ("% Y-% m-% d% H:% M:% S", time.localtime ()) f = open ("/ root / DataSet / access.log", "w +") sayarken > = 1: query_log = "{ip} \ t {local_time} \ t \" GET / {url} HTTP / 1.1 \ "\ t {status_code} \ t {referer}". format (url = sample_url (), ip = sample_ip () , referer = sample_referer (), status_code = sample_status_code (), local_time = time_str) sorgu_ günlüğünü yazdır f.write (sorgu_ günlüğü + "\ n") count = count-1 __name__ == '__ main__' ise: create_log (100)Her seferinde manuel olarak gerçekleştirilmesi gerektiğinde, gerçek üretim durumu ile uyumlu değildir. crontab ifadesi , Python betiğinin her dakika bir veri yığını oluşturmasına izin verin.
Linux * * * * * * ------ | | | | | | | | | | | + yıl | | | | + ----- haftanın günü (0-7) (Pazar = 0 veya 7) | | | + ---------- ay (1-12) | | + --------------- ayın günü (1-31) | + -------------------- saat (0-23) + ------------------------- dk (0-59) Java (Bahar) * * * * * * * ------- | | | | | | | | | | | | | + yıl | | | | | + ----- haftanın günü (0-7) (Pazar = 0 veya 7) | | | | + ---------- ay (1-12) | | | + --------------- ayın günü (1-31) | | + -------------------- saat (0-23) | + ------------------------- dk (0-59) + ------------------------------ saniye (0-59)Her dakika yürütülen crontab ifadesi: * / 1 * * * *
İçinde bir cümle bulunan log_generator.sh komut dosyası yazın: python /root/Project/generate_log.py,
Bu betiğe yürütülebilir izinler verin: chmod u + x log_generator.sh
Günlükleri oluşturmak üzere Python programını çalıştırmak için log_generator.sh betiğini çalıştırın.
Her dakika log_generator.sh betiğini çalıştıran crontab ifadesi şu şekilde yazılır: * / 1 * * * * /root/Project/log_generator.sh
İlk önce şu komutu kullanıyoruz: crontab -e Ve sonra koy * / 1 * * * * /root/Project/log_generator.sh içine kopyalayın, kaydedin ve çıkın, çalıştırabilirsiniz!
2. Python günlük oluşturucunun günlük çıktısını Flume'a bağlayın
Flume yapılandırma dosyasını streaming_project.conf yazın:
exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = bellek-kanalı
exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /root/DataSet/access.log
exec-memory-logger.sources.exec-source.shell = / bin / sh -c
exec-memory-logger.channels.memory-channel.type = bellek
exec-memory-logger.sinks.logger-sink.type = günlükçü
exec-memory-logger.sources.exec-source.channels = bellek-kanalı
exec-memory-logger.sinks.logger-sink.channel = bellek-kanalı
Başlat komutu: flume-ng aracısı -n exec-memory-logger -c /soft/flume1.6/conf/ -f /soft/flume1.6/conf/streaming_project.conf -Dflume.root.logger = INFO, konsol
Konsolu inceleyin ve bağlantının başarılı olduğunu belirten aşağıdaki çıktıyı yazdırın:
2019-02-2611: 08: 06,713 (SinkRunner-PollingRunner-DefaultSinkProcessor) Olay: {üstbilgiler: {} gövde: 3133322E 33302E 38372E 363309323031132.30.87.63.201}
2019-02-2611: 08: 06,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) Olay: {üstbilgiler: {} gövde: 33302E 3133322E 3136372E 373209323030.132.167.72.20}
2019-02-2611: 08: 06,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) Olay: {üstbilgiler: {} gövde: 3132342E 3135362E 36332E 3535093230124.156.63.55.20}
3. Kanal havuzu verilerini kafka'ya dönüştürmek için Flume yapılandırma dosyasını değiştirin:
streaming_project2.conf
exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = kafka-sink
exec-memory-logger.channels = bellek-kanalı
exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /root/DataSet/access.log
exec-memory-logger.sources.exec-source.shell = / bin / sh -c
exec-memory-logger.channels.memory-channel.type = bellek
exec-memory-logger.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-logger.sinks.kafka-sink.brokerList = hadoop0: 9092, hadoop1: 9092, hadoop2: 9092, hadoop3: 9092
exec-memory-logger.sinks.kafka-sink.topic = streamingtopic
exec-memory-logger.sinks.kafka-sink.batchSize = 5
exec-memory-logger.sinks.kafka-sink.requiredAcks = 1
exec-memory-logger.sources.exec-source.channels = bellek-kanalı
exec-memory-logger.sinks.kafka-sink.channel = bellek-kanalı
Başka bir makinede bir kafka tüketicisi başlatın: ./kafka-console-consumer.sh --zookeeper hadoop0: 2181 --topic streamingtopic ve ardından flume: flume-ng agent -n exec-memory-logger -c / soft / flume1.6 / conf / -f /soft/flume1.6/conf/streaming_project2.conf -Dflume.root.logger = INFO, konsol
Entegrasyonun başarılı olduğunu belirtmek için kafka tüketicisinin aşağıdaki verileri yazdırdığını gözlemleyin:
187.168.10.1672019-02-2611:24:01 "GET /class/112.html HTTP / 1.1" -200
156.10.124.292019-02-2611:24:01 "GET /class/131.html HTTP / 1.1" -500
63.46.29.1872019-02-2611:24:01 "GET /class/145.html HTTP / 1.1" -200
87.10.124.1672019-02-2611:24:01 "GET /class/112.html HTTP / 1.1" -500
63.29.72.1322019-02-2611:24:01 "GET /class/112.html HTTP / 1.1" -500
30.63.124.982019-02-2611:24:01 "GET /class/128.html HTTP / 1.1" https://search.yahoo.com/search?p=Hadoop temelleri 200
4. Bir sonraki adım, SparkStreaming ve Kafka'nın kenetlenmesidir
FlumeKafkaSparkStreaming'in tüm hattını açın ve kayıt numarası istatistiklerini tamamlamak için Spark uygulamasında kafka verilerini alın.
SparkStreaming uygulamasını yazın:
org.apache.spark.SparkConf dosyasını içe aktar import org.apache.spark.streaming.kafka.KafkaUtils org.apache.spark.streaming. {Seconds, StreamingContext} dosyasını içe aktarın / ** * @yazar YuZhansheng * @desc, Kafka'daki verileri işlemek için SparkStreaming kullanır * @create 2019-02-2611:40 * / object ImoocStatStreamingApp { def main (değiştirgeler: Dizi): Birim = { // Parametre sayısının 4 olup olmadığına karar verin, 4 değilse işlemden çıkın eğer (args.length! = 4) { println ("Kullanım: ImoocStatStreamingApp < zkQuorum > < grup > < konular > < numThreads > ") System.exit (1) } val Array (zkQuorum, groupId, konular, numThreads) = değiştirgeler val sparkConf = new SparkConf (). setAppName ("ImoocStatStreamingApp"). setMaster ("yerel") val ssc = new StreamingContext (sparkConf, Seconds (60)) val topicMap = konular.split (","). harita ((_, numThreads.toInt)). toMap val mesajlar = KafkaUtils.createStream (ssc, zkQuorum, groupId, topicMap) // Test adımı 1: Veri alımını test edin messages.map (_._ 2) .count (). yazdır ssc.start () ssc.awaitTermination () } }Test, parametreler IDEA'ya girilir:
Konsol, entegrasyonun başarılı olduğunu gösteren aşağıdaki bilgileri yazdırır:
-------------------------------------------
Zaman: 1551165180000 ms
-------------------------------------------
100
5. Veri temizleme: ihtiyacımız olan alan bilgilerini orijinal günlükten çıkarın
Yeni bir takım sınıfı DataUtils oluşturun:
java.util.Date'i içe aktar org.apache.commons.lang3.time.FastDateFormat dosyasını içe aktar / ** * @yazar YuZhansheng * @desc Tarih ve saat araçları * @create 2019-02-2615:22 * / object DateUtils { val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance ("yyyy-AA-gg SS: dd: ss") val TARGE_FORMAT = FastDateFormat.getInstance ("yyyyMMddHHmmss") def getTime (zaman: Dize) = { YYYYMMDDHHMMSS_FORMAT.parse (zaman) .getTime } def parseToMinute (time: String) = { TARGE_FORMAT.format (yeni Tarih (getTime (saat))) } def main (değiştirgeler: Dizi): Birim = { println (parseToMinute ("2019-02-2615:22:01")) } }ClickLog için bir etki alanı nesnesi oluşturun:
/ ** * @yazar YuZhansheng * @desc Temizlik sonrası günlük bilgileri * @param günlük erişim ip adresi * @param günlük erişim süresi * @Param günlüğü tarafından erişilen gerçek kurs numarası * @param günlüğü erişim durum kodu * @param günlük erişim referansı * @create 2019-02-2615:43 * / durum sınıfı ClickLog (ip: String, time: String, courseId: Int, statusCode: Int, referer: String)Veri temizliğini sağlamak için aşağıdaki programı kırmızı yazı tipiyle yukarıdaki SparkStreaming programına ekleyin:
com.xidian.spark.project.domain.ClickLog dosyasını içe aktarın import com.xidian.spark.project.utils.DateUtils // Test adımı 1: Veri alımını test edin //messages.map(_._2).count().print // İkinci adımı test edin: veri temizleme val günlükleri = mesajlar.map (_._ 2) val cleanData = logs.map (satır = > { val infos = line.split ("\ t") // infos (2) = "/ sınıf/112.html HTTP / 1.1 GET // url = /class/112.html val url = infos (2) .split ("") (1) var courseId = 0 // / class ile başlayan dersin numarasını alın eğer (url.startsWith ("/ class")) { val courseIdHTML = url.split ("/") (2) courseId = courseIdHTML.substring (0, courseIdHTML.lastIndexOf (".")). toInt } ClickLog (infos (0), DateUtils.parseToMinute (infos (1)), courseId, infos (3) .toInt, infos (4)) }). filtre (clicklog = > clicklog.courseId! = 0) cleanData.print ()Programı çalıştırın ve konsolun, veri temizleme işlevinin gerçekleştirildiğini gösteren aşağıdaki bilgileri verdiğini gözlemleyin:
-------------------------------------------
Zaman: 1551169320000 ms
-------------------------------------------
ClickLog (187.156.167.30,20190226162102,130,404, -)
ClickLog (72.87.46.124,20190226162102,128,200, https: //www.sogou.com/web? Query = Hadoop ile ilgili temel bilgiler)
ClickLog (30.168.46.187,20190226162102,146,500, -)
ClickLog (72.143.168.63,20190226162102,128,200, -)
ClickLog (168.30.132.98,20190226162102,145,404, -)
ClickLog (132.29.143.10,20190226162102,146,200, -)
..........
6. Talep: Bugüne kadar pratik kurslara (/ sınıf ile başlayan kurslar) yapılan ziyaretlerin sayısını sayın
Analiz: Bu gereksinimi yerine getirmek için, istatistiksel sonuçlarımızı depolamak için bir veritabanı kullanmamız ve istatistiksel sonuçları veritabanına yazmak için SparkStreaming kullanmamız gerekir.
yyyyMMdd kurs kimliği, veritabanındaki istatistiksel sonuçları görüntüler.
İstatistiksel sonuçların depolanması için hangi veritabanını seçmeliyim?
RDBMS: MySQL, Oracle ...
NoSQL: HBase, Redis ...
HBase'yi seçiyoruz! Açıklama yok. O (_) O
Önce HBase'i başlatın: ./start-hbase.sh
HBase Shell'i başlatın: ./hbase shell
Tüm tabloları görüntülemek için list komutunu kullanın.
Ancak aşağıdaki hata oluştu:
hbase (ana): 004: 0 * liste
TABLO
HATA: org.apache.hadoop.hbase.PleaseHoldException: Master başlatılıyor
org.apache.hadoop.hbase.master.HMaster.checkInitialized (HMaster.java:2293) adresinde
org.apache.hadoop.hbase.master.MasterRpcServices.getTableNames (MasterRpcServices.java:900) adresinde
org.apache.hadoop.hbase.protobuf.generated.MasterProtos $ MasterService $ 2.callBlockingMethod (MasterProtos.java:55650) adresinde
org.apache.hadoop.hbase.ipc.RpcServer.call adresinde (RpcServer.java:2180)
org.apache.hadoop.hbase.ipc.CallRunner.run adresinde (CallRunner.java:112)
org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop'ta (RpcExecutor.java:133)
org.apache.hadoop.hbase.ipc.RpcExecutor adresinde $ 1.run (RpcExecutor.java:108)
java.lang.Thread.run'da (Thread.java:748)
İnternette birçok çözüm bulundu, örneğin:
Hbase-site.xml dosyasındaki hbase.rootdir dosyasını aşağıdaki içeriğe değiştirin:
< Emlak > < isim > hbase.rootdir < / isim > < değer > hdfs: // redhat6: 9000 / hbase < / değer > < /Emlak >Ama sorunumu çözmedi! Yorum alanında bir yönteme bakın:
Her düğümün saatinin senkronize edilip edilmediğini kontrol etmek için tarih girin. Senkronize edilmemişse (kendi problemlerimin% 80'i temelde senkronize edilmemiştir), tarih -s "2019-02-2614:18" girebilirsiniz (saat kendiniz ayarlanmıştır) ) Her küme düğümündeki zaman eşitlemesinden sonra hbase'i yeniden başlatın, sorun çözüldü! !
Bir HBase tablosu oluşturun: create'imooc_course_clickcount ',' info '
Satır anahtarı tasarımı: day_courseid
Sonraki adım, HBase'i çalıştırmak için Scala'yı uygulamaktır:
Önce bir varlık sınıfı tanımlayın:
/ ** * @yazar YuZhansheng * @desc gerçek savaş kursu ziyaretleri * day_course: HBase'deki satır anahtarına karşılık gelir, format: 20190227_1 * click_count: 20190227_1 gününde kursa yapılan ziyaretlerin sayısına karşılık gelir * @create 2019-02-2619:19 * / case class CourseClickCount (day_course: Dize, click_count: Uzun)Veritabanına erişim ayrıca bir dao katmanı, yeni bir paket, yeni bir nesne, CourseClickCountDAO kurulmasını gerektirir.
Ayrıca bir Java sınıfı olan ve Java paketine yerleştirilmesi gereken bir araç sınıfı olan HBaseUtils.java oluşturmak da gereklidir.Bu sınıf HBase üzerindeki işlemleri özel olarak gerçekleştirir. İki sınıf aşağıdaki gibidir:
org.apache.hadoop.conf.Configuration dosyasını içe aktarın; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; java.io.IOException'ı içe aktarın; / ** * @yazar YuZhansheng * @desc HBase işletim aracı sınıfı, Java araç sınıfının tekli modda kapsüllenmesi önerilir * @create 2019-02-2710:11 * / public class HBaseUtils { HBaseAdmin admin = boş; Yapılandırma yapılandırması = boş; // Tekli mod, özel yapım yöntemi gerektirir özel HBaseUtils () { konfigürasyon = yeni Konfigürasyon (); configuration.set ("hbase.zookeeper.quorum", "hadoop0, hadoop1, hadoop2, hadoop3"); configuration.set ("hbase.rootdir", "hdfs: // hadoop0: 9000 / hbase"); Deneyin { admin = new HBaseAdmin (konfigürasyon); } catch (IOException e) { e.printStackTrace (); } } // Tembel tekli mod private static HBaseUtils örneği = null; genel statik senkronize HBaseUtils getInstance () { if (null == örnek) { örnek = new HBaseUtils (); } dönüş örneği; } // Tablo adına göre HTable örneğini edinin public HTable getTable (String tableName) { HTable tablo = boş; Deneyin { tablo = yeni HTable (konfigürasyon, tabloAdı); } catch (IOException e) { e.printStackTrace (); } dönüş tablosu; } / ** * HBase tablosuna bir kayıt ekleyin * @param tableName HBase tablo adı * @param rowkey HBase tablosu satır anahtarı * @param cf columnfamily of HBase table * @param column HBase tablo sütunu * @param değeri HBase tablosuna yazılan değer * / public void put (String tableName, String satır anahtarı, String cf, String sütunu, String değeri) { HTable table = getTable (tableName); Put put = new Put (Bytes.toBytes (satır anahtarı)); put.add (Bytes.toBytes (cf), Bytes.toBytes (sütun), Bytes.toBytes (değer)); Deneyin { table.put (koymak); } catch (IOException e) { e.printStackTrace (); } } // Verileri test edin, kullanırken bu ana işlevi yorumlayın public static void main (String args) { // HTable table = HBaseUtils.getInstance (). GetTable ("imooc_course_clickcount"); //System.out.println(table.getName (). GetNameAsString ()); String tableName = "imooc_course_clickcount"; Dize satır anahtarı = "20190111_88"; Dize cf = "bilgi"; Dize sütunu = "click_count"; Dize değeri = "2"; HBaseUtils.getInstance (). Put (tableName, rowkey, cf, column, value); } } paket com.xidian.spark.project.dao com.xidian.spark.project.domain.CourseClickCount'u içe aktar ithal com.xidian.spark.project.utils.HBaseUtils org.apache.hadoop.hbase.client.Get dosyasını içe aktar import org.apache.hadoop.hbase.util.Bytes scala.collection.mutable.ListBuffer'ı içe aktar / ** * @yazar YuZhansheng * @desc Gerçek savaş parkuru isabetleri için veri erişim katmanı * @create 2019-02-279:51 * / object CourseClickCountDAO { val tableName = "imooc_course_clickcount" val cf = "bilgi" val qualifer = "click_count" // Verileri HBase'e kaydedin def save (liste: ListBuffer): Birim = { val tablo = HBaseUtils.getInstance (). getTable (tableName) için (ele < -liste){ table.incrementColumnValue (Bytes.toBytes (ele.day_course), Bytes.toBytes (cf), Bytes.toBytes (qualifer), ele.click_count ) } } // Satır anahtarına göre sorgu değeri def count (day_course: String): Uzun = { val tablo = HBaseUtils.getInstance (). getTable (tableName) val get = new Get (Bytes.toBytes (day_course)) değer değeri = table.get (get) .getValue (cf.getBytes, qualifer.getBytes) eğer (değer == null) { 0l }Başka{ Bytes.toLong (değer) } } // Programın mevcut olup olmadığını test edin def main (değiştirgeler: Dizi): Birim = { val listesi = yeni ListBuffer list.append (CourseClickCount ("20190227_8"; 8)) list.append (CourseClickCount ("20190227_9"; 18)) list.append (CourseClickCount ("20190227_1"; 12)) kaydet (liste) } }Test: CourseClickCountDAO'nun ana işlevini (test işlevi) çalıştırın, konsol çıktısını görüntülemek için HBase kabuk konsolundaki scan'imooc_course_clickcount 'komutunu kullanın:
hbase (ana): 005: 0 > scan'imooc_course_clickcount ' SATIR SÜTUN + HÜCRE 20190211_88 sütun = bilgi: click_count, zaman damgası = 1551235301383, değer = 920190227_1 sütun = bilgi: click_count, zaman damgası = 1551236147161, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x0C 20190227_8 sütun = bilgi: click_count, zaman damgası = 1551236147119, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x0820190227_9 sütun = bilgi: click_count, zaman damgası = 1551236147150, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x120,0650 saniyede 4 satırVeriler başarıyla eklendi.
Son adım, gereksinimleri tamamlamak için SparkStreaming programını tamamlamaktır: şimdiye kadarki gerçek muharebe kurslarına (/ class ile başlayan kurslar) yapılan ziyaretlerin sayısını sayın ve bunları HBase veritabanına yazın. Aşağıdaki programı ImoocStatStreamingApp'e ekleyin, günlükleri oluşturmak için crontab'ı başlatın, Flume'u başlatın, kafka, HBase'i başlatın, programı çalıştırın ve HBase'deki veri artışını gözlemleyin.
//cleanData.print () // Test Adım Üç: Bugüne kadar pratik kurslara yapılan ziyaretlerin sayısını (/ class ile başlayarak) sayın cleanData.map (x = > { // HBase satır anahtarı tasarımı: 20190226_8 (x.time.substring (0,8) + "_" + x.courseId, 1) }). lessByKey (_ + _). foreachRDD (rdd = > { rdd.foreachPartition (partitionRecords = > { val listesi = yeni ListBuffer partitionRecords.foreach (çift = > { list.append (CourseClickCount (pair._1, pair._2)) }) CourseClickCountDAO.save (liste) }) }) hbase (ana): 006: 0 > scan'imooc_course_clickcount ' SATIR SÜTUN + HÜCRE 20190211_88 sütun = bilgi: click_count, zaman damgası = 1551235301383, değer = 920190226_112 sütun = bilgi: click_count, zaman damgası = 1551238136748, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x0020190226_128 sütun = bilgi: click_count, zaman damgası = 1551238136477, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1820190226_130 sütun = bilgi: click_count, zaman damgası = 1551238136755, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1420190226_131 sütun = bilgi: click_count, zaman damgası = 1551238136488, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1C 20190226_145 sütun = bilgi: click_count, zaman damgası = 1551238136767, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1920190226_146 sütun = bilgi: click_count, zaman damgası = 1551238136512, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x1620190227_1 sütun = bilgi: click_count, zaman damgası = 1551236147161, değer = \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x00 \ x0CTalep yerine getirildi.
6. Talep: Arama motorlarından gelen pratik kursların tıklanma sayısını sayın
Yeni bir HBase tablosu oluşturun: create'imooc_course_search_clickcount ',' info '
Rowkey tasarımı: İşletmemizin ihtiyaçlarına göre şu şekilde tasarlanmıştır: 20190227 + arama + 1
Ardından, CourseSearchClickCount adlı yeni bir varlık sınıfı oluşturun:
/ ** * Arama motorundan gelen gerçek savaş kursu hitlerinin varlık kategorisi * @param day_search_course * @param click_count * / case class CourseSearchClickCount (day_search_course: Dize, click_count: Uzun)Veri erişim katmanı DAO:
com.xidian.spark.project.domain.CourseSearchClickCount'u içe aktar ithal com.xidian.spark.project.utils.HBaseUtils org.apache.hadoop.hbase.client.Get dosyasını içe aktar import org.apache.hadoop.hbase.util.Bytes scala.collection.mutable.ListBuffer'ı içe aktar / ** * Arama motorları-veri erişim katmanından gerçek savaş kurslarının hitleri * / object CourseSearchClickCountDAO { val tableName = "imooc_course_search_clickcount" val cf = "bilgi" val qualifer = "click_count" / ** * Verileri HBase'e kaydedin * @param list CourseSearchClickCount koleksiyonu * / def save (liste: ListBuffer): Birim = { val tablo = HBaseUtils.getInstance (). getTable (tableName) için (ele < -liste) { table.incrementColumnValue (Bytes.toBytes (ele.day_search_course), Bytes.toBytes (cf), Bytes.toBytes (qualifer), ele.click_count) } } / ** * Satır anahtarına göre sorgu değeri * / def count (day_search_course: String): Long = { val tablo = HBaseUtils.getInstance (). getTable (tableName) val get = new Get (Bytes.toBytes (day_search_course)) değer değeri = table.get (get) .getValue (cf.getBytes, qualifer.getBytes) eğer (değer == null) { 0L }Başka{ Bytes.toLong (değer) } } // // def main (değiştirgeler: Dizi): Birim = { // // // Kullanılabilirliği test edin // değer listesi = yeni ListBuffer // list.append (CourseSearchClickCount ("20190227_www.baidu.com_8", 8)) // list.append (CourseSearchClickCount ("20190227_cn.bing.com_9", 9)) // // kaydet (liste) // // println (count ("20190227_www.baidu.com_8") + ":" + count ("20190227_cn.bing.com_9")) //} }Son adım, SparkStreaming programını değiştirmek ve istatistiksel işlevi tamamlamak için aşağıdaki programı eklemektir:
// Test Adımı 4: Bugünden bugüne arama motorundan pratik kurslara yapılan ziyaretlerin sayısını sayın cleanData.map (x = > { // Dönüştürme: https://www.sogou.com/web?query=Spark SQL gerçek savaş == > https: /www.sogou.com/web? query = Spark SQL savaşı val referer = x.referer.replaceAll ("//", "/") val bölmeler = referer.split ("/") var host = "" if (splits.length > 2) { ana bilgisayar = bölmeler (1) } (host, x.courseId, x.time) }). filtre (_._ 1! = "") .map (x = > { (x._3.substring (0,8) + "_" + x._1 + "_" + x._2, 1) }). lessByKey (_ + _). foreachRDD (rdd = > { rdd.foreachPartition (partitionRecords = > { val listesi = yeni ListBuffer partitionRecords.foreach (çift = > { list.append (CourseSearchClickCount (pair._1, pair._2)) }) CourseSearchClickCountDAO.save (liste) }) })