Günlük blog | Kafka üreticisini kaynak kodu analizinden zarif bir şekilde kullanma

Açık kaynak Çin OSC başlık numarasını takip edin ve en son teknik bilgileri alın

Önsöz

Yukarıda milyon düzeyinde bir mesaj itme sistemi tasarlarken, Kafka'nın mesaj akışı için ara katman olarak kullanıldığından bahsedilmektedir.

Aralarından bir arkadaş, büyük miktarda haber olması durumunda Kakfa'nın haberlerin etkinliğini ve tutarlılığını nasıl garanti ettiğini sordu.

Mesajların nasıl doğru ve verimli bir şekilde gönderileceğini tartışmak için bu soruyu Kakfa'nın kaynak koduyla birlikte kullanır.

Daha fazla içerik var, kaynak koduyla ilgilenen arkadaşlar, lütfen emniyet kemeri takın (kaynak kodu v0.10.0.0 sürümünün analizine dayanmaktadır). Aynı zamanda, Kafka'yı kullanma konusunda biraz deneyim sahibi olmak ve temel kullanımı bilmek en iyisidir.

Basit mesaj gönderme

Analiz etmeden önce, basit bir mesaj göndermenin nasıl bir şey olduğunu görelim.

Aşağıdaki kod, SpringBoot'u temel alır.

Önce org.apache.kafka.clients.producer.Producer'dan bir bean oluşturun.

Esas olarak gerekli bir parametre olan bootstrap.servers'a odaklanın. Kafka kümesindeki aracı adresini ifade eder, örneğin 127.0.0.1:9094.

Kalan parametreler şimdilik tartışılmayacak, daha sonra ayrıntılı olarak tanıtılacaktır.

Ardından, mesaj göndermek üzere gönderme işlevini çağırmak için bu fasulyeyi enjekte edin.

Burada belirli bir konuya 10W'lık veri gönderdim ve çalışan program mesajları normal olarak gönderildi.

Ancak bu yalnızca mesaj göndermedir ve mesaj tamamen göz ardı edilir, bu da saf bir eşzamansız yönteme eşdeğerdir.

Senkronize et

Bu yüzden mesaj başarıyla gönderilirse ne yapmam gerektiğini bilmek istiyorum.

Aslında, Yapımcının API'si bizim için çoktan dikkate alındı.Gönderdikten sonra, gönderme sonucunu eşzamanlı olarak almak için sadece get () yöntemini çağırmamız gerekiyor.

Sonuç gönder:

Bu tür iletim verimliliği aslında nispeten düşüktür, çünkü her seferinde mesaj iletiminin sonucunu eşzamanlı olarak beklemek gerekir.

asenkron

Bu nedenle, eşzamansız bir şekilde göndermeliyiz Aslında, get () yöntemi manuel olarak çağrılmadığı sürece send () yöntemi varsayılan olarak eşzamansızdır.

Ancak bu şekilde gönderimin sonucunu bilmek imkansızdır.

Yani send () API'ye bakarak, başka bir parametrenin daha olduğunu görebilirsiniz.

Gelecek < RecordMetadata > gönder (Yapımcı Kaydı < K, V > yapımcı, Geri arama geri arama);

Geri arama, mesaj gönderildikten sonra özelleştirilmiş uygulamamızı geri arayabilen bir geri arama arayüzüdür.

Yürütmeden sonraki sonuç:

Sonuç aynı şekilde elde edilebilir ve geri aramanın iş parçacığının yukarıdaki senkronizasyonun ana iş parçacığı olmadığı ve bunun da eşzamansız bir geri arama olduğunu kanıtlayabileceği bulunmuştur.

Aynı anda geri arama yapılırken iki parametre geçilecek:

  • RecordMetadata ve meta veriler başarıyla gönderildikten sonra yukarıdaki mesajla tutarlı.
  • İstisna Mesajın gönderilmesi sırasında istisna bilgisi.

Ancak, bu iki parametrenin aynı anda verisi yoktur, sadece gönderme başarısız olduğunda bir istisna mesajı olacaktır ve gönderen meta veri boş olacaktır.

Dolayısıyla doğru yazı şöyle olmalıdır:

