Büyük veri gerçek zamanlı işleme

Yazar: Wu Zhihui, Pekin Mobil Ağ İşletim ve Bakım Merkezi Veri Sistemleri Mimarı, Yazılım Mühendisliği Yüksek Lisansı, Pekin Posta ve Telekomünikasyon Üniversitesi, kıdemli mühendis. Uzun yıllardır operatörler için sistem mimarisi tasarımı, yazılım geliştirme, büyük veri analizi ve madencilikle uğraştı.

İnternet çağının gelişmesiyle birlikte, içerik aktarımı için boru hattı hizmet sağlayıcıları olarak operatörler, veri alanında büyük avantajlara sahip oluyorlar.Bu verilerin nasıl değere dönüştürüleceği operatörler tarafından giderek daha fazla değerleniyor.

Operatörlerin büyük verileri, çeşitli arama faturaları, sinyalizasyon vb. Gibi büyük hacimli ve çeşitli türlerin özelliklerine sahiptir. Genellikle, bir arama faturasının günlük veri hacmi on milyardır. İş analizi gereksinimleri daha yüksek ve daha yüksek gerçek zamanlı veri işleme gereksinimleri gerektirdiğinden, büyük veri işleme mimarimize de büyük zorluklar getirdi.İnternette bulunan örneklere bakın ve bunu gerçek işleme mimarisine uygulayın. Gerçek zamanlı veri akışı büyüktür ve kararsız sistem çalışmasına ve çeşitli anormalliklere neden olur. Büyük veri gerçek zamanlı işleme mimarisinin geliştirilmesinden lansmanına kadar yaklaşık 2 ay sürdü.Birçok optimizasyondan sonra sistemimiz stabilize oldu. Sonunda, gerçek zamanlı olarak günde on milyarlarca veriyi işlemek için 10 sunucudan oluşan bir küme kullanıyoruz Burada, her veri için 100 alan var ve en uzun alan içeriği 1.000 baytı aşıyor.

Büyük miktarda veriyi gerçek zamanlı büyük veri işleme sürecinde özetlediğimiz iniş ve çıkışları paylaşalım.

  • Proje Hedefleri

Sınırlı sayıdaki sunucu kümeleri temelinde, günde on milyarı aşan ve 20T'nin üzerinde bir hacme sahip bir çağrı biletinin gerçek zamanlı olarak işlenmesi gerçekleştirilir. Özel gereksinim, FTP'nin ayrıntılı faturaları birden çok çağrı faturası sunucusunda toplaması, kullanıcıların ayrıntılı faturaları gerçek zamanlı olarak sorgulaması için verileri gerçek zamanlı işlemden sonra Hbase veritabanında depolaması ve faturaları çevrimdışı analiz için HDFS'de depolamasıdır.

  • Donanım kaynakları

10 x86 sunucu, bağımsız yapılandırma 16 kutu CPU, 128G bellek, 2T sabit disk * 10, 300G sabit disk * 2 (sistem diski).

  • sistem yapısı

10 sunucu bir hadoop kümesi oluşturur, burada NameNode düğümü aynı anda FTP ve Flume yüklemek için bir toplama makinesi olarak kullanılır ve diğer 5 sunucu büyük veri gerçek zamanlı akış işleme mimarisini uygulamak için Kafka, Zookeeper ve Storm'u kurmak için seçilir. Küme hesaplama kaynaklarından tam olarak yararlanmak için, bu 5 sunucu da Az miktarda İplik bilgi işlem kaynağı, günlük çevrimdışı veri analizi ihtiyaçlarına katılmak üzere yapılandırılır. Kalan 4 sunucu için büyük veri altında ikinci seviye sorgu gereksinimlerini karşılamak için Hbase'i kurduk.Sistem topolojisi aşağıdaki gibidir:

Şekil 1 Sistem topolojisi şeması

  • Proje uygulaması

1. Kullanılan ilgili teknolojiler

