Installing Kafka

tgz file: https://archive.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
Copy it to the machine using WinSCP.

Stop any ZooKeeper already running as a service, so the bundled one can bind port 2181:
    sudo service zookeeper-server stop

Steps:

1. Terminal 1: Start ZooKeeper
    cd ~/kafka_2.10-0.8.2.2
    bin/zookeeper-server-start.sh config/zookeeper.properties

2. Terminal 2: Start the Kafka broker
    cd ~/kafka_2.10-0.8.2.2
    bin/kafka-server-start.sh config/server.properties

3. Terminal 3: Create a topic
    cd ~/kafka_2.10-0.8.2.2
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic events-topic

   Check that the topic was created:
    bin/kafka-topics.sh --list --zookeeper localhost:2181

4. Terminal 4: Start a Kafka console producer
    cd ~/kafka_2.10-0.8.2.2
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic events-topic
    # Type messages like: hi, hello, event1

5. Terminal 5: Start a Kafka console consumer (optional, to verify messages)
    cd ~/kafka_2.10-0.8.2.2
    bin/kafka-console-consumer.sh --topic events-topic --from-beginning --zookeeper localhost:2181

6. Terminal 6: Create and run the Spark Streaming subscriber
    gedit spark-socket-consumer.py
    spark-submit --master local[2] spark-socket-consumer.py

spark-socket-consumer.py:

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    # Set up Spark: local mode with 2 threads (one for the receiver, one for processing)
    conf = SparkConf().setAppName("SocketKafkaForwardConsumer").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    # Listen on a socket; Kafka messages are forwarded to this socket
    # (see the bridge command after the script)
    lines = ssc.socketTextStream("localhost", 9999)

    def process(rdd):
        count = rdd.count()
        if count > 0:
            print("received {} records in this batch".format(count))
            for i, record in enumerate(rdd.take(10)):
                print("[{}] {}".format(i, record))
        else:
            print("no records in this batch")

    lines.foreachRDD(process)

    ssc.start()
    ssc.awaitTermination()
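The script above reads from a TCP socket rather than from Kafka itself, so something has to forward the topic's messages to port 9999. One simple bridge (my assumption, not part of the original notes) is to pipe the console consumer into netcat in a seventh terminal. Start it before submitting the Spark job, since socketTextStream connects to the port as a client:

    # Forward everything on events-topic to a socket listening on port 9999
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic events-topic | nc -lk 9999

The -k flag keeps nc listening if the Spark receiver disconnects and reconnects; without it, nc exits after the first connection closes.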
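Alternatively, Spark Streaming can subscribe to the topic directly and skip the socket hop entirely, using the receiver-based Kafka 0.8 API (pyspark.streaming.kafka.KafkaUtils.createStream). The sketch below is mine, not from the original notes: the consumer group name spark-consumer-group is arbitrary, and the file name kafka-direct-consumer.py is a placeholder.

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    # Same local setup as before: 2 threads, 2-second batches
    conf = SparkConf().setAppName("DirectKafkaConsumer").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)

    # Receiver-based stream: connect through ZooKeeper and read events-topic
    # with one consumer thread; each element is a (key, message) pair
    kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-consumer-group",
                                  {"events-topic": 1})
    lines = kvs.map(lambda kv: kv[1])  # keep only the message value
    lines.pprint()  # print the first elements of each batch

    ssc.start()
    ssc.awaitTermination()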
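Running the direct version requires the Spark/Kafka integration jar on the classpath. The exact artifact depends on your Spark and Scala versions; the coordinates below are an example for Spark 2.x built against Scala 2.11, so check them against your installation (for Spark 1.x with Scala 2.10, the artifact is spark-streaming-kafka_2.10 instead):

    spark-submit --master local[2] --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 kafka-direct-consumer.py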