Neden sadece bir parametrenin değerinin olduğuna gelince, aşağıdaki kaynak kod analizinde tek tek açıklanacaktır.

Kaynak kod analizi

Şimdi sadece temel mesaj gönderme konusunda ustalaştım.Gönderirken bazı parametre yapılandırmalarını derinlemesine anlamak istiyorsanız, yine de kaynak kodunda son söz hakkına sahipsiniz.

Öncelikle tüm mesaj gönderme sürecinden bahsedelim Kafka mesajı brokere ağ üzerinden göndermez, Java ile optimize edilmiş ve tasarlanmıştır.

Gönderme süreci

Gönderme sürecini sezgisel olarak anlamak için, gönderme sürecindeki birkaç temel adım basitçe çizilir.

Baştan aşağı:

  • Kafka-yapımcı-ağ-iş parçacığı IO iş parçacığı, mesajları başlatan ve gerçekten mesaj gönderiyor.
  • Mesajı seri hale getirin.
  • Gönderilmesi gereken bölümü alın.
  • Dahili bir arabellek alanına yazın.
  • Başlatılan IO iş parçacığı, mesaj göndermek için bu önbelleği sürekli olarak kullanır.

Adım analizi

Ardından, her adımı ayrıntılı olarak açıklayın.

başlatma

Bu inşa yöntemini başlatma için çağırırken, basitçe KafkaProducer'a temel parametreleri yazmaktan daha fazlasıdır. Daha zahmetli olan şey, Sender iş parçacığını arabellek tüketimi için başlatmaktır.

IO iş parçacığını başlatın:

Gönderen iş parçacığının aşağıdakiler gibi üye değişkenlere ihtiyaç duyduğunu görebilirsiniz:

acks, yeniden denemeler, requestTimeout

Bekle, bu parametreler daha sonra analiz edilecek.

Serileştirilmiş mesaj

Aslında send () işlevini çağırdıktan sonraki ilk adım serileştirmedir Sonuçta mesajlarımızın ağ üzerinden Kafka'ya gönderilmesi gerekir.

ValueSerializer.serialize (record.topic (), record.value ()); bir arayüzdür ve başlatma sırasında serileştirme uygulama sınıfını belirtmemiz gerekir.

Sadece org.apache.kafka.common.serialization.Serializer arayüzünü uygulayarak serileştirmeyi kendimiz de gerçekleştirebiliriz.

Yönlendirme bölümü

Bir sonraki adım, bölümlerin yönlendirilmesidir.Kullandığımız Konu genellikle ölçeklenebilirlik ve yüksek performans için birden çok bölüm oluşturur.

Eğer bir bölüm ise, söylemesi kolay, içine tüm mesajlar yazılabilir.

Bununla birlikte, birden fazla bölümün kaçınılmaz olarak hangi bölüme yazılacağını bilmesi gerekir.

Genellikle üç yol vardır.

Özel bölüm

ProducerRecord oluştururken her mesaj için bölüm belirtebilirsiniz.

Bu şekilde yönlendirme sırasında bir atama olup olmadığına karar verilecek ve varsa bölüm direkt olarak kullanılacaktır.

Bu genellikle özel senaryolarda kullanılır.

Özel yönlendirme stratejisi

Herhangi bir bölüm belirtilmezse, partitioner.partition arabirimi özel bir bölüm stratejisi yürütmek için çağrılır.

Ve sadece org.apache.kafka.clients.producer.Partitioner arayüzünü uygulamak için bir sınıfı özelleştirmemiz ve bir KafkaProducer örneği oluştururken partitioner.class parametresini yapılandırmamız gerekir.

Genellikle bölümü özelleştirmeniz gerekir, genellikle mesajların sırasını olabildiğince sağlamaya çalışır.

Veya bazı benzersiz bölümlere yazılabilir ve özel tüketiciler tarafından işlenebilir.

Varsayılan strateji

Sonuncusu, hiçbir şey yapmazsak yürütülecek varsayılan yönlendirme stratejisidir.

Bu strateji aynı zamanda mesaj dağıtımını daha eşit hale getirecektir.

Uygulamasına bir göz atın:

