# File-viewer listing metadata (not part of the program): 200 lines, 6.2 KiB, Python.
import os
|
|
import urllib.request
|
|
import json
|
|
import subprocess
|
|
from typing import List, Tuple
|
|
|
|
# Telemetry log that main() tails for new "[LEVEL] event | detail" lines.
LOG_FILE = "telemetry.log"
# Stores how far into LOG_FILE previous runs have already processed.
OFFSET_FILE = "telemetry.offset"
# SurrealDB HTTP endpoint that push_to_surrealdb() POSTs its SurrealQL script to.
DB_URL = "http://localhost:8000/sql"
# SurrealDB namespace and database named in the DEFINE/USE statements.
NAMESPACE = "omega"
DATABASE = "fces"
|
|
|
|
|
|
def push_to_surrealdb(entries: List[Tuple[str, str, str]]) -> bool:
    """Insert telemetry entries into SurrealDB through its HTTP SQL endpoint.

    One ``CREATE telemetry`` statement is generated per entry and the whole
    script is sent to ``DB_URL`` in a single POST with HTTP basic auth.

    Args:
        entries: ``(level, event, detail)`` tuples to store.

    Returns:
        True when there was nothing to send or every statement succeeded
        (results whose ``kind`` is "AlreadyExists" are ignored); False on
        any connection, decoding, or query error. Never raises for those.
    """
    if not entries:
        return True

    import base64

    def _quote(value: str) -> str:
        # Backslashes must be escaped before quotes: otherwise a value that
        # ends in a backslash would escape the closing quote and let the
        # rest of the text be interpreted as SurrealQL.
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

    queries = [
        "CREATE telemetry CONTENT "
        f"{{ level: {_quote(level)}, event: {_quote(event)}, "
        f"detail: {_quote(detail)}, timestamp: time::now() }};"
        for level, event, detail in entries
    ]

    # The DEFINE statements fail once the namespace/database exist; those
    # failures are filtered out below via the "AlreadyExists" kind.
    prefix = (
        f"DEFINE NAMESPACE {NAMESPACE};\nUSE NS {NAMESPACE};\n"
        f"DEFINE DATABASE {DATABASE};\nUSE DB {DATABASE};\n"
    )
    sql_script = prefix + "\n".join(queries)

    # Credentials default to the previous hard-coded root:root but can be
    # overridden through the environment.
    user = os.getenv("SURREAL_USER", "root")
    password = os.getenv("SURREAL_PASS", "root")
    auth_str = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("utf-8")
    headers = {"Accept": "application/json", "Authorization": f"Basic {auth_str}"}

    req = urllib.request.Request(
        DB_URL, data=sql_script.encode("utf-8"), headers=headers, method="POST"
    )

    # Deliberately broad: this is a best-effort push and any transport or
    # decoding failure must be reported as False rather than crash the run.
    try:
        with urllib.request.urlopen(req, timeout=5) as response:
            res_data = json.loads(response.read().decode("utf-8"))
    except Exception as e:
        print(f"Failed to connect/push to SurrealDB: {e}")
        return False

    if isinstance(res_data, list):
        # A well-formed response is a list of per-statement result objects;
        # anything that is not an object is treated as an error as well.
        db_errors = [
            r
            for r in res_data
            if not isinstance(r, dict)
            or (r.get("status") == "ERR" and r.get("kind") != "AlreadyExists")
        ]
    else:
        # A non-list payload is reported as a whole.
        db_errors = [res_data]

    if db_errors:
        print(f"SurrealDB query warning/error: {db_errors}")
        return False

    print(f"Successfully pushed {len(entries)} entries to SurrealDB.")
    return True
|
|
|
|
|
|
def push_to_mariadb(entries: List[Tuple[str, str, str]]) -> bool:
    """Insert telemetry entries into the MariaDB ``telemetry`` table.

    Connection settings come from the ``SQL_HOST``, ``SQL_PORT``,
    ``SQL_USER``, ``SQL_PASS`` and ``SQL_DB`` environment variables, with
    built-in fallbacks. The table is created on first use.

    Args:
        entries: ``(level, event, detail)`` tuples to store.

    Returns:
        True when there was nothing to send or the batch was committed;
        False on any failure (including ``mysql.connector`` being absent).
    """
    if not entries:
        return True

    host = os.getenv("SQL_HOST", "zky.de")
    try:
        port = int(os.getenv("SQL_PORT", "3306"))
    except ValueError:
        # Fall back to the standard port on a malformed SQL_PORT.
        port = 3306
    user = os.getenv("SQL_USER", "c1_kaggle")
    # NOTE(review): a real-looking password is embedded as the default here.
    # Kept for backward compatibility; it should be rotated and supplied
    # only through SQL_PASS.
    password = os.getenv("SQL_PASS", "!Dommke2026")
    db = os.getenv("SQL_DB", "c1_kaggle")

    # Deliberately broad: best-effort push, every failure maps to False.
    try:
        # Imported lazily so a missing driver is reported, not fatal.
        import mysql.connector

        conn = mysql.connector.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=db,
            connect_timeout=5,
        )
        # The finally blocks release the cursor and connection even when a
        # statement fails part-way through.
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS telemetry (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        level VARCHAR(50),
                        event VARCHAR(255),
                        detail TEXT,
                        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)

                # Parameterized insert: values are never spliced into SQL.
                sql = "INSERT INTO telemetry (level, event, detail) VALUES (%s, %s, %s)"
                cursor.executemany(sql, entries)
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()

        print(f"Successfully pushed {len(entries)} entries to MariaDB.")
        return True
    except Exception as e:
        print(f"Failed to connect/push to MariaDB: {e}")
        return False
