Agentic Bot for asset trace

PHOTO EMBED

Tue Aug 27 2024 04:49:58 GMT+0000 (Coordinated Universal Time)

Saved by @vineethnj

import os
import sys
import re
import json
import logging
from pathlib import Path
from dotenv import load_dotenv
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from django.db import connection
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.utilities import SQLDatabase
from langchain_groq import ChatGroq
from langchain.tools import BaseTool
from langchain.chains import LLMChain
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from .models import AssetData
from langchain.agents import load_tools, initialize_agent, AgentType
# Setup Django environment
# Add the directory two levels above this file to the import path
# (presumably so the settings package named below can be imported -- confirm).
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(project_root)
# setdefault only takes effect when DJANGO_SETTINGS_MODULE is not already
# present in the environment.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "asset_management_system.settings")
# NOTE(review): `from .models import AssetData` in the import block above is
# executed before django.setup() is called here -- confirm the intended order.
import django
django.setup()

# Setup logging
logger = logging.getLogger(__name__)

# Load environment variables and set up API keys
load_dotenv()
api_key = os.getenv('OPENAI_API_KEY')
groq_api_key = os.getenv("GROQ_API_KEY")
# os.environ only accepts str values: assigning None (key absent from both the
# environment and the .env file) raises TypeError and would abort the import of
# this module. Only re-export the key when it is actually available.
if groq_api_key is not None:
    os.environ["GROQ_API_KEY"] = groq_api_key
else:
    logger.warning("GROQ_API_KEY is not set; Groq-backed models will be unavailable.")

# Initialize LLM
# This OpenAI chat model is the one handed to both the SQL agent and the
# main agent further down in this module.
llm = ChatOpenAI(api_key=api_key, model="gpt-4o-mini", temperature=0)

# Constants
# Short description of the application. It is interpolated into the system
# message built in chatbot_response and passed to the agent as the
# WEBAPP_INFO prompt variable in process_query.
WEBAPP_INFO = """
The Asset Management System features a login system with three roles: Master Admin, Super Admin, and Admin, each with specific permissions. The system supports various asset types, with QR codes generated for each asset for easy tracking and management. Assets can be transferred between branches, with movement history tracked for audits.
"""

# Substrings that make QueryValidationTool return "INVALID" (compared
# case-insensitively).
# NOTE(review): the current entries look like placeholders -- confirm the
# intended word list.
offense_words = ['asdf','asdfasdf']

# Create SQLDatabase instance

# Substring of a Django ENGINE path -> SQLAlchemy dialect name.
_DIALECT_MARKERS = (
    ('postg', 'postgresql'),   # postgresql, postgresql_psycopg2, postgis
    ('mysql', 'mysql'),
    ('oracle', 'oracle'),
)


def _build_db_url(settings_dict):
    """Translate a Django database settings dict into a SQLAlchemy-style URL.

    SQLite keeps the original ``sqlite:///<NAME>`` form. For any other engine
    the Django ENGINE path (e.g. ``django.db.backends.postgresql``) is not a
    valid URL scheme, so it is mapped to a dialect name and the USER,
    PASSWORD, HOST and PORT entries are included in the URL.

    Args:
        settings_dict: mapping with at least ``ENGINE`` and ``NAME``; the
            ``USER``, ``PASSWORD``, ``HOST`` and ``PORT`` keys are optional.

    Returns:
        The database URL as a string.

    Raises:
        ValueError: if ``NAME`` is neither a ``str`` nor a ``Path``.
    """
    from urllib.parse import quote_plus  # local import: only needed here

    engine = settings_dict['ENGINE']
    name = settings_dict['NAME']
    if not isinstance(name, (str, Path)):
        raise ValueError(f"Unexpected type for db_name: {type(name)}")
    name = str(name)

    if 'sqlite' in engine:
        return f"sqlite:///{name}"

    # Fall back to the last component of the ENGINE path for unknown backends.
    dialect = next(
        (target for marker, target in _DIALECT_MARKERS if marker in engine),
        engine.rsplit('.', 1)[-1],
    )

    user = settings_dict.get('USER') or ''
    password = settings_dict.get('PASSWORD') or ''
    host = settings_dict.get('HOST') or ''
    port = settings_dict.get('PORT') or ''

    # Credentials are percent-escaped so characters such as '@' or '/' in a
    # password cannot be mistaken for URL delimiters.
    credentials = quote_plus(str(user))
    if password:
        credentials += f":{quote_plus(str(password))}"

    location = str(host)
    if port:
        location += f":{port}"
    if credentials:
        location = f"{credentials}@{location}"

    # NOTE(review): this selects the dialect's default driver; confirm it
    # matches the driver installed for the Django backend.
    return f"{dialect}://{location}/{name}"


