# First terminal: download and unpack the Kafka 0.8.2.2 tarball, then start
# ZooKeeper with the bundled config. The last command keeps running in this
# terminal, so open a new terminal for each following step.
# TLS certificate verification stays ON: this archive is executed later, so
# do not bypass verification. If wget rejects the certificate, update the
# system CA bundle instead of adding --no-check-certificate.
wget https://archive.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
tar -xzf kafka_2.10-0.8.2.2.tgz

cd kafka_2.10-0.8.2.2

bin/zookeeper-server-start.sh config/zookeeper.properties

Second terminal: start the Kafka broker (ZooKeeper must already be running in the first terminal).
# A new terminal does not keep the first terminal's working directory, and the
# bin/ and config/ paths below are relative to the Kafka directory.
# (The path assumes the tarball was unpacked where this terminal opens; adjust
# it otherwise.)
cd kafka_2.10-0.8.2.2
bin/kafka-server-start.sh config/server.properties

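Third terminal: create the topic, list topics to confirm it exists, then pipe the console consumer's output into a TCP listener on port 9999. (If test-topic already exists, pick a new topic name and use it in every command that mentions test-topic, including the producer in terminal 5.)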
# Enter the Kafka directory: this new terminal does not inherit the first
# terminal's working directory (adjust the path if Kafka was unpacked elsewhere).
cd kafka_2.10-0.8.2.2

# Create a single-partition, unreplicated topic (use --topic, as in the
# commands below).
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

# Confirm the topic now exists.
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Print every message in test-topic (starting from the beginning) and hand that
# stream to nc listening on port 9999. The Spark job in terminal 4 reads
# localhost:9999.
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-topic --from-beginning | nc -lk 9999


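Fourth terminal: create and run the Spark Streaming consumer. (No topic name appears here — the script reads from the socket on port 9999, not from Kafka directly, so a renamed topic needs no change in this step.)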

# Create the consumer script: paste the Python program below into the editor,
# save, and close it. Any text editor works if gedit is not available.
gedit spark_socket_consumer.py

# spark_socket_consumer.py
#
# Spark Streaming job that reads newline-delimited text from localhost:9999
# (where terminal 3 serves the Kafka console consumer's output through nc).
# For every batch it prints how many lines arrived and up to SAMPLE_SIZE of
# them, numbered from 1.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

BATCH_INTERVAL_SECONDS = 5  # length of each micro-batch
SAMPLE_SIZE = 10            # max records printed per non-empty batch

# local[2]: the socket receiver occupies one thread, so at least one more is
# needed to actually process the batches.
conf = SparkConf().setAppName("SocketKafkaForwardConsumer").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

lines = ssc.socketTextStream("localhost", 9999)


def process(rdd):
    """Print the size of one micro-batch and a numbered sample of its records.

    Args:
        rdd: RDD of the text lines received during one batch interval.
    """
    count = rdd.count()
    if count > 0:
        # Format count into the message; passing it as a separate print()
        # argument would leave a literal "{}" in the output.
        print("Received {} records in this batch".format(count))
        for i, record in enumerate(rdd.take(SAMPLE_SIZE), start=1):
            print(i, record)
    else:
        print("No records in this batch")


lines.foreachRDD(process)

ssc.start()
ssc.awaitTermination()



# Submit the job from the directory where spark_socket_consumer.py was saved.
# local[2] is quoted because, unquoted, [2] is a glob: bash would replace the
# word with a matching file name (e.g. "local2") and zsh aborts with
# "no matches found".
spark-submit --master 'local[2]' spark_socket_consumer.py



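Fifth terminal: start a console producer and type messages, one per line; they should appear in the Spark output in the fourth terminal within about one 5-second batch. (If you renamed test-topic, use the same name here.)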

# Enter the Kafka directory: this new terminal does not inherit another
# terminal's working directory (adjust the path if Kafka was unpacked elsewhere).
cd kafka_2.10-0.8.2.2
# Lines typed here are sent to test-topic through the broker on localhost:9092.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic