Apache Spark is an open-source cluster computing framework. It is a powerful engine offering fast processing, ease of use, and higher-level libraries for SQL queries, streaming data, machine learning, and graph processing.
In this tutorial I’ll show Spark Streaming from text files: the files are generated dynamically in a specific directory, and Spark has the functionality to read those files as they are created.
Installations: you need Spark and Python installed, along with the pyspark library.
PySpark has many built-in APIs (http://spark.apache.org/docs/latest/api/python/index.html) that make it easy to perform operations with Spark.
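To get a feel for the API, here is a minimal batch word count (a sketch, assuming pyspark is installed and a 'lorem.txt' file exists in the working directory):

from pyspark import SparkContext

sc = SparkContext(appName="BatchWordCount")
words = sc.textFile('lorem.txt').flatMap(lambda line: line.split(" "))
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(counts.take(5))  # first five (word, count) pairs
sc.stop()

The streaming code later in this tutorial applies the same flatMap/map/reduceByKey pipeline, but to files as they arrive.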
Now I’m going to start the coding part for Spark Streaming in Python, using the pyspark library.
First we'll write Python code that creates data files dynamically in a folder. I create a file.py in a directory that also contains a lorem.txt file holding dummy text data.
The code below selects random content from the lorem.txt file and writes it into log files. It creates 30 log files in the log directory, one every 5 seconds: log1.txt, log2.txt ... log30.txt.
file.py:
from random import randint
import time

"""
Creates 30 files, one by one, at 5-second intervals.
Each file stores random content taken from 'lorem.txt'.
"""

def main():
    a = 1
    with open('lorem.txt', 'r') as file:  # read content from 'lorem.txt'
        lines = file.readlines()
        while a <= 30:
            totalline = len(lines)
            # pick a random starting line (assumes lorem.txt has at least 10 lines)
            linenumber = randint(0, totalline - 10)
            # the 'log/' directory must already exist
            with open('log/log{}.txt'.format(a), 'w') as writefile:
                writefile.write(' '.join(lines[linenumber:totalline]))
            print('creating file log{}.txt'.format(a))
            a += 1
            time.sleep(5)

if __name__ == '__main__':
    main()
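When run, the script prints one line as each file is written, in this shape:

creating file log1.txt
creating file log2.txt
...
creating file log30.txt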
Now I'm writing the Spark code that reads the content of each file and calculates the word count of its dummy data.
The code below uses the pyspark API to implement the word-count task for each file. Spark checks the directory every 3 seconds and reads the content of any file generated after the streaming process started; it will not read files that already existed in the log directory.
streaming.py:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

"""
Creates a stream of text from the .txt files that file.py generates
dynamically. The streaming job runs every 3 seconds and shows the
word counts from each new file.
"""

def main():
    sc = SparkContext(appName="PysparkStreaming")
    ssc = StreamingContext(sc, 3)  # batch interval of 3 seconds
    lines = ssc.textFileStream('log/')  # 'log/' is the monitored directory
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()  # print the counts computed in each batch
    ssc.start()  # start the streaming computation
    ssc.awaitTermination()  # wait for it to terminate

if __name__ == "__main__":
    main()
We will start the Spark Streaming job first, before the file-generating process, so Spark must already be set up on the system.
Run the command below in a terminal to start the streaming process.
spark-submit streaming.py
This starts the Spark Streaming process.
Now run file.py with Python; it will create log files in the log directory, and Spark Streaming will read them as they appear.
python file.py
While file.py is creating new files in the log directory, the Spark console prints the word counts from each new file at the same time.
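For reference, each 3-second batch prints its counts in pprint's standard format, roughly like this (illustrative words and numbers, not real results):

-------------------------------------------
Time: 2018-01-01 10:30:03
-------------------------------------------
('Lorem', 4)
('ipsum', 7)
('dolor', 5)
...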