Kısaca şu adımlara ayrılmıştır:

  • Konu bölümlerinin sayısını alın.
  • Dahili olarak tutulan bir iplik güvenlik sayacı +1.
  • Bölüm numarası, bölüm sayısı modulo ile elde edilir.

Aslında, bu tipik bir yoklama algoritmasıdır, bu nedenle bölümlerin sayısı sık sık değişmediği sürece, bu yöntem daha tek tip olacaktır.

Dahili önbelleğe yaz

Send () yöntemi bölümü aldıktan sonra, bir append () işlevi çağrılır:

Bu işlev, grupları dahili bir önbelleğe yazmak için bir getOrCreateDeque () çağırır.

Tüketim önbelleği

En başında başlatılan IO iş parçacığı, aslında bu verileri her zaman tüketen bir arka plan programı iş parçacığıdır.

Önceden yazılmış veriler, şekildeki çeşitli işlevlerle elde edilebilir. Bu içeriğe girmenize gerek yok, ancak çok önemli olan bir completeBatch yöntemi var.

Bu yöntem çağrıldığında mesajın gönderilmiş olması gerekir, bu nedenle send () yönteminde tanımladığımız geri çağrı arayüzünü tamamlamak için batch.done () çağrılacaktır.

İletim tamamlandıktan sonra yalnızca bir meta veri ve istisna bilgisinin görünmesinin neden söylendiği buradan da görülebilir.

Üretici parametre analizi

Gönderim işlemi bittikten sonra, Producer'daki daha önemli parametrelere bir göz atalım.

acks

acks, mesaj verimini etkileyen önemli bir parametredir.

Esas olarak bu seçenekler vardır ve varsayılan 1'dir.

Kafka aktif / bekleme modunda değil, Zookeeper'a benzer aktif / bekleme modunda olduğundan.

Öncül, Konu yapılandırma kopyalarının sayısıdır > 1.

Acks = all / -1 olduğunda:

Bu, geri dönmeden önce tüm takipçi kopyalarının veri yazmayı tamamladığından emin olacağı anlamına gelir.

Bu, mesajın kaybolmamasını sağlar!

Ancak aynı zamanda performans ve verim en düşük seviyededir.

Acks = 0 olduğunda:

Yapımcı, mesajlarını kaybetmenin en kolay yolu olan kopyadan herhangi bir yanıt beklemeyecek, ancak aynı zamanda performans en iyisidir!

Acks = 1 olduğunda:

Bu uzlaşmacı bir çözümdür ve çoğaltma Liderinin yanıt vermesini bekleyecek, ancak izleyicinin yanıt vermesini beklemeyecektir.

Lider telefonu kapatır kapatmaz mesaj kaybolur. Ancak performans ve mesaj güvenliği garanti altına alınmıştır.

Parti boyutu

Bu parametre, dahili arabellek alanının boyut sınırı olarak adıyla anılabilir.Uygun şekilde ayarlanması verimi artırabilir.

Ancak aşırı olamaz, çok büyük bellek israfına neden olur. Küçükse işe yaramaz, aynı zamanda tipik bir zaman ve mekan değiş tokuşudur.

Yukarıdaki resim, çeşitli kullanımların bir tezahürüdür.

yeniden dener

yeniden denemeler Bu parametre esas olarak yeniden denemeler için kullanılır.Bazı ağ titreşimleri meydana geldiğinde, yeniden denemelere neden olur.

Bu parametre aynı zamanda yeniden deneme sayısını da sınırlar.

Ancak başka sorunlar da var.

  • Yeniden iletim olduğu için mesajların sırası tutarlı olmayabilir Bu aynı zamanda yukarıda belirtilen durumdur, bölümlenmiş bir mesaj bile tam sırayla olmayacaktır.
  • Veya ağ sorunları nedeniyle, orijinal mesaj başarıyla yazılmıştır, ancak üreticiye başarılı bir yanıt verilmez ve mesaj yeniden denenirken tekrarlanabilir. Bu yalnızca tüketiciler için idempotent olabilir.

Etkili teslimat yöntemi

Mesaj hacmi gerçekten çok büyükse ve mesajların bir an önce Kafka'ya gönderilmesi gerekiyorsa. Bir üretici her zaman önbelleğin boyutundan etkilenir.

