chat client test
Mon Mar 14 2022 07:56:56 GMT+0000 (Coordinated Universal Time)
Saved by @maorgur #python #json
import socket
from threading import Thread
from tkinter import *
from tkinter import messagebox
from tkinter import scrolledtext
import json
import time
from sys import exit
# Start-up dialog: a small window that asks for the server address before
# the chat window is built.
enter_connection_number = Tk()
enter_connection_number.geometry('100x100')

# Entry the address is typed into; it starts out empty.
connection_number_entry = Entry(master=enter_connection_number, width=10)
connection_number_entry.pack()

# Caption for the entry (packed second, i.e. after the entry).
connection_number_label = Label(master=enter_connection_number, text='Enter connection IP:')
connection_number_label.pack()
# Callback for the 'set' button of the connection dialog.
def set_connection_number():
    '''Validate the typed address and, if it is a dotted IPv4 address, store it.

    On success the text is stored in the module-level ``connection_number``
    and the dialog window is destroyed. On failure an error box is shown and
    the entry is cleared so the user can try again.
    '''
    global connection_number
    answer = connection_number_entry.get()
    # Counting dots alone would accept input such as '...' or 'a.b.c.d'.
    # Require exactly four fields, each 1-3 ASCII digits with value <= 255.
    fields = answer.split('.')
    is_ipv4 = len(fields) == 4 and all(
        0 < len(field) <= 3
        and all(ch in '0123456789' for ch in field)
        and int(field) <= 255
        for field in fields
    )
    if is_ipv4:
        connection_number = answer
        # Destroying the window also destroys the entry, label and button
        # it contains, so they need no separate destroy() calls.
        enter_connection_number.destroy()
    else:
        messagebox.showerror('Invalid connection IP', 'Please enter a connection IP in the format: xxx.xxx.xxx.xxx (no spaces)')
        connection_number_entry.delete(0, END)
# 'Set' button of the connection dialog, parented explicitly to that window
# instead of relying on tkinter's implicit default root.
connection_number_button = Button(enter_connection_number, text='Set connection number', command=set_connection_number)
connection_number_button.pack()
# Runs the dialog until its window is destroyed.
enter_connection_number.mainloop()
# The dialog can also be closed without a valid address ever being stored.
# Stop here instead of failing later on the undefined name.
if 'connection_number' not in globals():
    exit()
def send_message_and_get_message(message, start_timer = False):
    '''Send ``message`` to the server and return the first accepted reply.

    A reply that starts with '0' is not accepted: the message is sent again
    and the next reply is read, for as long as that keeps happening.

    message     -- text to send; it is encoded before sending.
    start_timer -- when true, the module-level ``timer`` is set to
                   ``time.perf_counter()`` immediately before every send, so
                   the caller can measure the last round trip.

    Returns the decoded reply (at most 1024 bytes are read per reply).
    Socket errors, including timeouts, propagate to the caller.
    '''
    # ``global`` applies to the whole function wherever it is written, so it
    # is declared once here rather than inside a conditional.
    global timer
    payload = message.encode()
    while True:
        if start_timer:
            timer = time.perf_counter()
        # sendall() keeps writing until the whole payload is out; send() may
        # transmit only part of it.
        my_socket.sendall(payload)
        response = my_socket.recv(1024).decode()
        if not response.startswith('0'):
            return response
