Açık kaynak Çin OSC başlık numarasını takip edin ve en son teknik bilgileri alın
Yukarıda milyon düzeyinde bir mesaj itme sistemi tasarlarken, Kafka'nın mesaj akışı için ara katman olarak kullanıldığından bahsedilmektedir.
Aralarından bir arkadaş, büyük miktarda haber olması durumunda Kakfa'nın haberlerin etkinliğini ve tutarlılığını nasıl garanti ettiğini sordu.
Mesajların nasıl doğru ve verimli bir şekilde gönderileceğini tartışmak için bu soruyu Kakfa'nın kaynak koduyla birlikte kullanır.
Daha fazla içerik var, kaynak koduyla ilgilenen arkadaşlar, lütfen emniyet kemeri takın (kaynak kodu v0.10.0.0 sürümünün analizine dayanmaktadır). Aynı zamanda, Kafka'yı kullanma konusunda biraz deneyim sahibi olmak ve temel kullanımı bilmek en iyisidir.
Analiz etmeden önce, basit bir mesaj göndermenin nasıl bir şey olduğunu görelim.
Aşağıdaki kod, SpringBoot'u temel alır.
Önce org.apache.kafka.clients.producer.Producer'dan bir bean oluşturun.
Esas olarak gerekli bir parametre olan bootstrap.servers'a odaklanın. Kafka kümesindeki aracı adresini ifade eder, örneğin 127.0.0.1:9094.
Kalan parametreler şimdilik tartışılmayacak, daha sonra ayrıntılı olarak tanıtılacaktır.
Ardından, mesaj göndermek üzere gönderme işlevini çağırmak için bu fasulyeyi enjekte edin.
Burada belirli bir konuya 10W'lık veri gönderdim ve çalışan program mesajları normal olarak gönderildi.
Ancak bu yalnızca mesaj göndermedir ve mesaj tamamen göz ardı edilir, bu da saf bir eşzamansız yönteme eşdeğerdir.
Senkronize et
Bu yüzden mesaj başarıyla gönderilirse ne yapmam gerektiğini bilmek istiyorum.
Aslında, Yapımcının API'si bizim için çoktan dikkate alındı.Gönderdikten sonra, gönderme sonucunu eşzamanlı olarak almak için sadece get () yöntemini çağırmamız gerekiyor.
Sonuç gönder:
Bu tür iletim verimliliği aslında nispeten düşüktür, çünkü her seferinde mesaj iletiminin sonucunu eşzamanlı olarak beklemek gerekir.
asenkron
Bu nedenle, eşzamansız bir şekilde göndermeliyiz Aslında, get () yöntemi manuel olarak çağrılmadığı sürece send () yöntemi varsayılan olarak eşzamansızdır.
Ancak bu şekilde gönderimin sonucunu bilmek imkansızdır.
Yani send () API'ye bakarak, başka bir parametrenin daha olduğunu görebilirsiniz.
Gelecek < RecordMetadata > gönder (Yapımcı Kaydı < K, V > yapımcı, Geri arama geri arama);Geri arama, mesaj gönderildikten sonra özelleştirilmiş uygulamamızı geri arayabilen bir geri arama arayüzüdür.
Yürütmeden sonraki sonuç:
Sonuç aynı şekilde elde edilebilir ve geri aramanın iş parçacığının yukarıdaki senkronizasyonun ana iş parçacığı olmadığı ve bunun da eşzamansız bir geri arama olduğunu kanıtlayabileceği bulunmuştur.
Aynı anda geri arama yapılırken iki parametre geçilecek:
Ancak, bu iki parametrenin aynı anda verisi yoktur, sadece gönderme başarısız olduğunda bir istisna mesajı olacaktır ve gönderen meta veri boş olacaktır.
Dolayısıyla doğru yazı şöyle olmalıdır:
Neden sadece bir parametrenin değerinin olduğuna gelince, aşağıdaki kaynak kod analizinde tek tek açıklanacaktır.
Şimdi sadece temel mesaj gönderme konusunda ustalaştım.Gönderirken bazı parametre yapılandırmalarını derinlemesine anlamak istiyorsanız, yine de kaynak kodunda son söz hakkına sahipsiniz.
Öncelikle tüm mesaj gönderme sürecinden bahsedelim Kafka mesajı brokere ağ üzerinden göndermez, Java ile optimize edilmiş ve tasarlanmıştır.
Gönderme süreci
Gönderme sürecini sezgisel olarak anlamak için, gönderme sürecindeki birkaç temel adım basitçe çizilir.
Baştan aşağı:
Adım analizi
Ardından, her adımı ayrıntılı olarak açıklayın.
başlatma
Bu inşa yöntemini başlatma için çağırırken, basitçe KafkaProducer'a temel parametreleri yazmaktan daha fazlasıdır. Daha zahmetli olan şey, Sender iş parçacığını arabellek tüketimi için başlatmaktır.
IO iş parçacığını başlatın:
Gönderen iş parçacığının aşağıdakiler gibi üye değişkenlere ihtiyaç duyduğunu görebilirsiniz:
acks, yeniden denemeler, requestTimeoutBekle, bu parametreler daha sonra analiz edilecek.
Serileştirilmiş mesaj
Aslında send () işlevini çağırdıktan sonraki ilk adım serileştirmedir Sonuçta mesajlarımızın ağ üzerinden Kafka'ya gönderilmesi gerekir.
ValueSerializer.serialize (record.topic (), record.value ()); bir arayüzdür ve başlatma sırasında serileştirme uygulama sınıfını belirtmemiz gerekir.
Sadece org.apache.kafka.common.serialization.Serializer arayüzünü uygulayarak serileştirmeyi kendimiz de gerçekleştirebiliriz.
Yönlendirme bölümü
Bir sonraki adım, bölümlerin yönlendirilmesidir.Kullandığımız Konu genellikle ölçeklenebilirlik ve yüksek performans için birden çok bölüm oluşturur.
Eğer bir bölüm ise, söylemesi kolay, içine tüm mesajlar yazılabilir.
Bununla birlikte, birden fazla bölümün kaçınılmaz olarak hangi bölüme yazılacağını bilmesi gerekir.
Genellikle üç yol vardır.
Özel bölüm
ProducerRecord oluştururken her mesaj için bölüm belirtebilirsiniz.
Bu şekilde yönlendirme sırasında bir atama olup olmadığına karar verilecek ve varsa bölüm direkt olarak kullanılacaktır.
Bu genellikle özel senaryolarda kullanılır.
Özel yönlendirme stratejisi
Herhangi bir bölüm belirtilmezse, partitioner.partition arabirimi özel bir bölüm stratejisi yürütmek için çağrılır.
Ve sadece org.apache.kafka.clients.producer.Partitioner arayüzünü uygulamak için bir sınıfı özelleştirmemiz ve bir KafkaProducer örneği oluştururken partitioner.class parametresini yapılandırmamız gerekir.
Genellikle bölümü özelleştirmeniz gerekir, genellikle mesajların sırasını olabildiğince sağlamaya çalışır.
Veya bazı benzersiz bölümlere yazılabilir ve özel tüketiciler tarafından işlenebilir.
Varsayılan strateji
Sonuncusu, hiçbir şey yapmazsak yürütülecek varsayılan yönlendirme stratejisidir.
Bu strateji aynı zamanda mesaj dağıtımını daha eşit hale getirecektir.
Uygulamasına bir göz atın:
Kısaca şu adımlara ayrılmıştır:
Aslında, bu tipik bir yoklama algoritmasıdır, bu nedenle bölümlerin sayısı sık sık değişmediği sürece, bu yöntem daha tek tip olacaktır.
Dahili önbelleğe yaz
Send () yöntemi bölümü aldıktan sonra, bir append () işlevi çağrılır:
Bu işlev, grupları dahili bir önbelleğe yazmak için bir getOrCreateDeque () çağırır.
Tüketim önbelleği
En başında başlatılan IO iş parçacığı, aslında bu verileri her zaman tüketen bir arka plan programı iş parçacığıdır.
Önceden yazılmış veriler, şekildeki çeşitli işlevlerle elde edilebilir. Bu içeriğe girmenize gerek yok, ancak çok önemli olan bir completeBatch yöntemi var.
Bu yöntem çağrıldığında mesajın gönderilmiş olması gerekir, bu nedenle send () yönteminde tanımladığımız geri çağrı arayüzünü tamamlamak için batch.done () çağrılacaktır.
İletim tamamlandıktan sonra yalnızca bir meta veri ve istisna bilgisinin görünmesinin neden söylendiği buradan da görülebilir.
Gönderim işlemi bittikten sonra, Producer'daki daha önemli parametrelere bir göz atalım.
acks
acks, mesaj verimini etkileyen önemli bir parametredir.
Esas olarak bu seçenekler vardır ve varsayılan 1'dir.
Kafka aktif / bekleme modunda değil, Zookeeper'a benzer aktif / bekleme modunda olduğundan.
Öncül, Konu yapılandırma kopyalarının sayısıdır > 1.
Acks = all / -1 olduğunda:
Bu, geri dönmeden önce tüm takipçi kopyalarının veri yazmayı tamamladığından emin olacağı anlamına gelir.
Bu, mesajın kaybolmamasını sağlar!
Ancak aynı zamanda performans ve verim en düşük seviyededir.
Acks = 0 olduğunda:
Yapımcı, mesajlarını kaybetmenin en kolay yolu olan kopyadan herhangi bir yanıt beklemeyecek, ancak aynı zamanda performans en iyisidir!
Acks = 1 olduğunda:
Bu uzlaşmacı bir çözümdür ve çoğaltma Liderinin yanıt vermesini bekleyecek, ancak izleyicinin yanıt vermesini beklemeyecektir.
Lider telefonu kapatır kapatmaz mesaj kaybolur. Ancak performans ve mesaj güvenliği garanti altına alınmıştır.
Parti boyutu
Bu parametre, dahili arabellek alanının boyut sınırı olarak adıyla anılabilir.Uygun şekilde ayarlanması verimi artırabilir.
Ancak aşırı olamaz, çok büyük bellek israfına neden olur. Küçükse işe yaramaz, aynı zamanda tipik bir zaman ve mekan değiş tokuşudur.
Yukarıdaki resim, çeşitli kullanımların bir tezahürüdür.
yeniden dener
yeniden denemeler Bu parametre esas olarak yeniden denemeler için kullanılır.Bazı ağ titreşimleri meydana geldiğinde, yeniden denemelere neden olur.
Bu parametre aynı zamanda yeniden deneme sayısını da sınırlar.
Ancak başka sorunlar da var.
Mesaj hacmi gerçekten çok büyükse ve mesajların bir an önce Kafka'ya gönderilmesi gerekiyorsa. Bir üretici her zaman önbelleğin boyutundan etkilenir.
Göndermek için birden fazla üretici oluşturmak mümkün mü?
Bu, büyük ve sık bir mesaj gönderme senaryosunda iletim verimliliğini artırabilir ve tek bir üreticinin baskısını azaltabilir.
Sonuncusu, Üreticinin kapatılmasıdır.Üretici kullanım sırasında çok fazla kaynak (iş parçacığı, bellek, ağ vb.) Tüketir, bu nedenle bu kaynakları geri kazanmak için açıkça kapatılması gerekir.
Hem varsayılan close () yöntemi hem de zaman aşımı süresi olan yöntem, belirli bir süre sonra kapanmaya zorlanır.
Ancak kalan görevler sona erme tarihinden önce işlenecektir.
Yani hangisinin kullanılacağı duruma bağlıdır.
Bu makale, Kafka üreticilerini örnekler ve kaynak kodlar açısından analiz eden çok sayıda içerik içermektedir.
Umarım kitabı okuyan arkadaşlar ödüllendirilir ve ayrıca tartışma için bir mesaj bırakabilirler.
Şaşırtıcı olmayan bir şekilde, Kafka tüketicileri bir sonraki sayıda tartışılacak.
Blog yazarı (genel hesap): crossoverJie
Günlük blog sütunu, her gün sizin için mükemmel blog yazarlarından yüksek kaliteli teknik makaleler önerir. Aynı zamanda, kullanıcılar katkıda bulunabilirler.Makale resmi hesaba dahil edildikten sonra, web sitesinin ana sayfasında önereceğiz. Dikkat Açık kaynak Çin OSC Her gün yüksek kaliteli itin, tıklayın " daha fazlasını anla "Orijinal makaleyi okuyun.