Öncelikle ilgili büyük veri mimarisini ve açık kaynak teknolojisini, büyük veri işleme ayırma hattı analiz mimarisini ve gerçek zamanlı işleme mimarisini inceleyelim. Çevrimdışı analiz mimarisi (Hive, Map / Reduce, Spark Sql, vb.), Veri analizi sonrası ve veri madenciliğinin uygulama gereksinimlerini karşılayabilir. Kullanıcı gerçek zamanlı ayrıntılı sipariş sorgulama, iş hacmi izleme vb. Gibi yüksek gerçek zamanlı gereksinimleri olan uygulamalar için gerçek zamanlı işlem mimarisi gereklidir. Şu anda, büyük veriler için en yaygın açık kaynaklı gerçek zamanlı işleme mimarileri Storm ve Spark Streaming'dir. Spark Streaming yarı gerçek zamanlı toplu işleme sistemi ile karşılaştırıldığında, Strom daha saf bir gerçek zamanlı işleme sistemidir, yani bir olay işlenir ve daha yüksek gerçek zamanlı performansa sahiptir.

Flume, Cloudera tarafından sağlanan yüksek düzeyde kullanılabilir, yüksek güvenilirliğe sahip, dağıtılmış bir toplu günlük toplama, toplama ve iletim sistemidir. Flume, bağımsız ve kümeleri destekler, sürekli olarak yeni dosyalar oluşturan sürekli yazılan dosyalar, soketler ve klasörler gibi birden çok veri kaynağını destekler ve HDFS, Kafka ve Mysql veritabanları gibi birden çok çıktıyı destekler. Flume kullanıldığında, yalnızca basit yapılandırma gereklidir ve program geliştirmeye gerek yoktur.

Kafka, büyük bir veri arabellek havuzuna benzeyen, yüksek verimli dağıtılmış bir yayınlama ve abone olma mesajlaşma sistemidir ve tek bir veri parçasının çok kullanıcılı tüketimini destekler. ZooKeeper, kümeler arasında bazı bileşenlerin durum senkronizasyon bilgilerini depolamaktan sorumlu, dağıtılmış, açık kaynak dağıtılmış bir uygulama koordinasyon hizmetidir. Nimbus ana düğümü ve Supervisor bağımlı düğümü (Storm 1.0'dan Nimbus yedekleme düğümü eklenmiştir) dahil olmak üzere Storm dağıtılmış gerçek zamanlı bilgi işlem sistemi ve düğümlerin durum senkronizasyonu için Zookeeper'a güvenmesi gerekir. Fırtına kümesi bileşenleri:

  • Nimbus: Storm kümesinin kaynak tahsisi ve görev planlamasından sorumlu ana düğümüdür.

  • Süpervizör: Storm kümesinin slave düğümüdür.Nimbus tarafından atanan görevleri kabul etmekten, işçi süreçlerini kendi yönetimi altında başlatmak ve durdurmaktan sorumludur.Gerçekten anlamıyla dağıtılmış bir hesaplama düğümüdür.

Şekil 2 Fırtına kümesi bileşenleri

Storm uygulaması, Java programlarının geliştirilmesini ve programlama modelinde yer alan kavramları içerir:

  • Topoloji: Storm'da çalışan gerçek zamanlı bir uygulama Çeşitli bileşenler arasındaki mesajların akışı mantıksal bir topoloji oluşturur Topoloji başlatıldıktan sonra bellekte kalır ve çalışan kaynaklarını işgal eder.

  • Spout: Bir Topolojide bir kaynak veri akışı oluşturan bir bileşen. Normalde Spout, harici veri kaynaklarından verileri okur ve ardından bunları Topoloji dahili kaynak verilerine dönüştürür.

  • Bolt: Verileri kabul eden ve ardından bir Topolojide işleme gerçekleştiren bir bileşen. Bolt; filtreleme, fonksiyon işlemleri, birleştirme ve veritabanına yazma gibi her türlü işlemi gerçekleştirebilir.

  • Tuple: Mesaj aktarımının temel birimi.

2. Açık kaynak bileşen kurulumu ve yapılandırması

a) Kanal montajı ve konfigürasyonu

