I was really excited to jump into this project because it made a previous idea of mine much more streamlined and efficient. A while back, while hosting a VPS, I realized I was spending a lot of time manually maintaining the server and monitoring log files. That gave me the idea to write a Python program that used OpenAI’s ChatGPT to analyze my logs and provide a dashboard where I could easily view the information. I put too much effort into that initial project not to showcase some of my work listed below. I stepped away from it for a bit to focus on other priorities, but recently I started learning more about agentic AI and realized this would be a perfect opportunity to revisit the project. I built an n8n workflow that now automatically updates and upgrades my VPS. Since the VPS isn’t running anything critical at the moment, these automated updates carry very little risk of breaking anything. The workflow also collects key log files such as Auth logs, Fail2Ban logs, and Apache access logs, and it even curls my website to check for availability. All of this information is then sent to Gemini with a tailored prompt, and every morning at 7:00 AM I receive a summarized update by email with the latest VPS health status and any important log insights I should know about.
import re
def parse_apache(line):
pattern = r'(\d+\.\d+\.\d+\.\d+) - - \[(.*?)\] "(.*?)" (\d+) (\d+)'
match = re.search(pattern, line)
if match:
ip, timestamp, request, code, size = match.groups()
return {'type': 'Apache', 'ip': ip, 'request': request, 'code': code, 'size': size, 'timestamp': timestamp}
return None
def parse_ufw(line):
if 'BLOCK' in line:
return {'type': 'firewall', 'message': line.strip()}
return None
def parse_fail2ban(line):
if 'Ban' in line or 'Unban' in line:
return {'Type': "fail2ban", 'message': line.strip()}
return None
def parse_auth(line):
if 'Failed Password' in line:
pattern = r'Failed password for (?:invalid user )?(\S+) from (\d+\.\d+\.\d+\.\d+)'
match = re.search(pattern, line)
if match:
user, ip = match.groups()
return {'type': 'Auth', 'user': user, 'ip': ip, 'message': line.strip()}
return None
import openai
import os
from dotenv import load_dotenv
# Load variables from .env file (optional, but recommended)
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
def summarize_logs(events):
formatted_events = "\n".join(
f"[{e['type']}] {e.get('user','')} {e.get('ip','')} {e.get('message', e.get('request',''))}"
for e in events
)
prompt = f"""
You are a cybersecurity assistant. Analyze these security logs:
{formatted_events}
Instructions:
- Group by event type
- Highlight suspicious activity
- Summarize in 1–2 sentences per issue
- Output as bullet points, plain text
- do not include raw logs only your analysis
"""
response = openai.ChatCompletion.create(
model="gpt-4.1-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.5
)
return response.choices[0].message.content
from flask import Flask, jsonify
import threading, os, json
from tail_logs import tail
from log_parser import parse_auth, parse_ufw, parse_fail2ban, parse_apache
from summary import summarize_logs
from collections import defaultdict
app = Flask(__name__)
log_events = []
stats = {
"failed_logins": 0,
"ufw_blocks": 0,
"apache_requests": defaultdict(int)
}
# Paths on VPS
log_files = [
(Path to log files)
]
def monitor_logs():
global log_events, stats
while True:
for file_path, parser in log_files:
for line in tail(file_path):
event = parser(line)
if event:
log_events.append(event)
# Update metrics
if event["type"].lower() == "auth":
stats["failed_logins"] += 1
elif event["type"].lower() == "firewall":
stats["ufw_blocks"] += 1
elif event["type"].lower() == "apache":
endpoint = event["request"].split(" ")[1]
stats["apache_requests"][endpoint] += 1
if len(log_events) >= 5:
summary = summarize_logs(log_events)
with open("/home/username/log_ai_analyzer/latest_summary.json", "w") as f:
json.dump({"summary": summary, "stats": stats}, f)
log_events.clear()
# Start background monitoring
threading.Thread(target=monitor_logs, daemon=True).start()
@app.route("/api/summary")
def get_summary():
if os.path.exists("/home/username/log_ai_analyzer/latest_summary.json"):
with open("/home/username/log_ai_analyzer/latest_summary.json") as f:
return f.read()
return jsonify({"summary": "No logs yet.", "stats": {}})
if __name__ == "__main__":
app.run(host="127.0.0.1", port=5000)