Göndermek için birden fazla üretici oluşturmak mümkün mü?

  • Maksimum sayıda üretici yapılandırın.
  • Bir mesaj gönderirken, önce bir üretici alın ve elde edilirken maksimum limite ulaşılıp ulaşılmadığını belirleyin, yoksa yeni bir tane oluşturun ve aynı zamanda dahili Listeye kaydedin.Kaydederken, eşzamanlılık sorunlarını önlemek için senkronizasyon işlemini gerçekleştirin.
  • Göndereni alırken, varsayılan bölümleme stratejisine göre yoklama ile elde edilebilir (tek tip kullanım sağlamak için).

Bu, büyük ve sık bir mesaj gönderme senaryosunda iletim verimliliğini artırabilir ve tek bir üreticinin baskısını azaltabilir.

Yapımcıyı Kapat

Sonuncusu, Üreticinin kapatılmasıdır.Üretici kullanım sırasında çok fazla kaynak (iş parçacığı, bellek, ağ vb.) Tüketir, bu nedenle bu kaynakları geri kazanmak için açıkça kapatılması gerekir.

Hem varsayılan close () yöntemi hem de zaman aşımı süresi olan yöntem, belirli bir süre sonra kapanmaya zorlanır.

Ancak kalan görevler sona erme tarihinden önce işlenecektir.

Yani hangisinin kullanılacağı duruma bağlıdır.

sonuç olarak

Bu makale, Kafka üreticilerini örnekler ve kaynak kodlar açısından analiz eden çok sayıda içerik içermektedir.

Umarım kitabı okuyan arkadaşlar ödüllendirilir ve ayrıca tartışma için bir mesaj bırakabilirler.

Şaşırtıcı olmayan bir şekilde, Kafka tüketicileri bir sonraki sayıda tartışılacak.

Blog yazarı (genel hesap): crossoverJie

Günlük blog sütunu, her gün sizin için mükemmel blog yazarlarından yüksek kaliteli teknik makaleler önerir. Aynı zamanda, kullanıcılar katkıda bulunabilirler.Makale resmi hesaba dahil edildikten sonra, web sitesinin ana sayfasında önereceğiz. Dikkat Açık kaynak Çin OSC Her gün yüksek kaliteli itin, tıklayın " daha fazlasını anla "Orijinal makaleyi okuyun.

ID 42'nin başında tebrikler! Asayiş Bakanlığı derhal tam olarak uygulanacağını resmen açıkladı!
önceki
Bir nükleer ülke yenilirse, çılgınca nükleer silahlar fırlatmayı ve dünyayla birlikte ölmeyi seçecekler mi?
Sonraki
Che Yun Morning News | Geely ve Ningde Times işbirliği, Renault Jiangling yeni enerji pazarına girmek için güçlerini birleştirdi
İyi metin çevirisi Grafik veri yapısına giriş
Call of Duty IV End of the World Operation Beijing (BJ) 40 PLUS ikilisi Huanxin listelendi
Günlük Blog MySQL Genel Sorgu Günlüğü ve Yavaş Sorgu Günlüğü Analizi
İyi metin çevirisi BeautifulSoup ve Selenium ile web kazıma
Pilotları işe alırken hiçbir yara izine izin verilmez Yaralı pilot gökyüzüne uçmaya devam edebilir mi?
Çukurda Apple kablosuz kulaklık airpods deneyimi: yenilmez rahatlık, tek kelime "harika"!
Iwei Yumurtalı Rulo Bileklik Değerlendirme: Tüm işlevler yeterince şık, akıllı bileklik maliyet performansı için ilk tercihtir
Airmaster A380, Avrupalıların gururudur 15 yıldan az bir süre sonra neden iflas ilan etti?
İspanyol Binasını 100 milyonu aşkın kayıpla satmak Wang Jianlin'in denizaşırı "küçük hedefi" hayal kırıklığına uğramış.
Araçların İnterneti sektörünün dönüşümünü hızlandıran beş unsur ve tarihi fırsatlar ortaya çıktı
Açık kaynak yazarlar JD'nin ağır projelerinin intihalini kınadılar
To Top