Kanal kurulum paketini adresinden indirin ve sıkıştırmasını açın; kurmak için Cloudera Manager veya Ambari kullanıyorsanız, sadece ilgili yönetim sayfası aracılığıyla kurmanız ve yapılandırmanız gerekir. Sadece tek makineli Flume'u kurduk ve Flume kümesini kurmadık.Tek makineli Flume işleme verimliliği çok yüksektir ve her gün on milyarlarca veriyi işleme ihtiyacımızı tam olarak karşılayabilir, ancak Flume'un çok sağlam ve çoğu zaman işlediği unutulmamalıdır. İşlem, verilerin işlenmediği durumda takılı kalırsa, Flume'u kullanırken aşağıdaki noktalara dikkat edin:

  • Kanal izleme dizini dizin içeremez;

  • Kanal tarafından işlenmekte olan dosya diğer işlemler tarafından değiştirilemez (örneğin, FTP ile aktarılan dosyanın kanal işlemeyi önlemek için filtre koşullarını ayarlaması gerekir). Kanal izleme dizininin FTP gerçek zamanlı aktarım dizininden ayrılması tavsiye edilir, bu da derinin FTP aktarımındaki dosyaları işlemesini önlemek için istisnalara neden olabilir.Ayrıca, aktarılan dosyaları yok saymak için normal bir ifade de ayarlayabilirsiniz:

a1.sources.r1.ignorePattern = ^ (.) * \. tmp $
  • Kanal tarafından işlenen dosyalar, kanal işleminin donmasına neden olan özel karakterler içerebilir. Tanınmayan karakterlerle karşılaştığında yok saymak ve atlamak için ayarlayın:

a1.sources.r1.decodeErrorPolicy = IGNORE
  • Kanal çalıştırma sırasında GC bellek taşması hatası oluştu, flume-env.sh içindeki bellek yapılandırmasını yapılandırın (varsayılan değer çok küçüktür);

dışa aktar JAVA_OPTS = "- Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote"
  • Flume başlatıldığında, -c ayrıntılı kanal yapılandırma dosyası dizinine verilmelidir, aksi takdirde flume-env.sh içindeki yapılandırma yüklenmez ve varsayılan yapılandırma kullanılır.Örneğin, tam yapılandırma dosyası dizinine aşağıdaki başlatma komutu verilir:

/hadoop/apache-flume-1.6.0-bin/bin/flume-ng agent -c /hadoop/apache-flume-1.6.0-bin/conf/
  • Bir bellek kuyruğu kullanıyorsanız, lütfen bellek kuyruğundaki mesaj sayısının yapılandırmasına dikkat edin. TransactionCapacity kuyruğu boyutu batchSize'den büyük veya ona eşit olmalıdır;

a1.channels.c1.transactionCapacity = 2000a1.sinks.k1.batchSize = 2000
  • BatchSize'ın artırılması, kanalın işleme hızını artırabilir. İlke, kanal tarafından işlenen olayların işlem kuyruğunda depolanmasıdır.BatchSize'ın miktar koşulu karşılanıncaya kadar, bunlar bir seferde toplu olarak çökmeye gönderilir. Ancak gerçek veri hacminin boyutuna dikkat edin.Gerçek veri hacmi küçükse, batchSize çok büyük yapılandırılamaz, aksi takdirde veriler batchSize'nin miktar durumuna ulaşmaz ve uzun süre işlem kuyruğunda biriktirilir ve arkadaki gerçek zamanlı işlem programı bunu anlamaz. Düşük gerçek zamanlı performansa neden olan veriler;

  • Kanalda okunan bir kaydın uzunluğu 2048 karakteri aşıyor, yani 4096 bayt kesilecek, çözmek için yapılandırma dosyasına aşağıdaki yapılandırma öğelerini ekleyebilirsiniz:

üretici.sources.s.deserializer.maxLineLength = 65535
  • Flume karakter dönüştürme istisnası sorunu, java.nio.charset.MalformedInputException: Giriş uzunluğu = 1, yapılandırma dosyasına aşağıdaki yapılandırma öğeleri eklenerek çözülebilir:

a1.sources.r1.inputCharset = ISO8859-1
  • Hatalı karakterlerle karşılaşıldığında Flume durur ve bir istisna bildirir: java.nio.charset.MalformedInputException. Hata verilerini yok saymak için konfigürasyon dosyasına aşağıdaki konfigürasyonu ekleyebilirsiniz (varsayılan, FAIL'dir, bir istisna atar ve bir hata bildirir ve Flume durur) sorunu çözmek için;

maker.sources.s.decodeErrorPolicy = IGNORE
  • Varsayılan olarak, Flume tarafından işlenen dosyalar .completed sonekini ekler. Büyük miktarda veri olması durumunda, toplayıcının sabit diski hızla doldurulur. Flume'un işlendikten sonra otomatik olarak silmesine izin vermek için aşağıdaki yapılandırmayı yapılandırma dosyasına ekleyebilirsiniz. Veri dosyası çözüldü.

a1.sources.r1.deletePolicy = anında

Kanal konfigürasyonu:

a1.sources = r1

Flume-env.sh yapılandırması:

# Çevre değişkenleri buradan ayarlanabilir.

Kanal başlatma komutu:

/hadoop/apache-flume-1.6.0-bin/bin/flume-ng agent -c /hadoop/apache-flume-1.6.0-bin/conf/ -f /hadoop/apache-flume-1.6.0-bin /conf/viewdata.conf -n yapımcı Dflume.root.logger = HATA

Tam Flume yapılandırma dosyasına giden yolun verilmesi gerektiğine dikkat edin, aksi takdirde Flume-env.sh yapılandırması Flume başlatılırken doğru şekilde yüklenemez.

b) Kafka küme kurulumu ve yapılandırması

Kafka kurulum paketini kafka _ *. Tgz adresinden indirin, sıkıştırmasını açın ve server.properties dosyasını yapılandırın.

server.properties yapılandırması:

# Kafka kümesindeki bu makinenin kimliği

broker.id = 48

#Service Port

bağlantı noktası = 9092

#CPU adı

host.name = storm01

# Ağ isteklerini işleyen iş parçacığı sayısı

num.network.threads = 3

# Disk G / Ç yapan iş parçacığı sayısı

num.io.threads = 8

# Soket sunucusu tarafından kullanılan gönderme tamponu (SO_SNDBUF)

socket.send.buffer.bytes = 102400

# Soket sunucusu tarafından kullanılan alma tamponu (SO_RCVBUF)

socket.receive.buffer.bytes = 102400

# Soket sunucusunun kabul edeceği maksimum istek boyutu (OOM'ye karşı koruma)

socket.request.max.bytes = 104857600

#kafka Veri saklama konumu (veri miktarı büyük olduğunda, saklanması gereken dizinin boyutu da yeterli olmalıdır)

log.dirs = / data1 / kafka-logs

Bölüm sayısını oluşturmak için # Varsayılan konu

num.partitions = 1

# Bu değerin RAID dizisinde bulunan veri dizinlerine sahip kurulumlarda artırılması önerilir.

num.recovery.threads.per.data.dir = 1

#kafkaEvent Yalnızca sabit diske giden flaş sonraki tüketiciler tarafından kullanılabilir, bu nedenle flaş süresi parametreleri küçük veri hacimlerinde çok uzun veri yenileme süresinden kaçınmak için yapılandırılmalıdır.

log.flush.interval.messages = 10000

log.flush.interval.ms = 1000

# Verilerin Kafka'da saat cinsinden saklandığı zaman, zaman aşımı verileri Kafka tarafından otomatik olarak silinecektir.

log.retention.hours = 48

# Bir günlük segment dosyasının maksimum boyutu Bu boyuta ulaşıldığında yeni bir günlük segmenti oluşturulacaktır.

log.segment.bytes = 1073741824

# Günlük segmentlerinin silinip silinemeyeceklerinin kontrol edildiği aralık.

saklama politikalarına

log.retention.check.interval.ms = 300000

# Eğer log.cleaner.enable = true ayarlanmışsa, temizleyici etkinleştirilecek ve daha sonra ayrı günlükler günlük sıkıştırması için işaretlenebilecektir.

log.cleaner.enable = false

# zookeeper küme yapılandırması

zookeeper.connect = master: 2181, storm01: 2181, storm02: 2181, storm03: 2181, storm04: 2181

# Zookeeper'a bağlanmak için ms cinsinden zaman aşımı

