PySpark ve SparkSQL temelleri: Spark'ı çalıştırmak için Python programlamayı kullanma (kodla)

Yazar: Pinar Ersoy

Çeviri: Sun Taochun

Redaksiyon: Chen Zhendong

Bu makale hakkında 2500 kelime , Okumanız tavsiye edilir 10 dakika

Bu makale, Python'da Apache Spark uygulamasını sunarak veri işlemeye yönelik ortak işlevleri yürütmek için PySpark paketinin nasıl kullanılacağını açıklar.

Apache Spark, geliştiricilere eksiksiz kitaplıklar ve API'ler sağlayan ve Java, Python, R ve Scala dahil olmak üzere birden çok dili destekleyen bir küme bilgi işlem sistemidir. SparkSQL, yapılandırılmamış verileri DataFrame API yardımıyla işlemek için kullanılabilen bir Apache Spark modülüne eşdeğerdir.

PySpark adlı Spark Python API'si aracılığıyla Python, yapılandırılmış verileri işlemek için Spark programlama modelini uygular.

Bu makalenin amacı Spark'ın PySpark üzerinden nasıl çalıştırılacağını ve ortak işlevlerin nasıl yürütüleceğini göstermektir.

Python programlama dili, kurulu bir IDE gerektirir. En kolay yol, Python'u Anaconda aracılığıyla kullanmaktır, çünkü yeterli IDE paketi kurar ve diğer önemli paketlerle birlikte gelir.

1. Anaconda'yı indirin ve PySpark'ı kurun

Bu bağlantı üzerinden Anaconda'yı indirebilirsiniz. Windows, macOS ve Linux işletim sistemleri ile 64 bit / 32 bit grafik yükleyici türleri arasından seçim yapabilirsiniz. Python'un en son sürümünü yüklemenizi öneririz.

