
Spark Streaming from text files using pyspark API


Apache Spark is an open source cluster computing framework. It is a fast, easy-to-use engine with higher-level libraries for SQL queries, streaming data, machine learning, and graph processing.

Spark code API

Today I’ll walk through Spark streaming from text files. The files are generated dynamically in a specific directory, and Spark can read them as they are created.

You will need the following:

Installations:

  • Apache Spark (https://spark.apache.org/downloads.html)
  • pyspark in a virtualenv (pip install pyspark)
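
Before going further, it can help to confirm that the virtualenv really sees the package. Here is a minimal sketch of such a check; the helper name is_installed is my own and not part of pyspark.

```python
import importlib.util


def is_installed(module_name):
    """Return True if `module_name` can be imported in the current environment."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # 'pyspark' is the package installed by `pip install pyspark`
    status = "found" if is_installed("pyspark") else "missing"
    print("pyspark: {}".format(status))
```

If it prints "missing", the virtualenv is probably not activated or the install went into a different interpreter.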

PySpark, Spark's Python API (http://spark.apache.org/docs/latest/api/python/index.html), has many built-in functions that make common Spark operations easy.

Now let's start coding the Spark streaming job in Python with the pyspark library.

First we'll write Python code that creates data files dynamically in a folder. I created file.py in a directory that also contains lorem.txt, a file of dummy text.

The code below picks a random starting line in lorem.txt and writes everything from that line to the end of the file into a log file. It creates 30 log files in the log directory, one every 5 seconds: log1.txt, log2.txt ... log30.txt.

file.py:

import os
import time
from random import randint

"""
Creates 30 files, one every 5 seconds.
Each file is filled with a random slice of lines taken from 'lorem.txt'.
"""


def main():
    os.makedirs('log', exist_ok=True)  # make sure the target directory exists
    with open('lorem.txt', 'r') as file:  # read the dummy text from 'lorem.txt'
        lines = file.readlines()
    totalline = len(lines)
    for a in range(1, 31):
        # pick a random start line; max() guards against files shorter than 10 lines
        linenumber = randint(0, max(0, totalline - 10))
        with open('log/log{}.txt'.format(a), 'w') as writefile:
            writefile.write(' '.join(lines[linenumber:totalline]))
        print('creating file log{}.txt'.format(a))
        time.sleep(5)


if __name__ == '__main__':
    main()
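
One caveat: file.py writes straight into the monitored directory, so a streaming reader could in principle pick up a file before it is fully written. The Spark streaming guide suggests writing the file somewhere else first and then renaming it into place. Below is a minimal sketch of that pattern. The helper name write_then_move and the "_staging" sibling directory are my own assumptions, not part of the original code.

```python
import os
import tempfile


def write_then_move(target_dir, filename, text):
    """Write `text` to a temp file outside `target_dir`, then rename it in."""
    os.makedirs(target_dir, exist_ok=True)
    # Assumption: a sibling "<target_dir>_staging" directory lives on the same
    # filesystem, so os.replace below is a rename rather than a copy
    staging_dir = os.path.abspath(target_dir) + "_staging"
    os.makedirs(staging_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as tmp:
        tmp.write(text)  # the file is complete before it becomes visible
    final_path = os.path.join(target_dir, filename)
    os.replace(tmp_path, final_path)
    return final_path
```

In file.py, the inner with open('log/log{}.txt'...) block could be swapped for a call such as write_then_move('log', 'log{}.txt'.format(a), text).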

Next comes the Spark code that reads each file and counts the words in its dummy data.

The code below uses the pyspark API to run a word count on each file. Every 3 seconds Spark checks the directory and reads the files that were created after the streaming process started. It does not read files that already existed in the log directory.
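
To make the "only new files" behaviour concrete, here is a plain-Python sketch of the idea: keep only files whose modification time is later than a start time. This is a simplified illustration, not Spark's actual selection logic, which is more involved. The function name files_modified_after is hypothetical.

```python
import os


def files_modified_after(directory, start_time):
    """Return sorted names of files in `directory` modified after `start_time`.

    `start_time` is in seconds since the epoch, like time.time().
    """
    names = []
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        # skip sub-directories and anything last modified before the start time
        if os.path.isfile(path) and os.path.getmtime(path) > start_time:
            names.append(name)
    return sorted(names)
```

Calling files_modified_after('log', started_at) with the time the job started would skip any log files left over from an earlier run.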

streaming.py:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

"""
Streams text from the txt files that file.py creates dynamically.
The streaming job runs every 3 seconds and shows the word counts
for the files that arrived in each batch.
"""


def main():
    sc = SparkContext(appName="PysparkStreaming")
    ssc = StreamingContext(sc, 3)  # batch interval: process new data every 3 seconds
    lines = ssc.textFileStream('log/')  # 'log/' is the directory to monitor
    # split() with no argument splits on any whitespace and skips empty strings
    counts = lines.flatMap(lambda line: line.split()) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
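
If the flatMap, map, reduceByKey chain is new to you, the same three steps can be sketched in plain Python on a single batch of lines. This runs without Spark and only illustrates the logic. The function name word_count is my own, and Spark itself does this work in parallel across partitions.

```python
from functools import reduce
from itertools import chain


def word_count(lines):
    """Count words in a list of lines, mirroring flatMap -> map -> reduceByKey."""
    # flatMap: split every line on whitespace and flatten into one word sequence
    words = chain.from_iterable(line.split() for line in lines)
    # map: pair each word with the count 1
    pairs = ((word, 1) for word in words)

    # reduceByKey: add up the counts that share the same word
    def add_pair(counts, pair):
        word, n = pair
        counts[word] = counts.get(word, 0) + n
        return counts

    return reduce(add_pair, pairs, {})


if __name__ == "__main__":
    print(word_count(["a b a", "b c"]))
```

For the two sample lines, "a" and "b" each end up with a count of 2 and "c" with 1.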
 

Start the Spark streaming job before the file-generating process, so Spark must already be installed and ready on the system.

Run the command below in a terminal to start the streaming process.

spark-submit streaming.py 

This starts the Spark streaming process.

Spark Process

Now run file.py with Python. It creates the log files in the log directory, and Spark streaming reads them.

 
python file.py

File generating

 

The screenshot above shows file.py creating new files in the log directory while, at the same time, Spark shows the word counts on the right side of the screenshot.

 

Spark streaming with files

 

I'm sharing a video of this tutorial.  

Download Source code from GitHub