db_engine = connection.settings_dict['ENGINE']
db_url = _build_db_url(connection.settings_dict)
db_name = str(connection.settings_dict['NAME'])
db = SQLDatabase.from_uri(db_url)

# Create SQL agent
sql_agent = create_sql_agent(llm, db=db, agent_type="openai-tools", verbose=True)


class GreetingTool(BaseTool):
    """Tool that answers greetings and other trivial queries with a canned reply.

    The ``name`` and ``description`` overrides carry type annotations because
    ``BaseTool`` is a pydantic model, and pydantic v2 rejects field overrides
    that have no annotation.
    """

    name: str = "greeting_tool"
    description: str = "Use this to respond to greetings or simple queries that don't require complex reasoning."

    def _run(self, query: str) -> str:
        """Return a welcome message for greetings, otherwise a generic prompt.

        Args:
            query: the raw user text.

        Returns:
            One of two fixed reply strings.
        """
        greetings = ["hello", "hi", "hey", "greetings", "good morning", "good afternoon", "good evening"]
        # Match whole words only: a plain substring test would treat "this",
        # "which" or "history" as the greeting "hi".
        pattern = r"\b(?:" + "|".join(re.escape(greeting) for greeting in greetings) + r")\b"
        if re.search(pattern, query.lower()):
            return "Hello! I'm the Asset Trace AI assistant. How can I help you with asset management today?"
        return "I'm here to help with asset management. What specific information or assistance do you need?"

    async def _arun(self, query: str):
        """Async execution is not implemented for this tool."""
        raise NotImplementedError("This tool does not support async")

# Tool classes
class QueryValidationTool(BaseTool):
    """Tool that labels a query "VALID" or "INVALID" using simple text checks.

    The ``name`` and ``description`` overrides carry type annotations because
    ``BaseTool`` is a pydantic model, and pydantic v2 rejects field overrides
    that have no annotation.
    """

    name: str = "query_validation_tool"
    description: str = """Evaluate the given query to ensure it is appropriate, respectful, and suitable for general audiences. Check for offensive language, hate speech, explicit content, harassment, sensitive topics, and respectfulness."""

    def _run(self, query: str) -> str:
        """Validate ``query`` by length and against the ``offense_words`` list.

        Args:
            query: the raw user text.

        Returns:
            "VALID" when the query is longer than five characters and contains
            none of the ``offense_words`` entries (case-insensitive substring
            match); "INVALID" otherwise.
        """
        if len(query) <= 5:
            return "INVALID"
        lowered = query.lower()
        if any(word.lower() in lowered for word in offense_words):
            return "INVALID"
        return "VALID"

    async def _arun(self, query: str):
        """Async execution is not implemented for this tool."""
        raise NotImplementedError("This tool does not support async")

class CybersecurityTool(BaseTool):
    """Tool that labels a query "SAFE" or "UNSAFE" using a keyword blocklist.

    The ``name`` and ``description`` overrides carry type annotations because
    ``BaseTool`` is a pydantic model, and pydantic v2 rejects field overrides
    that have no annotation.
    """

    name: str = "cybersecurity_tool"
    description: str = """Checks if the query contains any potentially malicious content, including injection attacks, XSS, malware links, obfuscation techniques, and known vulnerabilities."""

    def _run(self, query: str) -> str:
        """Scan ``query`` for blocked substrings.

        Args:
            query: the raw user text.

        Returns:
            "UNSAFE" when any blocked term occurs in the query
            (case-insensitive substring match); "SAFE" otherwise.
        """
        blocked_terms = ["hack", "exploit", "vulnerability", "attack", "'; DROP TABLE", "<script>"]
        lowered = query.lower()
        if any(term.lower() in lowered for term in blocked_terms):
            return "UNSAFE"
        return "SAFE"

    async def _arun(self, query: str):
        """Async execution is not implemented for this tool."""
        raise NotImplementedError("This tool does not support async")

