Task 15 Updated
Sun Oct 26 2025 17:14:17 GMT+0000 (Coordinated Universal Time)
Saved by @bdaplab2025
#python
wget --no-check-certificate https://archive.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
tar -xzf kafka_2.10-0.8.2.2.tgz
cd kafka_2.10-0.8.2.2
bin/zookeeper-server-start.sh config/zookeeper.properties
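Before starting the broker, it can help to confirm that ZooKeeper is answering. The following is a minimal Python sketch and is not part of the original task. It assumes the server responds to the `ruok` four-letter command with `imok`, which 3.4-series ZooKeeper servers do by default (newer releases may require enabling the command through `4lw.commands.whitelist`). The function name `zookeeper_ok` is hypothetical.

```python
import socket


def zookeeper_ok(host="localhost", port=2181, timeout=2.0):
    """Send ZooKeeper's 'ruok' command; a healthy server replies 'imok' and closes."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"ruok")
            reply = b""
            # Read until the server closes the connection after its reply
            while True:
                chunk = sock.recv(16)
                if not chunk:
                    break
                reply += chunk
    except OSError:
        # Connection refused, timeout, reset: treat all of these as "not ready"
        return False
    return reply == b"imok"
```

For example, `zookeeper_ok()` checks `localhost:2181`, the address passed to the `--zookeeper` options later in this note.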
Second terminal:
bin/kafka-server-start.sh config/server.properties
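The broker may need a few seconds after launch before it accepts connections, so a script that creates topics immediately can fail. A minimal readiness-check sketch, assuming the broker listens on `localhost:9092`, the address used by the producer command in the fifth terminal. It only proves that something accepts TCP connections on the port, not that the broker is fully healthy; `wait_for_port` is a hypothetical helper.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Poll until a TCP connection to host:port succeeds or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True  # something is accepting connections on this port
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)  # back off before the next attempt
```

For example, `wait_for_port("localhost", 9092)` returns `True` once the broker is listening, or `False` after 30 seconds.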
Third terminal: (if test-topic already exists, choose another name and use it in the remaining commands)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
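The flags in the create command follow from the single-broker setup: Kafka cannot place more replicas of a partition than there are running brokers, so `--replication-factor` has to stay at 1 here. A minimal sketch that builds the same command as an argument list for Python's `subprocess` and enforces that rule; `create_topic_args` and its `live_brokers` parameter are hypothetical, and the caller must supply the broker count.

```python
def create_topic_args(topic, partitions=1, replication_factor=1,
                      live_brokers=1, zookeeper="localhost:2181"):
    """Build the kafka-topics.sh --create argument list used in the third terminal."""
    if replication_factor > live_brokers:
        # Each replica of a partition must live on a different broker
        raise ValueError("replication factor {} exceeds the {} running broker(s)"
                         .format(replication_factor, live_brokers))
    if partitions < 1:
        raise ValueError("a topic needs at least one partition")
    return ["bin/kafka-topics.sh", "--create",
            "--zookeeper", zookeeper,
            "--replication-factor", str(replication_factor),
            "--partitions", str(partitions),
            "--topic", topic]
```

For example, `subprocess.run(create_topic_args("test-topic"), cwd="kafka_2.10-0.8.2.2", check=True)` would run the creation step from the extracted Kafka directory.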
bin/kafka-topics.sh --list --zookeeper localhost:2181
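The `--list` output can be used to act on the note about an existing `test-topic`. A minimal sketch, assuming the output has one topic name per line and ignoring any annotation after the name (such as a deletion marker); the helpers and the `-2`, `-3` naming scheme are hypothetical.

```python
import subprocess


def list_topics(kafka_dir="kafka_2.10-0.8.2.2", zookeeper="localhost:2181"):
    """Run the --list command from the third terminal and return its stdout."""
    result = subprocess.run(
        ["bin/kafka-topics.sh", "--list", "--zookeeper", zookeeper],
        cwd=kafka_dir, stdout=subprocess.PIPE, universal_newlines=True, check=True)
    return result.stdout


def existing_topics(list_output):
    """Collect topic names; the first token of each non-empty line is the name."""
    topics = set()
    for line in list_output.splitlines():
        parts = line.split()
        if parts:
            topics.add(parts[0])
    return topics


def free_topic_name(base, topics):
    """Return `base` if unused, otherwise the first free base-2, base-3, ..."""
    if base not in topics:
        return base
    n = 2
    while "{}-{}".format(base, n) in topics:
        n += 1
    return "{}-{}".format(base, n)
```

For example, `free_topic_name("test-topic", existing_topics(list_topics()))` yields a name that can be used in the create, consume, and produce commands.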
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-topic --from-beginning | nc -lk 9999
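This line chains two programs: the console consumer prints each Kafka message as a line on stdout, and `nc -lk 9999` listens on port 9999 and relays that stream to the client that connects, here Spark's `socketTextStream`, which splits the incoming bytes on newlines. A minimal Python sketch of the relaying side, written only to show the newline framing. `forward_lines` and `serve_stdin` are hypothetical, and unlike `nc -k` the server handles a single client and returns when stdin is exhausted instead of listening again.

```python
import socket
import sys


def forward_lines(lines, conn):
    """Send each line newline-terminated, the framing socketTextStream splits on."""
    sent = 0
    for line in lines:
        conn.sendall((line.rstrip("\n") + "\n").encode("utf-8"))
        sent += 1
    return sent


def serve_stdin(host="localhost", port=9999):
    """Simplified stand-in for `nc -l 9999`: serve one client until stdin ends."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((host, port))
        srv.listen(1)
        conn, _ = srv.accept()  # Spark's receiver connects here
        with conn:
            return forward_lines(sys.stdin, conn)
```

It could be used as `bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-topic --from-beginning | python3 relay.py`, where `relay.py` (a hypothetical file name) calls `serve_stdin()`.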
Fourth terminal: create the following script, then submit it with spark-submit
gedit spark_socket_consumer.py
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# local[2]: one thread for the socket receiver, one for processing the batches
conf = SparkConf().setAppName("SocketKafkaForwardConsumer").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)  # 5-second batch interval

# Read newline-delimited text from the nc server started in the third terminal
lines = ssc.socketTextStream("localhost", 9999)

def process(rdd):
    # Called on the driver once per batch
    count = rdd.count()
    if count > 0:
        print("Received {} records in this batch".format(count))
        # Show at most the first 10 records of the batch
        for i, record in enumerate(rdd.take(10), start=1):
            print(i, record)
    else:
        print("No records in this batch")

lines.foreachRDD(process)

ssc.start()
ssc.awaitTermination()
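With a 5-second batch interval, Spark Streaming cuts the incoming lines into one RDD per interval and calls `process` once per RDD, printing "No records in this batch" for intervals in which nothing arrived. The following plain-Python model of that behavior runs without Spark. It is a simplification (arrival times are seconds from an arbitrary start, whereas Spark aligns batch boundaries to its own clock), and `group_into_batches` and `summarize` are hypothetical names.

```python
def group_into_batches(events, interval=5):
    """Split (arrival_seconds, record) pairs into consecutive `interval`-second batches.

    Intervals with no arrivals produce empty batches, as in Spark Streaming.
    """
    if not events:
        return []
    last = max(ts for ts, _ in events)
    batches = [[] for _ in range(int(last // interval) + 1)]
    # Stable sort keeps records with equal timestamps in arrival order
    for ts, record in sorted(events, key=lambda e: e[0]):
        batches[int(ts // interval)].append(record)
    return batches


def summarize(records, limit=10):
    """Mirror process(): a count line plus the first `limit` records, numbered from 1."""
    if not records:
        return "No records in this batch"
    out = ["Received {} records in this batch".format(len(records))]
    out += ["{} {}".format(i, r) for i, r in enumerate(records[:limit], start=1)]
    return "\n".join(out)
```

For example, records arriving at 0.5 s, 1.0 s, and 12.0 s fall into three batches, the middle one empty.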
spark-submit --master local[2] spark_socket_consumer.py
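`local[2]` matters here: a receiver such as `socketTextStream` permanently occupies one thread, and the Spark Streaming programming guide warns that with `local` or `local[1]` no thread would be left to process the data. Note also that a master set in code through `SparkConf.setMaster` takes precedence over the `--master` flag, so the script's own `local[2]` is what applies. A minimal sketch of the thread-count rule as a check, assuming one receiver as in this script; `local_threads` and `enough_threads` are hypothetical.

```python
import os
import re

# Matches "local", "local[N]", "local[*]" and "local[N,maxFailures]"
_LOCAL = re.compile(r"^local(?:\[(\*|\d+)(?:,\s*\d+)?\])?$")


def local_threads(master):
    """Threads implied by a local master URL, or None for non-local masters."""
    m = _LOCAL.match(master.strip())
    if not m:
        return None
    n = m.group(1)
    if n is None:
        return 1  # plain "local" runs a single thread
    if n == "*":
        return os.cpu_count() or 1
    return int(n)


def enough_threads(master, receivers=1):
    """Receiver-based streams need more local threads than receivers.

    Non-local masters are not checked here and are reported as OK.
    """
    n = local_threads(master)
    return n is None or n > receivers
```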
Fifth terminal:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
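Each line typed into the console producer is sent to the topic as one message, which then travels through the consumer and `nc` to the Spark job. To send test data without typing, the lines can be piped into the same command. A minimal sketch using Python's `subprocess`; `send_lines` and `PRODUCER_CMD` are hypothetical, and the command must be run from the Kafka directory (`kafka_2.10-0.8.2.2`).

```python
import subprocess

# The fifth-terminal command as an argument list
PRODUCER_CMD = ["bin/kafka-console-producer.sh",
                "--broker-list", "localhost:9092", "--topic", "test-topic"]


def send_lines(cmd, lines, cwd=None):
    """Write each line to the command's stdin, as if typed, and return its stdout."""
    payload = "".join(line.rstrip("\n") + "\n" for line in lines)
    result = subprocess.run(cmd, input=payload, cwd=cwd,
                            stdout=subprocess.PIPE, universal_newlines=True,
                            check=True)
    return result.stdout
```

For example, `send_lines(PRODUCER_CMD, ["hello", "world"], cwd="kafka_2.10-0.8.2.2")` should make both lines appear in the Spark output within a batch or two. Because the helper only writes to stdin, it can be tried with `["cat"]` in place of the producer.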