zookeeper.connection.timeout.ms = 6000

# Konu yapılandırmasını silmek mümkünse, varsayılan yanlış, konuyu silemez

delete.topic.enable = true

Kafka service start: jps komutu, kafka'nın işlem adını görebilir ve kafka'nın başarıyla başlatıldığını gösterir.

nohup kafka-server-start.sh /home/hadoop/kafka_2.9.1-0.8.2.1/config/server.properties

Konu oluştur: 24 bölümlü bir konu olan 2 çoğaltma faktörü oluşturun, birden çok bölüm oluşturmanın amacı paralelliği artırmaktır ve çoğaltma faktörünün amacı veri güvenliğini ve yedekliliği sağlamaktır.

kafka-topics.sh --create --zookeeper master: 2181, storm01: 2181, storm02: 2181, storm03: 2181, storm04: 2181 - replication-factor 2 --bartitions 24 --topic sighttp

Kafka veri depolama yöntemi: kafka veri depolama dizininde, her yöntemden sonra adlandırılan klasörleri görebilirsiniz, örneğin, sighttp-19, aşağıdaki şekilde gösterildiği gibi konu: sighttp, partition: 19 anlamına gelir:

Şekil üç

Konu bölümü dizinine girin, .index ve .log ile biten birçok dosyayı görebilirsiniz. .Log, verileri Kafka arabellek havuzunda depolayan veri dosyasıdır ve .index, dizin dosyasıdır. Veri dosyası ve dizin dosyası çiftler halinde görünür. Dosya adı, dosyada depolanan verilerin başlangıcını tanımlayan bir sayı dizisidir Seri numarası aşağıdaki gibidir:

Şekil Dört

Kafka veri tüketim durumu sorgusu: Kafka'dan tüketici veri tüketiminin durumu, zookeeper'da kaydedilir ve zkCli.sh komutu kullanılarak görüntülenebilir.Aşağıdaki şekil, tüketim konusunun durumunu sorgular: sighttp, partition: 0 ve ofset, 49259227840 satır işlediğini gösterir. ,Aşağıda gösterildiği gibi:

Şekil 5

Deneyim: Tüketilen satır sayısı ve depolanan satır sayısı aracılığıyla, veri işleme programının hızının veri oluşturma hızı talebini karşılayıp karşılamadığına karar verilebilir.

Kafka tüketimi tipik anormallikler:

HATA: Korelasyon kimliği 0 olan tüketiciden bölüm uzaklığı 6535061966 için getirme isteği işlenirken hata oluştu. Olası neden: Göreli konum 6535061966 için istek ancak sadece 6580106664 ile 6797636149 aralığında günlük segmentlerimiz var. (Kafka.server.ReplicaManager)

İstisna nedeni: 6535061966 seri numaralı mesaj, mesajın süresinin dolması nedeniyle Kafka'da silinmiştir. Şu anda Kafka'da yalnızca 6580106664 ila 6797636149 aralığında günlükler vardır, ancak tüketicinin süresi dolmuş silinmiş mesajla ilgilenmesi gerekir, ardından bu istisna mesajı görünür (genellikle Bunun nedeni, veri oluşturma hızının gereksinimlerini karşılayamayan yavaş veri işleme hızından kaynaklanmaktadır ve bu da bir mesaj birikimi ile sonuçlanmaktadır. Mesaj biriktirme listesi, Kafka tarafından yapılandırılan sona erme süresine ulaşır ve Kafka tarafından silinir).

c) Fırtına kümesi kurulumu ve yapılandırması

Storm kurulum paketini adresinden indirin. En son sürüm, özellikle STORM-935 problemi olmak üzere birçok hatayı düzelttiğinden (topoloji başladıktan sonra çok fazla sistem kaynağını kullanacaktır) için Storm 0.10.0 veya daha üstünü kullanmanız önerilir. , Kararsız Topoloji işlemiyle sonuçlanan).

Storm.yaml dosyası yapılandırması:

#zookeeper küme sunucusu yapılandırması

storm.zookeeper.servers: - "ana" - "storm01" - "storm02" - "storm03" - "storm04"

#storm ana düğümü

nimbus.host: "ana"

#stromYönetim sayfası hizmeti bağlantı noktası

ui.port: 8081

#storm Düğüm hizmeti bağlantı noktası yapılandırmasında varsayılan değer 6700-6703'tür, bu da her bir sunucunun 4 çalışan yuva sağlayabileceği anlamına gelen toplam 4 bağlantı noktasıdır. Burada 6704 ve 6705 bağlantı noktaları eklenir, yani tek bir sunucu 2 çalışan yuvası ekler , Çalışan sayısındaki artış, fırtına kümesinin daha fazla bilgi işlem kaynağı sağlayabileceği anlamına gelir.

supervisor.slots.ports: -6700-6701-6702-6703-6704-6705

# Durum bilgisi depolama konumu, / tmp kullanmaktan kaçının

storm.local.dir: "/home/hadoop/apache-storm-0.10.0/workdir"

# Ana düğüm belleği

nimbus.childopts: "-Xmx3072m"

# Köle düğüm belleği

supervisor.childopts: "-Xmx3072m"

# işçinin hafızası, artan hafıza, GC aşırı yüklenme problemini azaltabilir

worker.childopts: "-Xmx3072m"

# Varsayılan 30'dur, Netty iletişim sorunları nedeniyle çalışanın kararsızlığını azaltmak için ağ zaman aşımını ve diğer parametreleri artırın

storm.messaging.netty.max_retries: 60

#Add storm.messaging.netty.max_wait_ms ayarı, varsayılan 1000'dir

storm.messaging.netty.max_wait_ms: 2000

Hizmeti başlatın:

  • Ana düğüm: (ana düğüm hizmetini ve yönetim sayfasını başlatın)

    nohup fırtına nimbus ve

    nohup fırtına kullanıcı arabirimi

  • Köle düğümü: nohup fırtına süpervizörü

Fırtına yönetimi sayfası:

Tarayıcıda, Storm UI'nin bulunduğu sunucunun adresini + port 8081'i girin ve aşağıda gösterildiği gibi Strom yönetim sayfasını açın:

Şekil Altı

Şekil 6 Küme Özeti'nden Storm kümesinde 4 Supervisor düğümü olduğu görülebilir, çünkü her bir Süpervizör 6 yuva sağlar (supervisor.slots.ports özelliği storm.yaml konfigürasyon dosyasında yapılandırılmamışsa, her Süpervizör varsayılan olarak 4 sağlar. Yuvalar), bu nedenle 22'si kullanılan ve 2'si ücretsiz olan toplam 4 * 6 = 24 yuva vardır. Her topoloji serbest bırakıldığında, yuvayı uzun süre işgal edeceği unutulmamalıdır.Yeterli yuva yoksa, yeni serbest bırakılan topoloji yalnızca boşta kalan yuvayı işgal edecek ve diğer dolu yuva kaynaklarını engellemeyecektir; yuva yoksa, serbest bırakılmayacaktır. Yeni topolojide, şu anda Storm küme sunucusuna dokunmanız, yuva kaynaklarını artırmanız veya yapılandırma dosyası aracılığıyla yeni sunucular eklemeniz gerekir.

Şekil 6'daki Topoloji Özetinden, kümede 7 Topolojinin serbest bırakıldığı görülebilir.Her Topoloji tarafından işgal edilen çalışan kaynakları, başlatılan yürütme iş parçacığı sayısı ve özel kaynak kullanımı Fırtına Topolojisi geliştirme programında belirtilmiştir.

d) Kafka + Storm + Hdfs + Hbase topoloji geliştirme

MAVEN projesini oluşturmak için Eclipse kullanıyoruz ve pom.xml yapılandırma dosyasına Storm ve Hdfs ile ilgili bağımlılıkları ekliyoruz.Bu örnekte Storm, Kafka'daki verileri tüketiyor ve ETL işleminden sonra HDFS ve Hbase'de depoluyor.Bu nedenle Storm-Kafka eklememiz gerekiyor, Storm-Hdfs, Storm-Hbase ve diğer bağımlılıklar, bağımlılık paketi sürümünün küme ile tutarlı olmasına dikkat edin.