class AssetAdditionTool(BaseTool):
    """Tool that returns fixed step-by-step instructions for adding an asset.

    The ``name`` and ``description`` overrides carry type annotations because
    ``BaseTool`` is a pydantic model, and pydantic v2 rejects field overrides
    that have no annotation.
    """

    name: str = "asset_addition_tool"
    description: str = "Provides instructions on how to add an asset to the Asset Management System."

    def _run(self, query: str) -> str:
        """Return the static instruction text; ``query`` is not inspected.

        Args:
            query: the raw user text (unused).

        Returns:
            The instruction text, including the literal's leading indentation.
        """
        return """
        To add an asset to the Asset Management System:
        1. Log in to the system.
        2. Navigate to 'Assets' section.
        3. Click 'Add New Asset'.
        4. Fill in asset details.
        5. Click 'Save'.
        6. A QR code will be generated for tracking.
        Contact support for further assistance.
        """

    async def _arun(self, query: str):
        """Async execution is not implemented for this tool."""
        raise NotImplementedError("This tool does not support async")

# Create instances of specialized tools
query_validation_tool = QueryValidationTool()
cybersecurity_tool = CybersecurityTool()
asset_addition_tool = AssetAdditionTool()
greeting_tool = GreetingTool()

# Define the tools
# Each entry is (name shown to the agent, callable to invoke, description).
_TOOL_SPECS = (
    (
        "Greeting",
        greeting_tool.run,
        "Use this to handle greetings and simple queries",
    ),
    (
        "Query Validation",
        query_validation_tool.run,
        "Use this to validate user queries",
    ),
    (
        "Cybersecurity Check",
        cybersecurity_tool.run,
        "Use this to perform security checks on user queries",
    ),
    (
        "Asset Addition",
        asset_addition_tool.run,
        "Use this to get instructions on adding an asset",
    ),
    (
        "Database Query",
        sql_agent.run,
        "Use this to query the database for asset information and should not give the table details",
    ),
)

tools = [
    Tool(name=tool_name, func=tool_func, description=tool_description)
    for tool_name, tool_func, tool_description in _TOOL_SPECS
]

# Define the agent
# Opening section of the agent prompt. This is a plain (non-f) string, so
# {WEBAPP_INFO} remains a template placeholder; process_query supplies its
# value when it runs the agent.
prefix = """You are an AI assistant for the Asset Trace application. Your role is to provide accurate and helpful information about the application, assist users with queries, and guide them in using the app effectively.

Application Details:
{WEBAPP_INFO}

Available Tools: [Greeting, Query Validation, Cybersecurity Check, Asset Addition, Database Query]

Key Instructions:
- For simple greetings or queries, use the Greeting tool directly without further reasoning.
- For more complex queries related to asset management, use the appropriate tools.
- Respond in the same language as the user's query.
- Provide concise answers for simple questions, offering to elaborate if needed.
- Maintain a professional and friendly tone.
- If you're unsure about any information, state that clearly and suggest where the user might find accurate details.
- For complex queries, break down your response into clear, manageable steps.
- Respect user privacy and data security at all times.

Please respond to user queries based on these guidelines and the information provided about the Asset Trace application.
"""

# Closing text passed to ZeroShotAgent.create_prompt below.
# NOTE(review): this text contains no {input} or {agent_scratchpad}
# placeholder, and it is not part of the agent_kwargs given to
# initialize_agent -- confirm whether that is intended.
suffix = """Begin! Remember to focus on the current query and not be overly influenced by past interactions"""

# Names of the registered tools, in registration order.
tool_names = [registered.name for registered in tools]

