Automated VPS Security & Availability Monitoring

Project Description

I was really excited to jump into this project because it made a previous idea of mine much more streamlined and efficient. A while back, while hosting a VPS, I realized I was spending a lot of time manually maintaining the server and monitoring log files. That gave me the idea to write a Python program that used OpenAI’s ChatGPT to analyze my logs and provide a dashboard where I could easily view the information. I put too much effort into that initial project not to showcase some of my work listed below. I stepped away from it for a bit to focus on other priorities, but recently I started learning more about agentic AI and realized this would be a perfect opportunity to revisit the project. I built an n8n workflow that now automatically updates and upgrades my VPS. Since the VPS isn’t running anything critical at the moment, these automated updates carry very little risk of breaking anything. The workflow also collects key log files such as Auth logs, Fail2Ban logs, and Apache access logs, and it even curls my website to check for availability. All of this information is then sent to Gemini with a tailored prompt, and every morning at 7:00 AM I receive a summarized update by email with the latest VPS health status and any important log insights I should know about.

N8N Workflow

N8N VPS Health Monitor Workflow

Previous Project with Python

Log Parser

import re

def parse_apache(line):
    pattern = r'(\d+\.\d+\.\d+\.\d+) - - \[(.*?)\] "(.*?)" (\d+) (\d+)'
    match = re.search(pattern, line)
    if match:
        ip, timestamp, request, code, size = match.groups()
        return {'type': 'Apache', 'ip': ip, 'request': request, 'code': code, 'size': size, 'timestamp': timestamp}
    return None

def parse_ufw(line):
    if 'BLOCK' in line:
        return {'type': 'firewall', 'message': line.strip()}
    return None

def parse_fail2ban(line):
    if 'Ban' in line or 'Unban' in line:
        return {'Type': "fail2ban", 'message': line.strip()}
    return None

def parse_auth(line):
    if 'Failed Password' in line:
        pattern = r'Failed password for (?:invalid user )?(\S+) from (\d+\.\d+\.\d+\.\d+)'
        match = re.search(pattern, line)
        if match:
            user, ip = match.groups()
            return {'type': 'Auth', 'user': user, 'ip': ip, 'message': line.strip()}
    return None

AI Summary

import openai
import os
from dotenv import load_dotenv

# Load variables from .env file (optional, but recommended)
load_dotenv()

openai.api_key = os.getenv("OPENAI_API_KEY")

def summarize_logs(events):
    formatted_events = "\n".join(
        f"[{e['type']}] {e.get('user','')} {e.get('ip','')} {e.get('message', e.get('request',''))}"
        for e in events
    )

    prompt = f"""
You are a cybersecurity assistant. Analyze these security logs:
{formatted_events}

Instructions:
- Group by event type
- Highlight suspicious activity
- Summarize in 1–2 sentences per issue
- Output as bullet points, plain text
- do not include raw logs only your analysis
"""

    response = openai.ChatCompletion.create(
        model="gpt-4.1-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.5
    )
    return response.choices[0].message.content

Flask Dashboard

from flask import Flask, jsonify
import threading, os, json
from tail_logs import tail
from log_parser import parse_auth, parse_ufw, parse_fail2ban, parse_apache
from summary import summarize_logs
from collections import defaultdict

app = Flask(__name__)
log_events = []
stats = {
    "failed_logins": 0,
    "ufw_blocks": 0,
    "apache_requests": defaultdict(int)
}

# Paths on VPS
log_files = [
    (Path to log files)
]

def monitor_logs():
    global log_events, stats
    while True:
        for file_path, parser in log_files:
            for line in tail(file_path):
                event = parser(line)
                if event:
                    log_events.append(event)
                    # Update metrics
                    if event["type"].lower() == "auth":
                        stats["failed_logins"] += 1
                    elif event["type"].lower() == "firewall":
                        stats["ufw_blocks"] += 1
                    elif event["type"].lower() == "apache":
                        endpoint = event["request"].split(" ")[1]
                        stats["apache_requests"][endpoint] += 1

                if len(log_events) >= 5:
                    summary = summarize_logs(log_events)
                    with open("/home/username/log_ai_analyzer/latest_summary.json", "w") as f:
                        json.dump({"summary": summary, "stats": stats}, f)
                    log_events.clear()

# Start background monitoring
threading.Thread(target=monitor_logs, daemon=True).start()

@app.route("/api/summary")
def get_summary():
    if os.path.exists("/home/username/log_ai_analyzer/latest_summary.json"):
        with open("/home/username/log_ai_analyzer/latest_summary.json") as f:
            return f.read()
    return jsonify({"summary": "No logs yet.", "stats": {}})

if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5000)