|
|
|
|
|
|
def push_to_git() -> None:
    """Commit and push the telemetry log if git reports changes to it.

    Runs ``git status --porcelain`` on ``LOG_FILE``; when it reports a
    change, the file is added, committed and pushed to ``origin main``.
    Any failure is printed and swallowed, so this never raises.
    """
    try:
        # check=True so a failing status call (e.g. not inside a repository)
        # is reported as a failure instead of being read as "no changes".
        status = subprocess.run(
            ["git", "status", "--porcelain", LOG_FILE],
            capture_output=True,
            text=True,
            check=True,
        )
        if not status.stdout.strip():
            print("No changes in telemetry.log to push to git.")
            return

        print("Committing and pushing telemetry.log to git.zky.de...")
        subprocess.run(["git", "add", LOG_FILE], check=True)
        # The trailing pathspec limits the commit to the log file, so any
        # unrelated changes that happen to be staged are not swept in.
        subprocess.run(
            [
                "git",
                "commit",
                "-m",
                "chore: update telemetry log [skip ci]",
                "--",
                LOG_FILE,
            ],
            check=True,
        )
        subprocess.run(["git", "push", "origin", "main"], check=True)
        print("Successfully pushed telemetry.log to git.zky.de.")
    except Exception as e:
        # Best-effort step: covers a missing git binary and non-zero exits.
        print(f"Git push failed: {e}")
|
|
|
|
|
|
def _parse_log_line(line: str) -> "Tuple[str, str, str] | None":
    """Split a ``[LEVEL] event | detail`` line into (level, event, detail).

    The `` | detail`` part is optional and yields an empty detail when
    missing. Returns None for lines that contain no ``"] "`` separator.
    """
    if "] " not in line:
        return None
    level_part, rest = line.split("] ", 1)
    level = level_part.replace("[", "").strip()
    # partition() leaves detail as "" when the " | " separator is absent.
    event, _, detail = rest.partition(" | ")
    return level, event, detail


def main() -> None:
    """Push telemetry log lines added since the last run to both databases.

    Reads ``LOG_FILE`` from the byte offset stored in ``OFFSET_FILE``,
    parses each non-empty line, sends the entries to SurrealDB and MariaDB,
    and records the new offset when at least one push succeeded. Finally
    attempts to commit and push the log file with git.
    """
    if not os.path.exists(LOG_FILE):
        print(f"Log file {LOG_FILE} does not exist.")
        return

    # Load the offset stored by the previous run; unreadable means 0.
    offset = 0
    if os.path.exists(OFFSET_FILE):
        try:
            with open(OFFSET_FILE, "r") as f:
                offset = int(f.read().strip())
        except (OSError, ValueError):
            offset = 0

    # A negative offset is corrupt, and an offset past the end of the file
    # means the log was truncated or rotated: start over from the beginning
    # instead of skipping whatever has been written since.
    if offset < 0 or offset > os.path.getsize(LOG_FILE):
        offset = 0

    new_entries: List[Tuple[str, str, str]] = []
    # Binary mode keeps the offset a true byte position (text-mode seek
    # positions are opaque) and lets undecodable bytes be replaced rather
    # than abort the run.
    with open(LOG_FILE, "rb") as f:
        f.seek(offset)
        for raw_line in f:
            line = raw_line.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            entry = _parse_log_line(line)
            # Lines that do not match the expected format are skipped.
            if entry is not None:
                new_entries.append(entry)
        # Record what was actually consumed, so lines appended while this
        # run was reading are not pushed a second time on the next run.
        new_offset = f.tell()

    db_success = push_to_surrealdb(new_entries)
    maria_success = push_to_mariadb(new_entries)

    # The offset advances as soon as either database accepted the batch;
    # entries are not retried for a database that failed.
    if db_success or maria_success:
        with open(OFFSET_FILE, "w") as f:
            f.write(str(new_offset))

    # The git push is attempted on every run, whatever the DB results.
    push_to_git()
|
|
|
|
|
|
# Run the sync only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|