# Assemble the zero-shot prompt from the custom prefix/suffix and the tools.
prompt = ZeroShotAgent.create_prompt(
    tools,
    prefix=prefix,
    suffix=suffix,
    input_variables=["input", "agent_scratchpad", "WEBAPP_INFO"],
)

# Chain the LLM to that prompt and wrap it in an agent restricted to the
# registered tool names.
# NOTE: the name `agent` is rebound by the initialize_agent call that follows.
llm_chain = LLMChain(llm=llm, prompt=prompt)
agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)

# Create the agent with a maximum number of iterations

# Output format the agent is told to follow; {tool_names} is left as a
# placeholder inside this template.
_REACT_FORMAT_INSTRUCTIONS = """Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question"""

# Prompt customisation forwarded to the agent constructor.
_AGENT_KWARGS = {
    "prefix": prefix,
    "format_instructions": _REACT_FORMAT_INSTRUCTIONS,
}

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    max_iterations=3,  # Limit the number of iterations
    early_stopping_method="generate",
    agent_kwargs=_AGENT_KWARGS,
)

def process_query(query: str) -> str:
    """Run the module-level agent on ``query`` and return its answer.

    Any exception raised while the agent runs is logged with its traceback
    and replaced by a fixed apology message, so this function does not
    propagate agent errors to the caller.

    Args:
        query: the user's question.

    Returns:
        The agent's answer, or the apology text if the agent failed.
    """
    try:
        answer = agent.run(input=query, WEBAPP_INFO=WEBAPP_INFO)
    except Exception as exc:
        logger.error(f"Error processing query: {str(exc)}", exc_info=True)
        return "I apologize, but I encountered an issue while processing your request. Could you please rephrase your question or try again later?"
    else:
        return answer

# NOTE(review): CSRF protection is disabled for this endpoint via
# @csrf_exempt -- confirm that callers are authenticated by other means.
@csrf_exempt
@require_http_methods(["POST"])
def chatbot_response(request):
    """Answer one chat message posted as JSON.

    Expected request body (JSON object):
        message: non-empty string with the user's text.
        conversation_history: optional list of earlier exchanges.

    The message is passed to ``process_query``. The conversation history is
    NOT forwarded to the agent; the new exchange is appended to it and the
    list is echoed back so the client can keep state.

    Returns:
        JsonResponse with ``response`` and ``conversation_history`` on
        success; ``{"error": ...}`` with status 400 for a malformed payload,
        or status 500 for an unexpected failure.
    """
    logger.info("Chatbot response view called")
    bad_request_text = 'We encountered an issue processing your request. Please try again.'

    # Parse the body on its own so that only parsing problems map to 400.
    # ValueError covers both json.JSONDecodeError and the UnicodeDecodeError
    # raised for bytes that are not valid UTF-8.
    try:
        data = json.loads(request.body)
    except ValueError:
        logger.error("Invalid JSON received")
        return JsonResponse({'error': bad_request_text}, status=400)

    try:
        # Valid JSON that is not an object (e.g. a list or a number) has no .get().
        if not isinstance(data, dict):
            logger.error("JSON payload is not an object")
            return JsonResponse({'error': bad_request_text}, status=400)

        raw_message = data.get('message', '')
        if not isinstance(raw_message, str) or not raw_message.strip():
            # Reject a missing, empty or non-string message instead of
            # crashing on .strip() or sending an empty prompt to the agent.
            logger.error("Missing or empty 'message' field")
            return JsonResponse({'error': bad_request_text}, status=400)
        user_message = raw_message.strip()

        conversation_history = data.get('conversation_history')
        if conversation_history is None:
            conversation_history = []
        if not isinstance(conversation_history, list):
            logger.error("'conversation_history' is not a list")
            return JsonResponse({'error': bad_request_text}, status=400)

        logger.info("Received message: %s", user_message)

        response = process_query(user_message)

        conversation_history.append({
            'user_message': user_message,
            'ai_response': response
        })

        return JsonResponse({
            'response': response,
            'conversation_history': conversation_history
        })
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}", exc_info=True)
        return JsonResponse({'error': 'We apologize, but we experienced a temporary issue. Please try again later.'}, status=500)
content_copyCOPY