def send_message(message, mode = 1, func=None):
    '''Send a chat message or command to the server and act on the reply.

    message -- text to send.
    mode    -- value prepended to ``message`` before sending. In this file
               the Send button uses the default 1 and the command buttons
               pass 2.
    func    -- selects how the reply is presented: 'timer', 'server data',
               'connected clients' or 'set username'. With any other value
               the reply 'quit' ends the program and anything else is
               printed.

    The program exits if the exchange with the server raises. An error while
    presenting the reply is printed and sets the module-level ``closed``.
    '''
    global closed, username, timer
    try:
        message = str(mode) + message
        data = send_message_and_get_message(message, func == 'timer')
    except Exception as e:
        print(f'ERROR1: {e}\nclosing connection')
        exit()
    else:
        try:
            if func == 'timer':
                # ``timer`` was set just before the last send; turn it into
                # the elapsed time.
                timer = time.perf_counter() - timer
                messagebox.showinfo('Ping message', f'Ping Message sent in {int(timer * 10000)/10000} seconds')
            elif func == 'server data':
                messagebox.showinfo('Server data', data)
            elif func == 'connected clients':
                messagebox.showinfo('Connected clients', f' there are {len(data.split(" "))} user(s) online: {data}')
            elif func == 'set username':
                # The last character of the reply is the status flag
                # ('1' means accepted); the rest is the text to display.
                if data.endswith('1'):
                    messagebox.showinfo('Username changed', data[:-1])
                    username.destroy()
                    # message is '<mode>SET USERNAME <name>'. Splitting at
                    # most twice keeps a name containing spaces whole rather
                    # than showing only its first word.
                    username = Label(root, text=f'username: {message.split(" ", 2)[2]}')
                    username.pack()
                else:
                    messagebox.showerror('Username change failed', data[:-1])
                    # Put the name controls back in their usable state.
                    name_entry.config(state=NORMAL)
                    name_button.config(text='Change name', command=change_name)
            elif data == 'quit':
                print('quiting')
                closed = True
                exit()
            else:
                print('\n' + data)
        except Exception as e:
            # exit() raises SystemExit, which is not an Exception subclass,
            # so the 'quit' branch above is not intercepted here.
            print(f'ERROR2: {e}')
            closed = True
# Set to True once the client is shutting down.
closed = False

# Main chat window.
root = Tk()
root.title('Chat')
root.geometry('400x400')

# Top frame: message prompt, entry, Send button and the message log.
frame = Frame(master=root)
frame.pack()

label = Label(master=frame, text='Enter your message:')
label.pack()

entry = Entry(master=frame)
entry.pack()

# Send the current entry text (send_message's default mode is used).
button = Button(master=frame, text='Send', command=lambda: send_message(entry.get()))
button.pack()

# Scrolling message log, created locked (DISABLED); code that writes to it
# unlocks it temporarily.
messages_box = scrolledtext.ScrolledText(master=frame, height=10, width=100, state=DISABLED)
messages_box.pack()

# Row of command buttons underneath the top frame.
commands_frame = Frame(master=root)
commands_frame.pack()

# Quit: sends the QUIT command with mode 2.
quit_button = Button(master=commands_frame, text='Quit', command=lambda: send_message('QUIT', 2))
quit_button.pack(side=RIGHT)
#callback for the 'Clear' button
def clear_messages():
    '''Remove all text from the message box.

    The box is kept DISABLED, so it is unlocked for the deletion and locked
    again afterwards.
    '''
    messages_box.config(state=NORMAL)
    # Text indices are 'line.column' strings; '1.0' is the very first
    # character. A float literal only works because it happens to format
    # as that same string.
    messages_box.delete('1.0', END)
    messages_box.config(state=DISABLED)
# Command buttons, packed left to right inside commands_frame. Each server
# command goes out with mode 2 plus a tag that tells send_message how to
# present the reply.
clear_button = Button(master=commands_frame, text='Clear', command=clear_messages)
clear_button.pack(side=LEFT)

# PING: the reply is shown as a round-trip time.
ping_button = Button(master=commands_frame, text='Ping', command=lambda: send_message('PING', 2, 'timer'))
ping_button.pack(side=LEFT)

# SERVER: the reply is shown as-is in an info box.
data_button = Button(master=commands_frame, text='server data', command=lambda: send_message('SERVER', 2, 'server data'))
data_button.pack(side=LEFT)

# CONNECTED CLIENTS: the reply is shown together with a user count.
clients_button = Button(master=commands_frame, text='Connected clients', command=lambda: send_message('CONNECTED CLIENTS', 2, 'connected clients'))
clients_button.pack(side=LEFT)

# Frame holding the widgets for changing the username.
name_frame = Frame(master=root)
name_frame.pack()

name_label = Label(master=name_frame, text='Enter new username:')
name_label.pack()

