Apache Spark Streaming : Örnek Uygulama

Bu yazımızda Spark Streaming ile TCP socketten datayı okuyup, okuduğumuz bu datayı map reduce fonksiyonları ile işleyeceğiz.

Programlama dili olarak Scala ve IDE olarak intellij kullanacağız.

Öncelikle Intellij üzerinden SBT-based Scala örneği oluşturmamız gerekiyor. Bunu için New Project diyip Scala kategorisinde bulunan SBT‘yi seçiyoruz.
scala-sbt-version

 

 

 

 

Next diyip projenin ismini ve sbt, scala versiyonlarını seçiyoruz. SBT versiyonunu 0.13.8, Scala versiyonunu 2.11.6 olarak seçiyoruz.

sbt-scala-version

SBT, Scala uygulamaları için bir build aracıdır. Java dünyasında Maven’a denk gelir.

 

 

Spark Streaming için gerekli olan jar bağımlılıkarını build.sbt dosyamıza ekliyoruz. Spark Streaming 1.3.1 versiyonu kullanacağız.

libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.10" % "1.3.1",
"org.apache.spark" % "spark-streaming_2.10" % "1.3.1"
)

Bütün bunları yaptıktan sonra artık scala ile spark streaming işlemleri yapabiliriz.

Öncelikle StreamingContex‘i tanımlamamız gerekiyor. Aşağıdaki kod ile batch aralığı 1 saniye olan ve 2 thread ile çalışan bir StreamingContext oluşturuyoruz. 2 thread ile oluşturmamızın sebebi Receiver. 1 thread Receiver için 1 thread ise datayı işlemek için.

Not  : Eğer Kafka, TCP socket, Flume gibi receiver tabanlı data kaynakları ile streaming işlemleri yapıyorsanız her zaman için thread sayisi > receiver sayisi olmalıdır. Aksi takdirde Spark datayı okuyacak fakat işlemeyecektir.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

StreamingContext hazır. Artık DStream objesi oluşturup datayı data kaynağından okuyabiliriz.
localhost:9999 adresinden datayı okuyan DStream objemizi oluşturuyoruz.

val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

lines DStream’inde bulunan her bir kayıt data kaynağından gelen bir text’i içerir.

Şimdi her bir text’i boşluk karakteri üzerinden kelimelere bölmek istiyoruz. Bunun için flatMap operasyonunu lines DStream‘i üzerinde gerçekliyoruz. flatMap operasyonun sonucunda yeni bir DStream objesi oluşur. flatMap işlemi one-to-many DStream operasyonudur.

streaming-dstream-ops

val words = lines.flatMap(_.split(" "))

 

words DStream’i içerisinde bulunan kelimeleri saymak istiyoruz. Her kelimenin tekrar sayısını elde etmek istiyoruz. Bunun için öncelikle word DStream’i içerisindeki her bir kayıt için K, V şeklinde bir pair çifti oluşturmak için one-to-one bir operasyon olan map fonksiyonunu kullanıyoruz. Daha sonra reduceByKey fonksiyonu ile her bir kelimenin tekrar sayısını buluyoruz.


val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

Aşağıdaki print işlemi ile wordCount DStream‘i içerisinde bulunan her bir RDD‘nin 10 elemanını consol’a basıyoruz.

Şimdiye kadar data üzerinde ne tür operasyonlar yapılacağını tanımladık. Spark Streaming’i aşağıdaki kod ile çalışmaya hazır hale getiririz.

ssc.start() // İşlemlere başla

Spark Streaming ile nerden datayı okuyacağımızı ve data üzerinde ne tür operasyonları yapacağımızı belirledik. Uygulamayı run etmek için sağ tıklayıp run diyoruz. Spark Streaming artık datayı alıp işlemeye hazır. Data göndermek için localimizde NetCat server ayağa kaldırıyoruz. Bunun için terminalde aşağıdaki komutu çalıştırıyoruz.

nc -lk 9999

Komutu çalıştırdıktan sonra “merhaba sevgili dünya” yazıp entera basıyoruz. Bunun sonucunda Spark Streaming önce bu text’i okuyup, boşluk karakterine göre split edip, her bir kelimenin tekrar sayısını consola basacaktır. Intellij consolunda aşağıdaki gibi bir çıktı generate edilecektir.

——————————————-
Time: 1432489453000 ms
——————————————-
(dünya,1)
(merhaba,1)
(sevgili,1)

——————————————-
Time: 1432489454000 ms

Not : Unutmayalım ki tüm map reduce fonksiyonları DStream’de bulunan RDD’ler üzerine gerçeklenir.

Diyelim ki biz her seferinde her kelimenin toplam tekrar sayısını tutmak istiyoruz. Bu durumda Spark Streaming içerisinde kelimeye dair bir state tutmamız gerekiyor. Ve her gelen data üzerinden bu state‘in güncellenmesini istiyoruz. Bunu UpdateStateByKey operasyonu ile gerçekleyebiliriz. UpdateStateByKey için öncelikle state’yi tanımlayıp daha sonra bu state içine update fonksiyonu tanımlamamız gerekiyor.

Update fonksiyonumuz :

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = newValues.sum + runningCount.getOrElse(0)
Some(newCount)
}

Update fonksiyonumuz DStream üzerinde gerçekliyoruz.

wordCounts.updateStateByKey[Int](updateFunction _)

update fonksiyonu her bir kelime için çağrılacak ve kelimenin önceki sayısı yeni sayı ile toplanıp, kelimenin sayısı güncellenecektir.

Data üzerinde bir tane window operasyonu gerçekleyip örneğimizi sonlandıracağız. Spark Streaming windowed operasyonları desteler. Windowed operasyon bir zaman aralığı belirleyip RDD’leri birleştirip, bu birleşik RDD’ler üzerinde işlem yapmamızı sağlar. Bunu Türkçe dile getirmek biraz zor 🙂  Aşağıdaki şekil bir window operasyonunu gösteririr.
streaming-dstream-window

Window operasyonu için 2 parametreyi tanımlamamız gerekiyor.

1-) window lenght :  Her bir window’un süresi (Yukarıdaki şekilde bu 3tür)
2-) sliding interval :  Gerçeklenen window operasyonun aralığı (Yukarıdaki şekilde bu 2dir)

Diyelim ki bizim örneğimizde  her 2 saniyede bir gelen datanın son 10 saniyesini işlemek istiyoruz.

val windowedCount = wordCounts.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(10), Seconds(2))

Yukarıdaki kod ile her 2 saniyede bir DStream üzerindeki data’nın son 10 saniyesini okuyup işleriz.

Aşağıdaki adresten kaynak kodu indirebilirsiniz.

https://github.com/mstzn36/SparkStreamingTCPExample

EOF

 

 

 

Kaynakça :
https://spark.apache.org/docs/latest/streaming-programming-guide.html

Apache Spark Streaming : Örnek Uygulama” üzerine bir düşünce

Bir Cevap Yazın

Aşağıya bilgilerinizi girin veya oturum açmak için bir simgeye tıklayın:

WordPress.com Logosu

WordPress.com hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Twitter resmi

Twitter hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Facebook fotoğrafı

Facebook hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Connecting to %s