Ayıklama işlemi BaseRichBolt sınıfını miras alır:

public class splitBolt, BaseRichBolt'u genişletir {private static final String TAB = ",";

özel OutputCollector toplayıcı;

public void hazırlığı (Map config, TopologyContext context, OutputCollector toplayıcı) {

this.collector = toplayıcı;}

public void execute (Tuple input) {Dize satırı = input.getString (0); Dize kelimeleri = line.split (TAB);

if (words.length > 74) {Dize Hesabı;

if (words.length > 0) Hesap = kelimeler;

else Account = ""; String LocalIPv4;

if (words.length > 0) LocalIPv4 = kelimeler;

else LocalIPv4 = ""; String RemoteIPv4;

if (words.length > 0) RemoteIPv4 = kelimeler;

else RemoteIPv4 = ""; String newline = Account + "|" + LocalIPv4 + "|" + RemoteIPv4; collector.emit (input, new Values (newline));} collector.ack (input);}

public void declareOutputFields (OutputFieldsDeclarer deklaratörü) {declarer.declare (yeni Alanlar ("yeni satır"));}}

Hbase yazmak için HBaseMapper sınıfını uygulamanız gerekir:

public class myHbaseMapper, HBaseMapper'ı uygular {public ColumnList column (Tuple tuple) {String line = tuple.getString (0); String word = line.split ("\ |"); ColumnList cols = new ColumnList;

// Parametreler sütun ailesi adı, sütun adı, sırayla değer if (words.length > 0) cols.addColumn ("içerik" .getBytes, "LocalIPv4" .getBytes, words.getBytes);

if (words.length > 0) cols.addColumn ("içerik" .getBytes, "RemoteIPv4" .getBytes, words.getBytes);

sütunları döndür;}

public byte rowKey (Tuple demeti) {String line = tuple.getString (0); String kelimeler = line.split ("\ |"); Dize anahtarı;

// satır anahtarı, hbase tablosundaki bölümlerin veri dengesini kolaylaştırmak için Hesabın ters dizesine ayarlanır key = new StringBuilder (word) .reverse.toString;

dönüş anahtarı.getBytes;}}

ana işlev:

public static void main (String argümanları) {String zks = "master: 2181, storm01: 2181, storm02: 2181"; // zookeeper cluster String topic = "topicname"; // kafka'daki konu adı String zkRoot = "/ storm" ; // Durum bilgilerini zookeeper'da depolayan kök dizin String id = "kafkatopicname"; // Bu topolojinin durum bilgilerini zookeeper FileNameFormat fileNameFormat = new DefaultFileNameFormat .withPath ("/ storm / tmp /"). WithPrefix (" tmp _ "). withExtension (". dat "); RecordFormat format = new DelimitedRecordFormat .withFieldDelimiter (" | "); // hdfs'ye yazılan dizin dosyası adı'tmp_ 'ile başlar ve'.dat' ile biter // her 10 dakikada bir Hdfs FileRotationPolicy rotationPolicy = new TimedRotationPolicy (10.0f, TimeUnit.MINUTES); BrokerHosts brokerHosts = new ZkHosts (zks);

Yukarıdaki program, Kafka fırtına okuma ve Hdfs ve Hbase yazmanın bir örneğini uygulamaktadır.Ekstraksiyon sınıfında, farklı iş gereksinimlerine göre Java kodu aracılığıyla farklı mantık uygulanabilmektedir. Derlenmiş jar paketini kümeye yükleyin ve Topolojiyi göndermek için fırtına komut satırını kullanın:

fırtına kavanozu ./kafkastream.jar sighdfs.sighttphdfs stormmaster

sonuç olarak

Birkaç aylık fiili operasyonun ardından, büyük veri gerçek zamanlı işleme mimarimiz her zaman sabit kalabilir ve çağrı faturası işleme hızı, operatörlerin büyük verilerinin çeşitli analiz ve sorgu gereksinimlerini etkin bir şekilde destekleyen çağrı faturası oluşturma hızından daha yüksektir. Geliştirme ve optimizasyon süreci zorluklarla doludur.Çeşitli çalışmalar ve girişimlerden sonra sorunlar yavaş yavaş çözülür.Burada ayrıca birçok geliştirme ve optimizasyon deneyimi biriktirdik.

Son olarak, aslında karşılaştığımız iki sorunu paylaşacağız:

  • Zookeeper yapılandırması Storm topolojisinin dengesiz çalışmasına neden oluyor

Storm kümesi durum senkronizasyonu için Zookeeper kümesi gerektirdiğinden, tüm Storm sunucusu çalışan işlemleri sürekli olarak Zookeeper düğümlerine bağlanacaktır. Zookeeper düğümleri için varsayılan bağlantı sayısı 60'tır. Storm bilgi işlem topolojilerinin sayısı büyük olduğunda, Zookeeper yapılandırmasını değiştirmeniz gerekir maxClientCnxns = 1000, artırın Zookeeper bağlantılarının sayısı.

  • HDFS Düğümlerinin Yüksek Disk G / Ç'si Fırtına Topolojisinin Kararsız Çalışmasına Neden Oluyor

Storm gerçek zamanlı bilgi işlem olduğu için, her bağlantının tıkanması Storm topolojisinin istikrarsızlığına neden olacaktır.Geliştirme sırasında, Storm'un fazla mesai yazmasına neden olan bir HDFS düğümünde yüksek disk G / Ç ile karşılaştık ve sonunda Süpervizörün çalışanları öldürmesine neden oldu. Topolojik kararsızlık sorunu. Bunun nedeni, belirli bir Hdfs düğümünde, İplik görevinin bir Azaltma işlemi gerçekleştirmesidir.Yarn ara disk G / Ç'sinin uzun süre meşgul olduğunu ve Yarn ara diskinin de HDFS verisi olduğunu kontrol etmek için iostat -x 110 komutunu kullanın. Disk, yazma isteğinin yanıt vermemesine neden oluyor ve sonunda Storm çalışanlarının HDFS yazma zaman aşımına yol açarak kararsız topoloji çalışmasına neden oluyor. Yarn ara diskini yapılandırırken, işletim sistemi kök diskini kullanmamanız ve HDFS veri diskini kullanmamanız önerilir; bu, Storm yazma HDFS zaman aşımı sorununu etkili bir şekilde önleyebilir.

Yeni Passat'ı deneyimlemek için mağazaya git, bu arabayı satın almamak için bana bir sebep ver
önceki
Sahipler lütfen yer imlerine ekleyin! Arabanın bakımını kendiniz yapın, bu sekiz ipucunun kullanımı çok kolay!
Sonraki
E-ticarette birçok büyük veri pazarlama rutini vardır.Taobao etek satın almak için "insanları görmeli ve yemek yemeli" mi?
UP ana ilem, sıradan otaku, sıradışı iki boyutlu kimlik | Fikir sahibi 100 kişi
Çerçevesiz pencerelerin "zehiri", 300.000 yerden tavana arabanın, Volkswagen CC
İlk yıllarda "The Hunt" dan son yıllarda "The Legend of Demon Cat" e kadar Çin ve Japon filmlerinin karşılıklı öğrenimi ve etkisi nedir?
Ulusal rekabeti kıyaslayın! Profesyonel Beceri Yarışması CNC Torna Montaj ve Bakım Finalleri Yapıldı
"Wufeng Lin Ailesi Tarihi Özel Sergisi" tarihi hatırlıyor ve boğazlar arası entegrasyonu teşvik ediyor
Açılır tavanı olan bir arabaya daha fazla para harcamaya değer mi?
Eylemde reformu derinleştirmek | Merkez eksenin miras uygulamasına yardımcı olan Beijing Jingshan Park, tüm manzarayı ortaya çıkaracak
Yaşlı şoför yürekten şöyle dedi: Neden sana arabayı ödünç veremiyorum?
Qingdao IKEA, Mart 2020'de inşaata başladı ve faaliyete geçti
Bugün, Pekin-Zhangjiakou Yüksek Hızlı Demiryolunun yeni Badaling Tüneli başarıyla tamamlandı!
Cinsel taciz karşısında Siri ve Alexa gibi yapay zeka asistanları nasıl tepki veriyor?
To Top