name_entry = Entry(master=name_frame)
name_entry.pack()
#callback for the 'Change name' button
def change_name():
    '''Ask the server to change the username to the text in the name entry.

    The requested name is also stored in the module-level ``name``. A blank
    name is rejected locally and nothing is sent.
    '''
    global name
    requested = name_entry.get().strip()
    if not requested:
        # An empty name would otherwise go out as 'SET USERNAME ' with
        # nothing after it.
        messagebox.showerror('Username change failed', 'Please enter a username')
        return
    name = requested
    send_message(f'SET USERNAME {name}', 2, 'set username')
    print(f'SET USERNAME {name}', 2, 'set username')
    # Show the name as it was actually sent (surrounding spaces removed).
    name_entry.delete(0, END)
    name_entry.insert(0, name)
name_button = Button(name_frame, text='Change name', command=change_name)
name_button.pack()
#create a socket; the 5 second timeout applies to connect, send and recv
my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
my_socket.settimeout(5)
try:
    my_socket.connect((connection_number, 8820))
except OSError:
    # Only connection failures (refused, unreachable, timed out, bad
    # address) are reported as "server is down"; programming errors are no
    # longer hidden by a bare except.
    my_socket.close()
    root.withdraw()
    messagebox.showerror(f'Server {connection_number} is down', 'Server is down, please try again later')
    root.destroy()
    exit()
#label that shows the username; until one is set it is the local port number
username = Label(root, text=f'username: {my_socket.getsockname()[1]}')
username.pack()
def check_new_messages():
    '''Poll the server about once a second and show messages not seen yet.

    Each round sends '0' followed by ``last_message`` and reads one reply.
    A reply starting with '0' is parsed as JSON after that first character.
    Items are read as item[0] = message number, item[1] = author and
    item[2] = text. Items numbered above ``last_message`` are appended to
    the message box and ``last_message`` is advanced. Any other reply is
    printed as ignored.

    The loop ends when the window has been destroyed or sending fails.
    '''
    # NOTE(review): this function is started as a Thread target while
    # root.mainloop() runs in the main thread, so the tkinter calls below
    # come from a second thread - confirm that is acceptable, or schedule
    # the polling with root.after instead.
    global last_message
    try:
        while True:
            root.update()
            time.sleep(1)
            try:
                my_socket.sendall(f'0{last_message}'.encode())
            except OSError:
                # Report the failure unless the client is already closing.
                if not closed:
                    messagebox.showerror('Server is down', 'Server is down, please try again later')
                    root.destroy()
                return
            try:
                reply = my_socket.recv(1024).decode()
            except socket.timeout:
                # No answer within the socket timeout: poll again instead of
                # letting the exception end the loop for good.
                continue
            if not reply.startswith('0'):
                print(f'ignored message [{reply}]')
                continue
            payload = reply[1:]
            print(payload)
            try:
                data = json.loads(payload)
            except ValueError as e:
                print(f'ERROR3: {e}\n{payload}')
                continue
            # An empty list means nothing to show. Without the emptiness
            # test, indexing the last item would raise and stop the polling.
            if data and data[-1][0] > last_message:
                for item in data:
                    if item[0] > last_message:
                        messages_box.config(state=NORMAL)
                        messages_box.insert(END, f'{item[1]} wrote: {item[2]}\n')
                        messages_box.config(state=DISABLED)
                        last_message = item[0]
    except TclError:
        # The window no longer exists: normal end of the polling loop.
        return
    except Exception as e:
        # Anything else also ends the loop, but is no longer swallowed
        # silently while the client is still in use.
        if not closed:
            print(f'ERROR4: {e}\nstopped checking for new messages')
# Highest message number shown so far; -1 means none has been shown yet.
last_message = -1
Thread(target=check_new_messages).start()
root.mainloop()
# The main window is gone. Mark the client as closing first, so that a
# failing poll is not reported as "Server is down".
closed = True
try:
    # send_message may end the program via exit() (SystemExit); the finally
    # clause makes sure the socket is closed on that path too.
    send_message('QUIT', 2)
finally:
    my_socket.close()



Comments