Yazar: Pinar Ersoy
Çeviri: Sun Taochun
Redaksiyon: Chen Zhendong
Bu makale hakkında 2500 kelime , Okumanız tavsiye edilir 10 dakika
Bu makale, Python'da Apache Spark uygulamasını sunarak veri işlemeye yönelik ortak işlevleri yürütmek için PySpark paketinin nasıl kullanılacağını açıklar.
Apache Spark, geliştiricilere eksiksiz kitaplıklar ve API'ler sağlayan ve Java, Python, R ve Scala dahil olmak üzere birden çok dili destekleyen bir küme bilgi işlem sistemidir. SparkSQL, yapılandırılmamış verileri DataFrame API yardımıyla işlemek için kullanılabilen bir Apache Spark modülüne eşdeğerdir.
PySpark adlı Spark Python API'si aracılığıyla Python, yapılandırılmış verileri işlemek için Spark programlama modelini uygular.
Bu makalenin amacı Spark'ın PySpark üzerinden nasıl çalıştırılacağını ve ortak işlevlerin nasıl yürütüleceğini göstermektir.
Python programlama dili, kurulu bir IDE gerektirir. En kolay yol, Python'u Anaconda aracılığıyla kullanmaktır, çünkü yeterli IDE paketi kurar ve diğer önemli paketlerle birlikte gelir.
Bu bağlantı üzerinden Anaconda'yı indirebilirsiniz. Windows, macOS ve Linux işletim sistemleri ile 64 bit / 32 bit grafik yükleyici türleri arasından seçim yapabilirsiniz. Python'un en son sürümünü yüklemenizi öneririz.
Anaconda kurulum sayfası (https://www.anaconda.com/distribution/)
Uygun Anaconda versiyonunu indirdikten sonra, kurmak için üzerine tıklayın Kurulum adımları Anaconda Dokümantasyonunda detaylandırılmıştır.
Kurulum tamamlandığında Anaconda Navigator Ana Sayfası açılacaktır. Yalnızca Python kullandığınız için, "Not Defteri" modülündeki "Başlat" düğmesini tıklamanız yeterlidir.
Anaconda Navigasyon Ana Sayfası
Spark'ı Anaconda'da kullanmak için lütfen aşağıdaki paket kurulum adımlarını izleyin.
Adım 1: Bilgisayarınızdan "Anaconda Prompt" terminalini açın.
Adım 2: Anaconda Prompt terminalinde "conda install pyspark" yazın ve PySpark paketini kurmak için Enter tuşuna basın.
Adım 3: Anaconda Prompt terminaline "conda install pyarrow" yazın ve PyArrow paketini kurmak için Enter tuşuna basın.
PySpark ve PyArrow paketleri kurulduğunda, sadece terminali kapatın, Jupyter Notebook'a geri dönün ve kodunuzun üst kısmından gerekli paketleri içe aktarın.
pandaları pdf olarak pyspark.sql olarak içe aktarın SparkSession'ı pyspark.context'den içe aktarın SparkContext'i pyspark.sql'den içe aktarın.Öncelikle, bir Kıvılcım Oturumu (SparkSession) başlatmanız gerekir. SparkSession yardımıyla, bir DataFrame bir tablo şeklinde oluşturulabilir ve kaydedilebilir. İkinci olarak, SQL tablolarını çalıştırabilir, tabloları önbelleğe alabilir ve belgeleri parquet / json / csv / avro veri formatında okuyabilirsiniz.
sc = SparkSession.builder.appName ("PysparkExample") \ .config ("spark.sql.shuffle.partitions", "50") \ .config ("spark.driver.maxResultSize", "5g") \ .config ( "spark.sql.execution.arrow.enabled", "true") \ .getOrCreate ()SparkSession'ın her parametresinin ayrıntılı açıklaması için lütfen pyspark.sql.SparkSession adresini ziyaret edin.
Bir DataFrame, ilişkisel bir veritabanındaki bir tabloya benzer şekilde, her sütun için başlıklara sahip dağıtılmış bir liste koleksiyonu olarak düşünülebilir. Bu makalede, veri setini işlerken PySpark API'deki DataFrame işlemini kullanacağız.
Kaggle veri kümesini https://www.kaggle.com/cmenca/new-york-times-hardcover-fiction-best-sellers adresinden indirebilirsiniz.
3.1, Spark veri kaynağından başlayın
DataFrame txt, csv, json ve parke dosya formatları okunarak oluşturulabilir. Bu makale örneğinde, dosyayı .json biçiminde kullanacağız.Ayrıca, metin, csv ve parke dosya biçimlerini bulmak ve okumak için aşağıda listelenen ilgili okuma işlevlerini de kullanabilirsiniz.
# Raw_data olarak adlandırılan bir kıvılcım veri çerçevesi oluşturur. # JSONdataframe = sc.read.json ('dataset / nyt2.json') # TXT FILES # dataframe_txt = sc.read.text ('text_data.txt') # CSV FILES # dataframe_csv = sc.read.csv ('csv_data.csv') # PARQUET FILES # dataframe_parquet = sc.read.load ('parquet_data.parquet')Tablodaki yinelenen değerler dropDuplicates () işlevi kullanılarak elimine edilebilir.
dataframe = sc.read.json ('veri kümesi / nyt2.json') dataframe.show (10)DropDuplicates () işlevini kullandıktan sonra, yinelenen değerlerin veri kümesinden kaldırıldığını görebiliriz.
dataframe_dropdup = dataframe.dropDuplicates () dataframe_dropdup.show (10)Sorgu işlemleri, bir sütunun bir alt kümesini seçmek için "seçme", koşulları eklemek için "ne zaman" ve sütun içeriğini filtrelemek için "beğenme" gibi birden çok amaç için kullanılabilir. En sık kullanılan işlemlerin bazı örnekleri aşağıda verilmiştir. Sorgu işlemlerinin tam listesi için Apache Spark belgelerine bakın.
5.1, "Seç" işlemi
Sütunlar, öznitelikler ("yazar") veya dizin (veri çerçevesi) ile elde edilebilir.
# Başlık sütunundaki tüm girişleri gösterdataframe.select ("yazar"). Show (10) # Başlık, yazar, sıra, fiyat sütunlarındaki tüm girişleri gösterdataframe.select ("yazar", "başlık", "sıra", "fiyat" ). gösterisi (10)İlk sonuç tablosu "yazar" sütunundaki sorgu sonuçlarını gösterir ve ikinci sonuç tablosu çok sütunlu sorguyu gösterir.
5.2, "Ne zaman" işlemi
İlk örnekte, "başlık" sütunu seçilmiştir ve bir "ne zaman" koşulu eklenmiştir.
# Başlığı gösterin ve başlık çerçevesine bağlı olarak 0 veya 1 atayın ("başlık", ne zaman (dataframe.title! = 'TEK SAAT', 1). Aksi takdirde (0)). Göster (10)Belirli koşullar altında 10 satır veri göster
İkinci örnekte, "ne zaman" yerine "isin" işlemi kullanılır, ayrıca satırlar için bazı koşulları tanımlamak için de kullanılabilir.
# Verilen seçeneklerdataframe .show (5) içindeyse, belirtilen yazarların bulunduğu satırları gösterBelirli koşullar altında 5 satırlık sonuç kümesi
5.3, "Beğen" işlemi
"Beğen" işlev parantezlerinde,% operatörü "THE" kelimesini içeren tüm başlıkları filtrelemek için kullanılır. Aradığımız koşul tam bir eşleşme ise,% operatörü kullanılmamalıdır.
# Başlığın titledataframe.select ("yazar", "başlık", dataframe.title.like ("% THE%")) içinde "THE" kelimesi varsa yazarı ve başlığı DOĞRU olarak göster. Göster (15)Başlık sütununda "THE" kelimesini içeren yargı sonuç kümesi
5.4, "startswith" - "endswith"
StartsWith, taramanın parantez içindeki belirli kelime / içeriğin konumundan başlatılmasını belirtir. Benzer şekilde EndsWith, kelimenin / içeriğin nerede bittiğini belirtir. Her iki işlev de büyük / küçük harfe duyarlıdır.
dataframe.select ("yazar", "başlık", dataframe.title.startswith ("THE")). show (5) dataframe.select ("yazar", "başlık", dataframe.title.endswith ("NT") ). gösterisi (5)5 satırlık veri üzerinde işlem ile başlar ve biter işleminin sonucu.
5.5, "alt dize" işlemi
Substring'in işlevi, metni belirli bir dizinin ortasından çıkarmaktır. Aşağıdaki örnekte, metin (1,3), (3,6) ve (1,6) dizin numaralarından çıkarılmıştır.
dataframe.select (dataframe.author.substr (1, 3) .alias ("title")). show (5) dataframe.select (dataframe.author.substr (3, 6) .alias ("başlık")). show (5) dataframe.select (dataframe.author.substr (1, 6) .alias ("title")). show (5)Sırasıyla (1,3), (3,6), (1,6) alt dizelerinin sonuçlarını göster
DataFrame API'de veri işleme işlevleri de vardır. Ardından, sütun ekleme / değiştirme / silme örnekleri bulabilirsiniz.
6.1, sütun ekle
Exactvalues.dataframe = dataframe.withColumn ('new_column', F.lit ('Bu yeni bir sütun')) ekranı (dataframe) ile sütunlar oluştururken # Lit () gereklidirVeri kümesinin sonuna yeni bir sütun eklendi
6.2, sütunu değiştirin
DataFrame API'sinin yeni sürümü için withColumnRenamed () işlevi iki parametre ile kullanılır.
# Column'amazon_product_url 'öğesini'URL'dataframe = dataframe.withColumnRenamed (' amazon_product_url ',' URL ') dataframe.show (5) ile güncelleyin"Amazon_Product_URL" sütun adı "URL" olarak değiştirildi
6.3, sütunu sil
Sütun silme işlemi iki şekilde gerçekleştirilebilir: drop () işlevinde bir grup sütun adı ekleyin veya drop işlevinde belirli bir sütun belirtin. Aşağıda iki örnek gösterilmektedir.
dataframe_remove = dataframe.drop ("yayıncı", "yayınlanan_tarih"). show (5) dataframe_remove2 = dataframe \ .drop (dataframe.publisher) .drop (dataframe.published_date) .show (5)"Publisher" ve "publisher_date" sütunları iki farklı yoldan kaldırılır.
Veri incelemesi için birkaç tür işlev vardır. Ardından, yaygın olarak kullanılan bazı işlevleri bulabilirsiniz. Daha fazla bilgi edinmek için Apache Spark belgesini ziyaret etmeniz gerekir.
# Dataframe sütun adlarını ve veri türlerini döndürürdataframe.dtypes # Dataframedataframe.show () içeriğini görüntüler # İlk n rowsdataframe.head () döndürür # İlk satırdataframe.first () döndürür # İlk n satırdataframe.take (5) # Hesaplar özeti Statisticsdataframe.describe (). show () # Dataframedataframe.columns sütunlarını döndürür # Dataframedataframe.count () içindeki satırların sayısını sayar # Dataframedataframe.distinct () içindeki farklı satırların sayısını sayar. count () # Fiziksel dahil planları yazdırır ve mantıksaldataframe.explain (4)GroupBy () işlevi aracılığıyla, veri sütunu belirtilen işleve göre toplanır.
# Yazara göre grupla, groupsdataframe.groupBy ("yazar"). Count (). Show (10) içindeki yazarların kitaplarını sayYazarlar, yayınlanan kitap sayısına göre gruplandırılır
Filter () işlevini kullanarak, işlevde filtreleme uygulamak için koşullu parametreler ekleyin. Bu işlev büyük / küçük harfe duyarlıdır.
# Başlık girişlerini filtreleme # Yalnızca 'HOST' dataframe.filter (dataframe == 'THE HOST') değerine sahip kayıtları tutar. Show (5)Başlık çubuğu filtrelendikten sonra, yalnızca "THE HOST" içeriği bulunur ve 5 sonuç görüntülenir.
Her bir veri kümesi için, veri ön işleme aşamasında mevcut değerleri değiştirmek, gereksiz sütunları atmak ve eksik değerleri doldurmak genellikle gereklidir. Pyspark.sql.DataFrameNaFunction kitaplığı, bu bağlamda verileri işlememize yardımcı olur. Örnekler aşağıdaki gibidir.
# Boş değerleri değiştirmedataframe.na.fill () dataFrame.fillna () dataFrameNaFunctions.fill () # Boş değerli satırları sınırlayan yeni dataframe döndürmedataframe.na.drop () dataFrame.dropna () dataFrameNaFunctions.drop () # Yeni dataframe değiştirerek döndür otherdataframe.na.replace (5, 15) dataFrame.replace () dataFrameNaFunctions.replace () ile bir değerRDD'de (Esnek Dağıtım Veri Kümesi) mevcut bölümlerin düzeyini artırmak veya azaltmak mümkündür. Yeni RDD'nin aynı / daha yüksek sayıda bölüm elde etmesini sağlayan bölümleri artırmak için yeniden bölümlemeyi (self, numPartitions) kullanın. Bölüm azaltma, yeni RDD'nin daha az sayıda bölüme sahip olmasını sağlayan birleştirme (self, numPartitions, shuffle = False) işlevi ile gerçekleştirilebilir (bu belirli bir değerdir). Daha fazla bilgi için lütfen Apache Spark belgesini ziyaret edin.
# 10 partitiondataframe.repartition (10) .rdd.getNumPartitions () ile Dataframe # 1 partitiondataframe.coalesce (1) .rdd.getNumPartitions () içeren DataframeOrijinal SQL sorgusu, SparkSession'ımızdaki "sql" işlemi aracılığıyla da kullanılabilir.Bu SQL sorgusunun işlemi gömülüdür ve DataFrame formatında bir sonuç kümesi döndürür. Daha ayrıntılı bilgi için lütfen Apache Spark belgesini ziyaret edin.
# Bir tabledataframe.registerTempTable ("df") kaydetme sc.sql ("df'den * seçin"). Show (3) sc.sql ("açıklama GİBİ '% aşk OLDUĞUNDA'% love% 'THEN'Love_Theme' \ WHEN açıklama GİBİ '% nefret%' THEN'Hate_Theme '\ AÇIKLAMA GİBİ'% mutlu% 'THEN'Happiness_Theme' \ WHEN açıklama GİBİ '% anger%' THEN'Anger_Theme '\ WHEN açıklama GİBİ'% korku% 'THEN'Horror_Theme' \ AÇIKLAMA '% death%' THEN'Criminal_Theme 'GİBİ NE ZAMAN açıklama'% detective% 'THEN'Mystery_Theme' \ ELSE'Other_Themes '\ END Temalar \ df'den "). GroupBy (' Temalar ']. Count (). göstermek()13.1 Veri yapısı
DataFrame API, SQL sorgu ifadelerini düşük seviyeli RDD işlevlerine dönüştürmek için temel olarak RDD'yi kullanır. .Rdd işlemi kullanılarak, bir veri çerçevesi RDD'ye dönüştürülebilir ve Spark Dataframe ayrıca RDD ve Pandas biçiminde bir dizeye dönüştürülebilir.
# Dataframe'i RDDrdd_convert = dataframe.rdd'ye dönüştürme # Dataframe'i stringdataframe.toJSON () 'un bir RDD'sine dönüştürme. First () # PandasdataFramedataframe.toPandas () olarak df içeriğini elde etmeFarklı veri yapılarının sonuçları
13.2 Dosyaya yazın ve kaydedin
Veri çerçevesi gibi kodumuza yüklenebilen herhangi bir veri kaynağı türü kolayca dönüştürülebilir ve .parquet ve .json dahil diğer dosya türlerine kaydedilebilir. Kaydetme, yükleme ve yazma işlevleri hakkında daha fazla ayrıntı için lütfen Apache Spark belgesini ziyaret edin.
# Dosyayı .parquet formatında yazın ve kaydedindataframe.select ("yazar", "başlık", "sıralama", "açıklama") \ .write \ .save ("Rankings_Descriptions.parquet").Write.save () işlevi işlendiğinde, Parquet dosyasının oluşturulduğunu görebilirsiniz.
# Dosyayı .json formatında yazın ve kaydedindataframe.select ("yazar", "başlık") \ .write \ .save ("Authors_Titles.json", format = "json").Write.save () işlevi işlendiğinde, JSON dosyasının oluşturulduğunu görebilirsiniz.
13.3, SparkSession'ı durdurun
Spark oturumu, stop () işlevi aşağıdaki gibi çalıştırılarak durdurulabilir.
# End Spark Sessionsc.stop ()Kod ve Jupyter Not Defteri GitHub'ımda bulunabilir.
Soru ve yorumlarınızı bekliyoruz!
Referanslar:
1.
2. https://docs.anaconda.com/anaconda/
Orjinal başlık:
PySpark ve SparkSQL Temelleri
Spark'ı Python Programlama ile uygulama
Orijinal bağlantı:
https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53
Editör: Yu Tengkai
Redaksiyon: Hong Shuyue
Çevirmen Profili
Sun Taochun , Capital Normal Üniversitesi'nde uzaktan algılama bilimi ve teknolojisi alanında son sınıf öğrencisi. Şu anda, temel bilginin ustalık ve gelişimine odaklanıyor ve gelecekte veri bilimi uygulamalarının birçok olasılığını keşfetme fırsatına sahip olmayı umuyor. Hobilerimden biri de çeviri oluşturma ... Boş zamanlarımda sizinle iletişim kurmak, paylaşmak ve birlikte ilerlemek umuduyla THU Data Pie platformundaki çeviri gönüllüleri grubuna katılıyorum.
-Bitiş-
Tsinghua-Qingdao Veri Bilimi Enstitüsü'nün resmi WeChat kamu platformunu takip edin " THU Veri Pastası "Ve kız kardeş numarası" Veri Pastası THU "Daha fazla ders avantajı ve kaliteli içerik elde edin.