Anaconda kurulum sayfası (https://www.anaconda.com/distribution/)

Uygun Anaconda versiyonunu indirdikten sonra, kurmak için üzerine tıklayın Kurulum adımları Anaconda Dokümantasyonunda detaylandırılmıştır.

Kurulum tamamlandığında Anaconda Navigator Ana Sayfası açılacaktır. Yalnızca Python kullandığınız için, "Not Defteri" modülündeki "Başlat" düğmesini tıklamanız yeterlidir.

Anaconda Navigasyon Ana Sayfası

Spark'ı Anaconda'da kullanmak için lütfen aşağıdaki paket kurulum adımlarını izleyin.

Adım 1: Bilgisayarınızdan "Anaconda Prompt" terminalini açın.

Adım 2: Anaconda Prompt terminalinde "conda install pyspark" yazın ve PySpark paketini kurmak için Enter tuşuna basın.

Adım 3: Anaconda Prompt terminaline "conda install pyarrow" yazın ve PyArrow paketini kurmak için Enter tuşuna basın.

PySpark ve PyArrow paketleri kurulduğunda, sadece terminali kapatın, Jupyter Notebook'a geri dönün ve kodunuzun üst kısmından gerekli paketleri içe aktarın.

pandaları pdf olarak pyspark.sql olarak içe aktarın SparkSession'ı pyspark.context'den içe aktarın SparkContext'i pyspark.sql'den içe aktarın.

2. SparkSession'ı başlatın

Öncelikle, bir Kıvılcım Oturumu (SparkSession) başlatmanız gerekir. SparkSession yardımıyla, bir DataFrame bir tablo şeklinde oluşturulabilir ve kaydedilebilir. İkinci olarak, SQL tablolarını çalıştırabilir, tabloları önbelleğe alabilir ve belgeleri parquet / json / csv / avro veri formatında okuyabilirsiniz.

sc = SparkSession.builder.appName ("PysparkExample") \ .config ("spark.sql.shuffle.partitions", "50") \ .config ("spark.driver.maxResultSize", "5g") \ .config ( "spark.sql.execution.arrow.enabled", "true") \ .getOrCreate ()

SparkSession'ın her parametresinin ayrıntılı açıklaması için lütfen pyspark.sql.SparkSession adresini ziyaret edin.

3. Bir veri çerçevesi oluşturun

Bir DataFrame, ilişkisel bir veritabanındaki bir tabloya benzer şekilde, her sütun için başlıklara sahip dağıtılmış bir liste koleksiyonu olarak düşünülebilir. Bu makalede, veri setini işlerken PySpark API'deki DataFrame işlemini kullanacağız.

Kaggle veri kümesini https://www.kaggle.com/cmenca/new-york-times-hardcover-fiction-best-sellers adresinden indirebilirsiniz.

3.1, Spark veri kaynağından başlayın

DataFrame txt, csv, json ve parke dosya formatları okunarak oluşturulabilir. Bu makale örneğinde, dosyayı .json biçiminde kullanacağız.Ayrıca, metin, csv ve parke dosya biçimlerini bulmak ve okumak için aşağıda listelenen ilgili okuma işlevlerini de kullanabilirsiniz.

# Raw_data olarak adlandırılan bir kıvılcım veri çerçevesi oluşturur. # JSONdataframe = sc.read.json ('dataset / nyt2.json') # TXT FILES # dataframe_txt = sc.read.text ('text_data.txt') # CSV FILES # dataframe_csv = sc.read.csv ('csv_data.csv') # PARQUET FILES # dataframe_parquet = sc.read.load ('parquet_data.parquet')

4. Tekrarlanan değerler

Tablodaki yinelenen değerler dropDuplicates () işlevi kullanılarak elimine edilebilir.

dataframe = sc.read.json ('veri kümesi / nyt2.json') dataframe.show (10)

DropDuplicates () işlevini kullandıktan sonra, yinelenen değerlerin veri kümesinden kaldırıldığını görebiliriz.

dataframe_dropdup = dataframe.dropDuplicates () dataframe_dropdup.show (10)

5. Sorgu

Sorgu işlemleri, bir sütunun bir alt kümesini seçmek için "seçme", koşulları eklemek için "ne zaman" ve sütun içeriğini filtrelemek için "beğenme" gibi birden çok amaç için kullanılabilir. En sık kullanılan işlemlerin bazı örnekleri aşağıda verilmiştir. Sorgu işlemlerinin tam listesi için Apache Spark belgelerine bakın.

5.1, "Seç" işlemi

Sütunlar, öznitelikler ("yazar") veya dizin (veri çerçevesi) ile elde edilebilir.

# Başlık sütunundaki tüm girişleri gösterdataframe.select ("yazar"). Show (10) # Başlık, yazar, sıra, fiyat sütunlarındaki tüm girişleri gösterdataframe.select ("yazar", "başlık", "sıra", "fiyat" ). gösterisi (10)

İlk sonuç tablosu "yazar" sütunundaki sorgu sonuçlarını gösterir ve ikinci sonuç tablosu çok sütunlu sorguyu gösterir.

5.2, "Ne zaman" işlemi

İlk örnekte, "başlık" sütunu seçilmiştir ve bir "ne zaman" koşulu eklenmiştir.

# Başlığı gösterin ve başlık çerçevesine bağlı olarak 0 veya 1 atayın ("başlık", ne zaman (dataframe.title! = 'TEK SAAT', 1). Aksi takdirde (0)). Göster (10)

Belirli koşullar altında 10 satır veri göster

İkinci örnekte, "ne zaman" yerine "isin" işlemi kullanılır, ayrıca satırlar için bazı koşulları tanımlamak için de kullanılabilir.

# Verilen seçeneklerdataframe .show (5) içindeyse, belirtilen yazarların bulunduğu satırları göster

Belirli koşullar altında 5 satırlık sonuç kümesi

5.3, "Beğen" işlemi

"Beğen" işlev parantezlerinde,% operatörü "THE" kelimesini içeren tüm başlıkları filtrelemek için kullanılır. Aradığımız koşul tam bir eşleşme ise,% operatörü kullanılmamalıdır.

# Başlığın titledataframe.select ("yazar", "başlık", dataframe.title.like ("% THE%")) içinde "THE" kelimesi varsa yazarı ve başlığı DOĞRU olarak göster. Göster (15)

Başlık sütununda "THE" kelimesini içeren yargı sonuç kümesi

5.4, "startswith" - "endswith"

StartsWith, taramanın parantez içindeki belirli kelime / içeriğin konumundan başlatılmasını belirtir. Benzer şekilde EndsWith, kelimenin / içeriğin nerede bittiğini belirtir. Her iki işlev de büyük / küçük harfe duyarlıdır.

dataframe.select ("yazar", "başlık", dataframe.title.startswith ("THE")). show (5) dataframe.select ("yazar", "başlık", dataframe.title.endswith ("NT") ). gösterisi (5)

5 satırlık veri üzerinde işlem ile başlar ve biter işleminin sonucu.

5.5, "alt dize" işlemi

Substring'in işlevi, metni belirli bir dizinin ortasından çıkarmaktır. Aşağıdaki örnekte, metin (1,3), (3,6) ve (1,6) dizin numaralarından çıkarılmıştır.

dataframe.select (dataframe.author.substr (1, 3) .alias ("title")). show (5) dataframe.select (dataframe.author.substr (3, 6) .alias ("başlık")). show (5) dataframe.select (dataframe.author.substr (1, 6) .alias ("title")). show (5)

Sırasıyla (1,3), (3,6), (1,6) alt dizelerinin sonuçlarını göster

6. Sütun ekleyin, değiştirin ve silin

DataFrame API'de veri işleme işlevleri de vardır. Ardından, sütun ekleme / değiştirme / silme örnekleri bulabilirsiniz.

6.1, sütun ekle

Exactvalues.dataframe = dataframe.withColumn ('new_column', F.lit ('Bu yeni bir sütun')) ekranı (dataframe) ile sütunlar oluştururken # Lit () gereklidir

Veri kümesinin sonuna yeni bir sütun eklendi

6.2, sütunu değiştirin

DataFrame API'sinin yeni sürümü için withColumnRenamed () işlevi iki parametre ile kullanılır.

# Column'amazon_product_url 'öğesini'URL'dataframe = dataframe.withColumnRenamed (' amazon_product_url ',' URL ') dataframe.show (5) ile güncelleyin

"Amazon_Product_URL" sütun adı "URL" olarak değiştirildi

6.3, sütunu sil

Sütun silme işlemi iki şekilde gerçekleştirilebilir: drop () işlevinde bir grup sütun adı ekleyin veya drop işlevinde belirli bir sütun belirtin. Aşağıda iki örnek gösterilmektedir.

dataframe_remove = dataframe.drop ("yayıncı", "yayınlanan_tarih"). show (5) dataframe_remove2 = dataframe \ .drop (dataframe.publisher) .drop (dataframe.published_date) .show (5)

"Publisher" ve "publisher_date" sütunları iki farklı yoldan kaldırılır.

7. Veri incelemesi

Veri incelemesi için birkaç tür işlev vardır. Ardından, yaygın olarak kullanılan bazı işlevleri bulabilirsiniz. Daha fazla bilgi edinmek için Apache Spark belgesini ziyaret etmeniz gerekir.

# Dataframe sütun adlarını ve veri türlerini döndürürdataframe.dtypes # Dataframedataframe.show () içeriğini görüntüler # İlk n rowsdataframe.head () döndürür # İlk satırdataframe.first () döndürür # İlk n satırdataframe.take (5) # Hesaplar özeti Statisticsdataframe.describe (). show () # Dataframedataframe.columns sütunlarını döndürür # Dataframedataframe.count () içindeki satırların sayısını sayar # Dataframedataframe.distinct () içindeki farklı satırların sayısını sayar. count () # Fiziksel dahil planları yazdırır ve mantıksaldataframe.explain (4)

8. "GroupBy" işlemi

GroupBy () işlevi aracılığıyla, veri sütunu belirtilen işleve göre toplanır.

# Yazara göre grupla, groupsdataframe.groupBy ("yazar"). Count (). Show (10) içindeki yazarların kitaplarını say

Yazarlar, yayınlanan kitap sayısına göre gruplandırılır

9. "Filtre" işlemi

Filter () işlevini kullanarak, işlevde filtreleme uygulamak için koşullu parametreler ekleyin. Bu işlev büyük / küçük harfe duyarlıdır.

# Başlık girişlerini filtreleme # Yalnızca 'HOST' dataframe.filter (dataframe == 'THE HOST') değerine sahip kayıtları tutar. Show (5)

Başlık çubuğu filtrelendikten sonra, yalnızca "THE HOST" içeriği bulunur ve 5 sonuç görüntülenir.

10. Eksik ve değiştirilen değerler

Her bir veri kümesi için, veri ön işleme aşamasında mevcut değerleri değiştirmek, gereksiz sütunları atmak ve eksik değerleri doldurmak genellikle gereklidir. Pyspark.sql.DataFrameNaFunction kitaplığı, bu bağlamda verileri işlememize yardımcı olur. Örnekler aşağıdaki gibidir.

# Boş değerleri değiştirmedataframe.na.fill () dataFrame.fillna () dataFrameNaFunctions.fill () # Boş değerli satırları sınırlayan yeni dataframe döndürmedataframe.na.drop () dataFrame.dropna () dataFrameNaFunctions.drop () # Yeni dataframe değiştirerek döndür otherdataframe.na.replace (5, 15) dataFrame.replace () dataFrameNaFunctions.replace () ile bir değer

11. Yeniden Bölümleme

RDD'de (Esnek Dağıtım Veri Kümesi) mevcut bölümlerin düzeyini artırmak veya azaltmak mümkündür. Yeni RDD'nin aynı / daha yüksek sayıda bölüm elde etmesini sağlayan bölümleri artırmak için yeniden bölümlemeyi (self, numPartitions) kullanın. Bölüm azaltma, yeni RDD'nin daha az sayıda bölüme sahip olmasını sağlayan birleştirme (self, numPartitions, shuffle = False) işlevi ile gerçekleştirilebilir (bu belirli bir değerdir). Daha fazla bilgi için lütfen Apache Spark belgesini ziyaret edin.

# 10 partitiondataframe.repartition (10) .rdd.getNumPartitions () ile Dataframe # 1 partitiondataframe.coalesce (1) .rdd.getNumPartitions () içeren Dataframe

12. Gömülü SQL sorgusu

Orijinal SQL sorgusu, SparkSession'ımızdaki "sql" işlemi aracılığıyla da kullanılabilir.Bu SQL sorgusunun işlemi gömülüdür ve DataFrame formatında bir sonuç kümesi döndürür. Daha ayrıntılı bilgi için lütfen Apache Spark belgesini ziyaret edin.

# Bir tabledataframe.registerTempTable ("df") kaydetme sc.sql ("df'den * seçin"). Show (3) sc.sql ("açıklama GİBİ '% aşk OLDUĞUNDA'% love% 'THEN'Love_Theme' \ WHEN açıklama GİBİ '% nefret%' THEN'Hate_Theme '\ AÇIKLAMA GİBİ'% mutlu% 'THEN'Happiness_Theme' \ WHEN açıklama GİBİ '% anger%' THEN'Anger_Theme '\ WHEN açıklama GİBİ'% korku% 'THEN'Horror_Theme' \ AÇIKLAMA '% death%' THEN'Criminal_Theme 'GİBİ NE ZAMAN açıklama'% detective% 'THEN'Mystery_Theme' \ ELSE'Other_Themes '\ END Temalar \ df'den "). GroupBy (' Temalar ']. Count (). göstermek()

13, çıktı

13.1 Veri yapısı

DataFrame API, SQL sorgu ifadelerini düşük seviyeli RDD işlevlerine dönüştürmek için temel olarak RDD'yi kullanır. .Rdd işlemi kullanılarak, bir veri çerçevesi RDD'ye dönüştürülebilir ve Spark Dataframe ayrıca RDD ve Pandas biçiminde bir dizeye dönüştürülebilir.

# Dataframe'i RDDrdd_convert = dataframe.rdd'ye dönüştürme # Dataframe'i stringdataframe.toJSON () 'un bir RDD'sine dönüştürme. First () # PandasdataFramedataframe.toPandas () olarak df içeriğini elde etme

Farklı veri yapılarının sonuçları

13.2 Dosyaya yazın ve kaydedin

Veri çerçevesi gibi kodumuza yüklenebilen herhangi bir veri kaynağı türü kolayca dönüştürülebilir ve .parquet ve .json dahil diğer dosya türlerine kaydedilebilir. Kaydetme, yükleme ve yazma işlevleri hakkında daha fazla ayrıntı için lütfen Apache Spark belgesini ziyaret edin.

# Dosyayı .parquet formatında yazın ve kaydedindataframe.select ("yazar", "başlık", "sıralama", "açıklama") \ .write \ .save ("Rankings_Descriptions.parquet")

.Write.save () işlevi işlendiğinde, Parquet dosyasının oluşturulduğunu görebilirsiniz.

# Dosyayı .json formatında yazın ve kaydedindataframe.select ("yazar", "başlık") \ .write \ .save ("Authors_Titles.json", format = "json")

.Write.save () işlevi işlendiğinde, JSON dosyasının oluşturulduğunu görebilirsiniz.

13.3, SparkSession'ı durdurun

Spark oturumu, stop () işlevi aşağıdaki gibi çalıştırılarak durdurulabilir.

# End Spark Sessionsc.stop ()

Kod ve Jupyter Not Defteri GitHub'ımda bulunabilir.

Soru ve yorumlarınızı bekliyoruz!

Referanslar:

1.

2. https://docs.anaconda.com/anaconda/

Orjinal başlık:

PySpark ve SparkSQL Temelleri

Spark'ı Python Programlama ile uygulama

Orijinal bağlantı:

https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53

Editör: Yu Tengkai

Redaksiyon: Hong Shuyue

Çevirmen Profili

Sun Taochun , Capital Normal Üniversitesi'nde uzaktan algılama bilimi ve teknolojisi alanında son sınıf öğrencisi. Şu anda, temel bilginin ustalık ve gelişimine odaklanıyor ve gelecekte veri bilimi uygulamalarının birçok olasılığını keşfetme fırsatına sahip olmayı umuyor. Hobilerimden biri de çeviri oluşturma ... Boş zamanlarımda sizinle iletişim kurmak, paylaşmak ve birlikte ilerlemek umuduyla THU Data Pie platformundaki çeviri gönüllüleri grubuna katılıyorum.

-Bitiş-

Tsinghua-Qingdao Veri Bilimi Enstitüsü'nün resmi WeChat kamu platformunu takip edin " THU Veri Pastası "Ve kız kardeş numarası" Veri Pastası THU "Daha fazla ders avantajı ve kaliteli içerik elde edin.

400'den fazla gazete dolandırıcılığa maruz kaldı, Çin'in Shandong kentinde yoğunlaştı, düzinelerce hastane karıştı
önceki
Çin'in yüz verileriyle ilgili ilk ceza davası: 3D avatar yapmak için yüz verilerini yasadışı olarak çalmak
Sonraki
LSTM'nin babası 2010-2020'yi yayınlıyor, gözlerimde derin öğrenmenin kısa bir tarihi
Yeni koronavirüs pnömonisinden etkilenen 1183 yardım arayanların veri portreleri
İstatistiksel bilginin özeti (çerçeve diyagram formülü ile)
Salgınla karşı karşıya kalan tıbbi robotlar, sağlık personelinin enfeksiyonlarını nasıl önler ve tedavi eder?
Vulcan Mountain Hospital'ın Çin hızı Li Chi D-Hub hızla yardımcı oluyor
Derin öğrenmede anlaşılması gereken 13 olasılık dağılımı (bağlantı ile)
Tsinghua Üniversitesi, Doğa Üzerine Mikroelektronik Enstitüsü'nün yeni araştırması olan CNN'i uygulamak için memristörleri tam olarak kullanın
Yeni taç virüsü Ermenistan'da ortaya çıkarsa, programcılar simülasyon için Python kullanır (kod eklenmiştir)
Üretken modeller ve GAN'lar nelerdir? Bilgisayar vizyonunun büyüsünü deneyimlemek için bir makale (bağlantı ile)
Kaçak pangolinler koronavirüs taşır ve 2019-nCoV'a oldukça benzerdir
Wu Enda AI öğrenme rotası, ML, DL ve diğer kurslar ve kaynak önerileri! "Ekli bağlantı"
Sorumlu planlayıcı, elinde bir resim ve bir dip ile eski şehir için bir "önleme ve kontrol haritası" çizer.
To Top