Agentic Bot for asset trace
Tue Aug 27 2024 04:49:58 GMT+0000 (Coordinated Universal Time)
Saved by @vineethnj
import os import sys import re import json import logging from pathlib import Path from dotenv import load_dotenv from django.http import JsonResponse from django.views.decorators.http import require_http_methods from django.views.decorators.csrf import csrf_exempt from django.db import connection from langchain_community.agent_toolkits import create_sql_agent from langchain_community.utilities import SQLDatabase from langchain_groq import ChatGroq from langchain.tools import BaseTool from langchain.chains import LLMChain from langchain.agents import ZeroShotAgent, Tool, AgentExecutor from langchain.schema import HumanMessage, SystemMessage from langchain_openai import ChatOpenAI from .models import AssetData from langchain.agents import load_tools, initialize_agent, AgentType # Setup Django environment project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "asset_management_system.settings") import django django.setup() # Setup logging logger = logging.getLogger(__name__) # Load environment variables and set up API keys load_dotenv() api_key = os.getenv('OPENAI_API_KEY') groq_api_key = os.getenv("GROQ_API_KEY") os.environ["GROQ_API_KEY"] = groq_api_key # Initialize LLM llm = ChatOpenAI(api_key=api_key, model="gpt-4o-mini", temperature=0) # Constants WEBAPP_INFO = """ The Asset Management System features a login system with three roles: Master Admin, Super Admin, and Admin, each with specific permissions. The system supports various asset types, with QR codes generated for each asset for easy tracking and management. Assets can be transferred between branches, with movement history tracked for audits. """ offense_words = ['asdf','asdfasdf'] # Create SQLDatabase instance db_engine = connection.settings_dict['ENGINE'] db_name = connection.settings_dict['NAME'] if isinstance(db_name, (str, Path)): db_name = str(db_name) else: raise ValueError(f"Unexpected type for db_name: {type(db_name)}") db_url = f"sqlite:///{db_name}" if 'sqlite' in db_engine else f"{db_engine}://{db_name}" db = SQLDatabase.from_uri(db_url) # Create SQL agent sql_agent = create_sql_agent(llm, db=db, agent_type="openai-tools", verbose=True) class GreetingTool(BaseTool): name = "greeting_tool" description = "Use this to respond to greetings or simple queries that don't require complex reasoning." def _run(self, query: str) -> str: greetings = ["hello", "hi", "hey", "greetings", "good morning", "good afternoon", "good evening"] if any(greeting in query.lower() for greeting in greetings): return "Hello! I'm the Asset Trace AI assistant. How can I help you with asset management today?" return "I'm here to help with asset management. What specific information or assistance do you need?" async def _arun(self, query: str): raise NotImplementedError("This tool does not support async") # Tool classes class QueryValidationTool(BaseTool): name = "query_validation_tool" description = f"""Evaluate the given query to ensure it is appropriate, respectful, and suitable for general audiences. Check for offensive language, hate speech, explicit content, harassment, sensitive topics, and respectfulness.""" def _run(self, query: str) -> str: return "VALID" if len(query) > 5 and all(word.lower() not in query.lower() for word in offense_words) else "INVALID" async def _arun(self, query: str): raise NotImplementedError("This tool does not support async") class CybersecurityTool(BaseTool): name = "cybersecurity_tool" description = """Checks if the query contains any potentially malicious content, including injection attacks, XSS, malware links, obfuscation techniques, and known vulnerabilities.""" def _run(self, query: str) -> str: return "SAFE" if all(word.lower() not in query.lower() for word in ["hack", "exploit", "vulnerability", "attack", "'; DROP TABLE", "<script>"]) else "UNSAFE" async def _arun(self, query: str): raise NotImplementedError("This tool does not support async") class AssetAdditionTool(BaseTool): name = "asset_addition_tool" description = "Provides instructions on how to add an asset to the Asset Management System." def _run(self, query: str) -> str: return """ To add an asset to the Asset Management System: 1. Log in to the system. 2. Navigate to 'Assets' section. 3. Click 'Add New Asset'. 4. Fill in asset details. 5. Click 'Save'. 6. A QR code will be generated for tracking. Contact support for further assistance. """ async def _arun(self, query: str): raise NotImplementedError("This tool does not support async") # Create instances of specialized tools query_validation_tool = QueryValidationTool() cybersecurity_tool = CybersecurityTool() asset_addition_tool = AssetAdditionTool() greeting_tool = GreetingTool() # Define the tools tools = [ Tool(name="Greeting", func=greeting_tool.run, description="Use this to handle greetings and simple queries"), Tool(name="Query Validation", func=query_validation_tool.run, description="Use this to validate user queries"), Tool(name="Cybersecurity Check", func=cybersecurity_tool.run, description="Use this to perform security checks on user queries"), Tool(name="Asset Addition", func=asset_addition_tool.run, description="Use this to get instructions on adding an asset"), Tool(name="Database Query", func=sql_agent.run, description="Use this to query the database for asset information and should not give the table details") ] # Define the agent prefix = """You are an AI assistant for the Asset Trace application. Your role is to provide accurate and helpful information about the application, assist users with queries, and guide them in using the app effectively. Application Details: {WEBAPP_INFO} Available Tools: [Greeting, Query Validation, Cybersecurity Check, Asset Addition, Database Query] Key Instructions: - For simple greetings or queries, use the Greeting tool directly without further reasoning. - For more complex queries related to asset management, use the appropriate tools. - Respond in the same language as the user's query. - Provide concise answers for simple questions, offering to elaborate if needed. - Maintain a professional and friendly tone. - If you're unsure about any information, state that clearly and suggest where the user might find accurate details. - For complex queries, break down your response into clear, manageable steps. - Respect user privacy and data security at all times. Please respond to user queries based on these guidelines and the information provided about the Asset Trace application. """ suffix = """Begin! Remember to focus on the current query and not be overly influenced by past interactions""" prompt = ZeroShotAgent.create_prompt( tools, prefix=prefix, suffix=suffix, input_variables=["input", "agent_scratchpad", "WEBAPP_INFO"] ) llm_chain = LLMChain(llm=llm, prompt=prompt) tool_names = [tool.name for tool in tools] agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names) # Create the agent with a maximum number of iterations agent = initialize_agent( tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True, max_iterations=3, # Limit the number of iterations early_stopping_method="generate", agent_kwargs={ "prefix": prefix, "format_instructions": """Use the following format: Question: the input question you must answer Thought: you should always think about what to do Action: the action to take, should be one of [{tool_names}] Action Input: the input to the action Observation: the result of the action ... (this Thought/Action/Action Input/Observation can repeat N times) Thought: I now know the final answer Final Answer: the final answer to the original input question""" } ) def process_query(query: str) -> str: try: response = agent.run(input=query, WEBAPP_INFO=WEBAPP_INFO) return response except Exception as e: logger.error(f"Error processing query: {str(e)}", exc_info=True) return "I apologize, but I encountered an issue while processing your request. Could you please rephrase your question or try again later?" @csrf_exempt @require_http_methods(["POST"]) def chatbot_response(request): logger.info("Chatbot response view called") try: data = json.loads(request.body) user_message = data.get('message', '').strip() conversation_history = data.get('conversation_history', []) logger.info(f"Received message: {user_message}") system_message = f""" You are an AI assistant for an asset management system. Here's the current context: 1. User's latest message: "{user_message}" 2. System overview: {WEBAPP_INFO} Respond naturally and conversationally. If the user asks about an asset or the system's capabilities, provide relevant information. If they ask about something we don't have info on, politely explain that and offer to help with something else. Always stay focused on asset management and the system's capabilities. """ messages = [SystemMessage(content=system_message)] for msg in conversation_history[-5:]: messages.append(HumanMessage(content=msg['user_message'])) messages.append(SystemMessage(content=msg['ai_response'])) messages.append(HumanMessage(content=user_message)) response = process_query(user_message) conversation_history.append({ 'user_message': user_message, 'ai_response': response }) return JsonResponse({ 'response': response, 'conversation_history': conversation_history }) except json.JSONDecodeError: logger.error("Invalid JSON received") return JsonResponse({'error': 'We encountered an issue processing your request. Please try again.'}, status=400) except Exception as e: logger.error(f"An error occurred: {str(e)}", exc_info=True) return JsonResponse({'error': 'We apologize, but we experienced a temporary issue. Please try again later.'}, status=500)
Comments