1070 lines
53 KiB
Python
Executable File
1070 lines
53 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Albumik - a lightweight private photo gallery
# Backend with no external dependencies: Python stdlib + SQLite.
|
|
|
|
import base64
|
|
import datetime as dt
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import posixpath
|
|
import re
|
|
import secrets
|
|
import shutil
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import urllib.parse
|
|
import uuid
|
|
from http import HTTPStatus
|
|
from http.cookies import SimpleCookie
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
|
|
# Application version, reported by the /api/health endpoint.
APP_VERSION = "0.1.0"

# Folder permission levels mapped to comparable integers (higher = stronger).
PERM_VALUE = {
    "none": 0,
    "view": 1,
    "upload": 2,
    "manage": 3,
    "admin": 4,
}

# Reverse lookup: numeric level -> permission name.
VALUE_PERM = {level: label for label, level in PERM_VALUE.items()}

# Lower-case file extensions treated as images / videos.
IMAGE_EXT = {
    ".jpg", ".jpeg", ".png", ".webp", ".gif", ".heic", ".heif", ".bmp",
}
VIDEO_EXT = {
    ".mp4", ".mov", ".m4v", ".webm", ".avi", ".mkv",
}
|
|
|
|
|
|
def now_iso():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    Uses an aware ``datetime.now(timezone.utc)`` instead of the deprecated
    ``datetime.utcnow()``; the tzinfo is dropped before formatting so the
    output keeps the literal ``Z`` suffix rather than ``+00:00``.
    """
    current = dt.datetime.now(dt.timezone.utc)
    return current.replace(microsecond=0, tzinfo=None).isoformat() + "Z"
|
|
|
|
|
|
def make_id(prefix=""):
    """Return *prefix* followed by the 32-character hex form of a random UUID4."""
    random_part = uuid.uuid4().hex
    return prefix + random_part
|
|
|
|
|
|
def hash_password(password: str, salt: bytes | None = None) -> str:
|
|
if salt is None:
|
|
salt = secrets.token_bytes(16)
|
|
dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 120_000)
|
|
return "pbkdf2_sha256$120000$" + base64.b64encode(salt).decode() + "$" + base64.b64encode(dk).decode()
|
|
|
|
|
|
def verify_password(password: str, encoded: str) -> bool:
    """Check *password* against a ``pbkdf2_sha256$rounds$salt$hash`` string.

    Returns ``False`` for a wrong password, an unknown algorithm tag, or any
    malformed/undecodable *encoded* value (all exceptions are swallowed).
    """
    try:
        scheme, iterations, salt_text, digest_text = encoded.split("$", 3)
        if scheme != "pbkdf2_sha256":
            return False
        candidate = hashlib.pbkdf2_hmac(
            "sha256",
            password.encode("utf-8"),
            base64.b64decode(salt_text),
            int(iterations),
        )
        # Constant-time comparison of the derived key and the stored digest.
        return hmac.compare_digest(candidate, base64.b64decode(digest_text))
    except Exception:
        return False
|
|
|
|
|
|
def token_hash(token: str) -> str:
    """Return the hex SHA-256 digest of the UTF-8 encoded *token*."""
    digest = hashlib.sha256()
    digest.update(token.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
class AlbumikApp:
|
|
def __init__(self, config_path: str):
    """Load the JSON configuration, create data directories and set up the database.

    Args:
        config_path: Path to a UTF-8 encoded JSON configuration file.
    """
    self.config_path = config_path
    with open(config_path, "r", encoding="utf-8") as handle:
        self.config = json.load(handle)
    cfg = self.config

    # Filesystem locations; the database and media default to paths under data_dir.
    self.data_dir = Path(cfg.get("data_dir", "./data")).resolve()
    default_db = str(self.data_dir / "albumik.db")
    default_media = str(self.data_dir / "media")
    self.db_path = Path(cfg.get("database", default_db)).resolve()
    self.media_dir = Path(cfg.get("media_dir", default_media)).resolve()
    self.web_dir = Path(cfg.get("web_dir", "./web")).resolve()

    # Server and application settings.
    self.listen = cfg.get("listen", "127.0.0.1:8097")
    self.base_path = cfg.get("base_path", "/").rstrip("/") or ""
    self.app_name = cfg.get("app_name", "Albumik")
    self.admin_user = cfg.get("admin_user", "admin")
    self.admin_password = cfg.get("admin_password", "admin")
    self.session_days = int(cfg.get("session_days", 14))
    self.max_upload_mb = int(cfg.get("max_upload_mb", 512))
    self.allow_public_register = False

    # Make sure every directory the application writes to exists.
    required_dirs = [self.data_dir, self.db_path.parent]
    required_dirs += [self.media_dir / sub for sub in ("originals", "thumbs", "previews", "pending")]
    required_dirs.append(self.data_dir / "logs")
    for directory in required_dirs:
        directory.mkdir(parents=True, exist_ok=True)

    self.init_db()
|
|
|
|
def db(self):
    """Open and return a new SQLite connection to ``self.db_path``.

    Rows are returned as ``sqlite3.Row``; foreign keys, WAL journaling and a
    5000 ms busy timeout are enabled through PRAGMAs.

    NOTE: ``with conn:`` on a sqlite3 connection only commits or rolls back;
    it does not close the connection.
    NOTE(review): the busy_timeout PRAGMA presumably overrides the 30 s
    ``timeout`` passed to ``connect`` -- confirm which value is intended.
    """
    conn = sqlite3.connect(str(self.db_path), timeout=30)
    conn.row_factory = sqlite3.Row
    for pragma in ("foreign_keys=ON", "journal_mode=WAL", "busy_timeout=5000"):
        conn.execute("PRAGMA " + pragma)
    return conn
|
|
|
|
def init_db(self):
    """Create the schema if missing and seed the first admin and root folder.

    On an empty ``users`` table an administrator (from the configured
    credentials) and the ``/Albumik`` root folder are inserted and an ``init``
    event is logged. Otherwise a root folder is created only when no
    non-deleted top-level folder exists and an admin user is available.
    """
    insert_folder_sql = "INSERT INTO folders(id,parent_id,name,path,owner_id,created_by,inherit_permissions,created_at) VALUES(?,?,?,?,?,?,?,?)"
    with self.db() as con:
        con.executescript(
            """
            CREATE TABLE IF NOT EXISTS users (
                id TEXT PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                display_name TEXT NOT NULL,
                password_hash TEXT NOT NULL,
                role TEXT NOT NULL CHECK(role IN ('admin','user','guest')),
                can_create_folders INTEGER DEFAULT 0,
                can_upload INTEGER DEFAULT 0,
                can_tag INTEGER DEFAULT 0,
                active INTEGER DEFAULT 1,
                expires_at TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT
            );

            CREATE TABLE IF NOT EXISTS sessions (
                id TEXT PRIMARY KEY,
                token_hash TEXT UNIQUE NOT NULL,
                user_id TEXT NOT NULL,
                created_at TEXT NOT NULL,
                expires_at TEXT NOT NULL,
                last_seen_at TEXT,
                ip TEXT,
                user_agent TEXT,
                FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS folders (
                id TEXT PRIMARY KEY,
                parent_id TEXT,
                name TEXT NOT NULL,
                path TEXT NOT NULL,
                owner_id TEXT NOT NULL,
                created_by TEXT NOT NULL,
                inherit_permissions INTEGER DEFAULT 1,
                is_deleted INTEGER DEFAULT 0,
                created_at TEXT NOT NULL,
                updated_at TEXT,
                FOREIGN KEY(parent_id) REFERENCES folders(id) ON DELETE SET NULL,
                FOREIGN KEY(owner_id) REFERENCES users(id),
                FOREIGN KEY(created_by) REFERENCES users(id)
            );

            CREATE TABLE IF NOT EXISTS folder_permissions (
                id TEXT PRIMARY KEY,
                folder_id TEXT NOT NULL,
                user_id TEXT NOT NULL,
                level TEXT NOT NULL CHECK(level IN ('none','view','upload','manage','admin')),
                created_at TEXT NOT NULL,
                UNIQUE(folder_id, user_id),
                FOREIGN KEY(folder_id) REFERENCES folders(id) ON DELETE CASCADE,
                FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS photos (
                id TEXT PRIMARY KEY,
                folder_id TEXT NOT NULL,
                file_name TEXT NOT NULL,
                original_name TEXT NOT NULL,
                stored_path TEXT NOT NULL,
                thumb_path TEXT,
                preview_path TEXT,
                mime_type TEXT NOT NULL,
                size_bytes INTEGER NOT NULL,
                sha256 TEXT NOT NULL,
                width INTEGER,
                height INTEGER,
                duration_ms INTEGER,
                is_video INTEGER DEFAULT 0,
                taken_at TEXT,
                uploaded_at TEXT NOT NULL,
                status TEXT NOT NULL DEFAULT 'approved' CHECK(status IN ('pending','approved','rejected')),
                submitted_by TEXT NOT NULL,
                approved_by TEXT,
                approved_at TEXT,
                rejected_by TEXT,
                rejected_at TEXT,
                reject_reason TEXT,
                is_deleted INTEGER DEFAULT 0,
                created_at TEXT NOT NULL,
                updated_at TEXT,
                FOREIGN KEY(folder_id) REFERENCES folders(id),
                FOREIGN KEY(submitted_by) REFERENCES users(id),
                FOREIGN KEY(approved_by) REFERENCES users(id),
                FOREIGN KEY(rejected_by) REFERENCES users(id)
            );
            CREATE INDEX IF NOT EXISTS idx_photos_folder ON photos(folder_id);
            CREATE INDEX IF NOT EXISTS idx_photos_status ON photos(status);
            CREATE INDEX IF NOT EXISTS idx_photos_taken_at ON photos(taken_at);
            CREATE INDEX IF NOT EXISTS idx_photos_sha ON photos(sha256);

            CREATE TABLE IF NOT EXISTS tags (
                id TEXT PRIMARY KEY,
                name TEXT UNIQUE NOT NULL,
                color TEXT NOT NULL,
                created_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS photo_tags (
                photo_id TEXT NOT NULL,
                tag_id TEXT NOT NULL,
                PRIMARY KEY(photo_id, tag_id),
                FOREIGN KEY(photo_id) REFERENCES photos(id) ON DELETE CASCADE,
                FOREIGN KEY(tag_id) REFERENCES tags(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS events (
                id TEXT PRIMARY KEY,
                actor_id TEXT,
                action TEXT NOT NULL,
                target_type TEXT,
                target_id TEXT,
                message TEXT NOT NULL,
                ip TEXT,
                created_at TEXT NOT NULL,
                FOREIGN KEY(actor_id) REFERENCES users(id)
            );
            CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at);
            """
        )

        user_count = con.execute("SELECT COUNT(*) AS c FROM users").fetchone()["c"]
        if user_count == 0:
            # Fresh database: create the administrator and the root folder.
            admin_id = make_id("usr_")
            con.execute(
                "INSERT INTO users(id,username,display_name,password_hash,role,can_create_folders,can_upload,can_tag,created_at) VALUES(?,?,?,?,?,?,?,?,?)",
                (admin_id, self.admin_user, "Administrator", hash_password(self.admin_password), "admin", 1, 1, 1, now_iso()),
            )
            root_id = make_id("fld_")
            con.execute(
                insert_folder_sql,
                (root_id, None, "Albumik", "/Albumik", admin_id, admin_id, 1, now_iso()),
            )
            self.log_event(con, admin_id, "init", "system", "albumik", "Utworzono pierwszego administratora i katalog główny", None)
            return

        # Existing installation: make sure a root folder is present.
        root = con.execute("SELECT id FROM folders WHERE parent_id IS NULL AND is_deleted=0 ORDER BY created_at LIMIT 1").fetchone()
        if root:
            return
        admin = con.execute("SELECT id FROM users WHERE role='admin' ORDER BY created_at LIMIT 1").fetchone()
        if not admin:
            return
        root_id = make_id("fld_")
        con.execute(
            insert_folder_sql,
            (root_id, None, "Albumik", "/Albumik", admin["id"], admin["id"], 1, now_iso()),
        )
|
|
|
|
def log_event(self, con, actor_id, action, target_type, target_id, message, ip):
    """Insert one row into the ``events`` audit table using connection *con*.

    A fresh ``evt_`` id and the current UTC timestamp are generated here; the
    caller is responsible for committing the transaction.
    """
    sql = "INSERT INTO events(id,actor_id,action,target_type,target_id,message,ip,created_at) VALUES(?,?,?,?,?,?,?,?)"
    record = (make_id("evt_"), actor_id, action, target_type, target_id, message, ip, now_iso())
    con.execute(sql, record)
|
|
|
|
def sanitize_name(self, name: str) -> str:
    """Return a safe display name derived from *name*.

    Path separators are replaced with ``-``, ASCII control characters are
    removed, surrounding whitespace is trimmed and the result is limited to
    120 characters. Falsy input or a name that ends up empty yields the
    placeholder ``"Bez nazwy"``.

    Whitespace is stripped *after* control characters are removed (so a name
    such as ``"\\x01 \\x02"`` cannot collapse into a blank string) and again
    after truncation; non-string input is converted with ``str()`` instead
    of raising ``AttributeError``.
    """
    raw = name or ""
    if not isinstance(raw, str):
        raw = str(raw)
    cleaned = raw.replace("/", "-").replace("\\", "-")
    cleaned = re.sub(r"[\x00-\x1f]", "", cleaned)
    cleaned = cleaned.strip()[:120].rstrip()
    return cleaned or "Bez nazwy"
|
|
|
|
def resolve_folder_path(self, con, folder_id: str):
    """Build the ``/a/b/c`` path of a folder from the names of its ancestors.

    Returns ``None`` when the folder does not exist or is deleted. The walk
    stops at a missing/deleted parent or after 50 parent hops.
    """
    lookup = "SELECT id,parent_id,name FROM folders WHERE id=? AND is_deleted=0"
    row = con.execute(lookup, (folder_id,)).fetchone()
    if not row:
        return None

    segments = [row["name"]]
    for _ in range(50):  # hard cap protects against parent cycles
        parent_id = row["parent_id"]
        if not parent_id:
            break
        row = con.execute(lookup, (parent_id,)).fetchone()
        if not row:
            break
        segments.append(row["name"])

    segments.reverse()
    return "/" + "/".join(segments)
|
|
|
|
def recompute_folder_path(self, con, folder_id: str):
    """Recalculate and store the ``path`` column of one folder.

    Returns the resolved path, or the falsy value from
    ``resolve_folder_path`` (in which case nothing is written).
    """
    resolved = self.resolve_folder_path(con, folder_id)
    if not resolved:
        return resolved
    con.execute("UPDATE folders SET path=?, updated_at=? WHERE id=?", (resolved, now_iso(), folder_id))
    return resolved
|
|
|
|
def get_user_from_request(self, handler):
    """Resolve the authenticated user for a request, or return ``None``.

    The session token is read from the ``albumik_session`` cookie; an
    ``Authorization: Bearer <token>`` header, when present, takes precedence.
    A session is accepted only if it has not expired, the user is active and
    the account itself has not expired. The account-expiry condition mirrors
    the check made at login, so an expired account can no longer keep using
    a session created before its expiry.

    Returns:
        The matching ``users`` row as a dict, or ``None``.
    """
    cookie = SimpleCookie(handler.headers.get("Cookie", ""))
    token = None
    if "albumik_session" in cookie:
        token = cookie["albumik_session"].value
    auth = handler.headers.get("Authorization", "")
    if auth.lower().startswith("bearer "):
        token = auth.split(" ", 1)[1].strip()
    if not token:
        return None

    th = token_hash(token)
    now = now_iso()
    with self.db() as con:
        row = con.execute(
            """
            SELECT u.* FROM sessions s
            JOIN users u ON u.id=s.user_id
            WHERE s.token_hash=? AND s.expires_at>? AND u.active=1
              AND (u.expires_at IS NULL OR u.expires_at='' OR u.expires_at>=?)
            """,
            (th, now, now),
        ).fetchone()
        if row:
            # Track session activity.
            con.execute("UPDATE sessions SET last_seen_at=? WHERE token_hash=?", (now, th))
            return dict(row)
    return None
|
|
|
|
def folder_ancestors(self, con, folder_id: str):
    """Return ``folder_id`` and the ancestor ids whose permissions apply to it.

    The walk goes from the folder up through its parents and stops after the
    first folder with ``inherit_permissions=0`` -- including the starting
    folder itself, which previously was exempt and therefore always
    inherited from its parent. Deleted or missing folders end the walk, and
    at most 50 levels are visited; an id seen twice (a parent cycle) also
    ends it.
    """
    ids = []
    current = folder_id
    for _ in range(50):
        if not current or current in ids:
            break
        row = con.execute(
            "SELECT id,parent_id,inherit_permissions FROM folders WHERE id=? AND is_deleted=0",
            (current,),
        ).fetchone()
        if not row:
            break
        ids.append(row["id"])
        if row["inherit_permissions"] == 0:
            # This folder does not inherit, so nothing above it applies.
            break
        current = row["parent_id"]
    return ids
|
|
|
|
def permission_value(self, con, user, folder_id: str) -> int:
    """Return the numeric permission level (0-4) of *user* on a folder.

    * missing user/folder id, or a deleted/unknown folder -> 0
    * admins -> 4
    * the folder's creator or owner -> 3 for role ``user``, otherwise 1
    * otherwise the highest level granted directly on the folder or on its
      ancestors, walking up only while each folder inherits permissions.

    A folder with ``inherit_permissions=0`` does not inherit from its parent.
    Previously the starting folder was exempt from that check, so grants on
    a parent leaked into folders explicitly marked as non-inheriting.
    """
    if not user or not folder_id:
        return 0
    if user["role"] == "admin":
        return 4

    folder = con.execute("SELECT * FROM folders WHERE id=? AND is_deleted=0", (folder_id,)).fetchone()
    if not folder:
        return 0
    if user["id"] in (folder["created_by"], folder["owner_id"]):
        return 3 if user["role"] == "user" else 1

    best = 0
    current = folder_id
    for _ in range(50):  # depth cap protects against parent cycles
        if not current:
            break
        grant = con.execute(
            "SELECT level FROM folder_permissions WHERE folder_id=? AND user_id=?",
            (current, user["id"]),
        ).fetchone()
        if grant:
            best = max(best, PERM_VALUE.get(grant["level"], 0))
        row = con.execute(
            "SELECT parent_id, inherit_permissions FROM folders WHERE id=? AND is_deleted=0",
            (current,),
        ).fetchone()
        # Stop at a missing folder or at one that does not inherit from its parent.
        if not row or row["inherit_permissions"] == 0:
            break
        current = row["parent_id"]
    return best
|
|
|
|
def can_view_folder(self, con, user, folder_id):
    """Return True when *user* has at least ``view`` (level 1) on the folder."""
    level = self.permission_value(con, user, folder_id)
    return level >= 1
|
|
|
|
def can_upload_to_folder(self, con, user, folder_id):
    """Return True when *user* may upload into the folder.

    Admins always may. Guests need folder level >= 2 (``upload``). Other
    users additionally need their ``can_upload`` flag set.
    """
    if not user:
        return False
    role = user["role"]
    if role == "admin":
        return True
    # The per-account upload flag is only consulted for non-guest roles.
    if role != "guest" and not user["can_upload"]:
        return False
    return self.permission_value(con, user, folder_id) >= 2
|
|
|
|
def can_manage_folder(self, con, user, folder_id):
    """Return True when *user* may manage the folder.

    Admins always may, guests never; other users need level >= 3 (``manage``).
    """
    if not user or user["role"] == "guest":
        return False
    if user["role"] == "admin":
        return True
    return self.permission_value(con, user, folder_id) >= 3
|
|
|
|
def visible_folder_ids(self, con, user):
    """Return the ids of all non-deleted folders that *user* is allowed to view."""
    visible = []
    for row in con.execute("SELECT id FROM folders WHERE is_deleted=0").fetchall():
        folder_id = row["id"]
        if self.can_view_folder(con, user, folder_id):
            visible.append(folder_id)
    return visible
|
|
|
|
def descendant_folder_ids(self, con, root_id: str):
    """Return ``root_id`` followed by all non-deleted descendant folder ids.

    Folders are listed breadth-first. Each id appears once: a visited set
    guards against parent cycles, which previously produced thousands of
    duplicates before the safety limit was reached. At most 10000 folders
    are expanded.
    """
    ids = [root_id]
    seen = {root_id}
    cursor = 0  # index of the next folder to expand; ``ids`` doubles as the queue
    while cursor < len(ids) and cursor < 10000:
        current = ids[cursor]
        cursor += 1
        children = con.execute(
            "SELECT id FROM folders WHERE parent_id=? AND is_deleted=0",
            (current,),
        ).fetchall()
        for child in children:
            child_id = child["id"]
            if child_id not in seen:
                seen.add(child_id)
                ids.append(child_id)
    return ids
|
|
|
|
def parse_json(self, handler):
    """Read the request body and decode it as a JSON object.

    Returns:
        The decoded dict, or ``{}`` when there is no body
        (missing, empty or non-positive ``Content-Length``).

    Raises:
        ValueError: if ``Content-Length`` is not an integer, the body is not
            valid UTF-8 JSON, or the JSON value is not an object. Callers
            use ``.get`` on the result, so a list or scalar body is rejected
            here instead of failing later with ``AttributeError``.
    """
    length = int(handler.headers.get("Content-Length", "0") or 0)
    if length <= 0:
        return {}
    body = handler.rfile.read(length)
    data = json.loads(body.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError("Treść żądania musi być obiektem JSON")
    return data
|
|
|
|
def parse_multipart(self, handler):
    """Parse a ``multipart/form-data`` request body.

    Returns:
        ``(fields, files)`` where ``fields`` maps form field names to decoded
        text and ``files`` is a list of dicts with keys ``field``,
        ``filename``, ``content_type`` and ``data`` (raw bytes).

    Raises:
        ValueError: when the boundary is missing or the declared body size
            exceeds ``self.max_upload_mb``.

    Fixes over the previous implementation:
    * part data is preserved byte-for-byte -- leading/trailing CR/LF bytes
      and a trailing ``--`` inside file content are no longer stripped;
    * the boundary parameter stops at ``;`` so trailing Content-Type
      parameters are not swallowed into it;
    * ``name=`` is no longer matched inside ``filename=``;
    * a non-positive Content-Length reads nothing instead of reading to EOF.
    """
    content_type = handler.headers.get("Content-Type", "")
    m = re.search(r'boundary=(?:"([^"]+)"|([^;\s]+))', content_type)
    if not m:
        raise ValueError("Brak boundary multipart")
    boundary = m.group(1) or m.group(2)

    length = int(handler.headers.get("Content-Length", "0") or 0)
    if length > self.max_upload_mb * 1024 * 1024:
        raise ValueError(f"Plik za duży. Limit: {self.max_upload_mb} MB")
    body = handler.rfile.read(length) if length > 0 else b""

    # Every delimiter is CRLF + "--" + boundary; prefixing the body with CRLF
    # makes the very first delimiter look like all the others.
    delimiter = b"\r\n--" + boundary.encode()
    sections = (b"\r\n" + body).split(delimiter)

    fields = {}
    files = []
    for section in sections[1:]:  # sections[0] is the preamble
        if section.startswith(b"--"):
            break  # closing delimiter: "--boundary--"
        # Drop the rest of the delimiter line (optional padding + CRLF).
        _, sep, part = section.partition(b"\r\n")
        if not sep:
            continue
        header_blob, sep, data = part.partition(b"\r\n\r\n")
        if not sep:
            continue

        disp = ""
        part_type = "application/octet-stream"
        for line in header_blob.decode("utf-8", "replace").split("\r\n"):
            key, _, value = line.partition(":")
            key = key.strip().lower()
            if key == "content-disposition":
                disp = value.strip()
            elif key == "content-type":
                part_type = value.strip()

        # The parameter must start at a token boundary so that `name=` is not
        # picked up from inside `filename=`.
        name_match = re.search(r'(?:^|[;\s])name="([^"]+)"', disp)
        if not name_match:
            continue
        name = name_match.group(1)
        filename_match = re.search(r'(?:^|[;\s])filename="([^"]*)"', disp)
        if filename_match:
            files.append({
                "field": name,
                "filename": filename_match.group(1),
                "content_type": part_type,
                "data": data,
            })
        else:
            fields[name] = data.decode("utf-8", "replace")
    return fields, files
|
|
|
|
def response_json(self, handler, data, status=200, extra_headers=None):
    """Serialize *data* as UTF-8 JSON and write a complete HTTP response.

    Args:
        handler: Request handler used to send the status, headers and body.
        data: JSON-serializable payload.
        status: HTTP status code.
        extra_headers: Optional mapping of additional response headers.
    """
    body = json.dumps(data, ensure_ascii=False).encode("utf-8")
    headers = [
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(body))),
        ("Cache-Control", "no-store"),
    ]
    headers.extend((extra_headers or {}).items())

    handler.send_response(status)
    for header_name, header_value in headers:
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(body)
|
|
|
|
def error(self, handler, message, status=400):
    """Send a JSON error response of the form ``{"ok": false, "error": message}``."""
    payload = {"ok": False, "error": message}
    self.response_json(handler, payload, status)
|
|
|
|
def clean_expired_sessions(self):
    """Delete every session whose ``expires_at`` is not later than the current time."""
    cutoff = now_iso()
    with self.db() as con:
        con.execute("DELETE FROM sessions WHERE expires_at<=?", (cutoff,))
|
|
|
|
def handle_api(self, handler, method, path, query):
    """Dispatch one ``/api/...`` request to the matching endpoint.

    ``/api/health``, ``/api/login`` and ``/api/logout`` are served without
    authentication; every other route requires a valid session.

    Changes from the previous version:
    * logout also revokes a session presented as ``Authorization: Bearer``,
      matching what ``get_user_from_request`` accepts;
    * ``ValueError`` (bad JSON, bad Content-Length, oversized upload, missing
      multipart boundary) is reported as HTTP 400 instead of 500;
    * the session expiry uses an aware UTC ``datetime`` instead of the
      deprecated ``utcnow()``;
    * non-string login fields no longer raise ``AttributeError``.

    Args:
        handler: The HTTP request handler for this request.
        method: HTTP method name (e.g. ``"GET"``).
        path: Request path, starting with ``/api/``.
        query: Parsed query parameters, forwarded to endpoints that use them.
    """
    try:
        if path == "/api/health":
            return self.response_json(handler, {"ok": True, "name": self.app_name, "version": APP_VERSION})

        if path == "/api/login" and method == "POST":
            data = self.parse_json(handler)
            username = str(data.get("username") or "").strip()
            password = str(data.get("password") or "")
            with self.db() as con:
                user = con.execute("SELECT * FROM users WHERE username=? AND active=1", (username,)).fetchone()
                if not user or not verify_password(password, user["password_hash"]):
                    return self.error(handler, "Nieprawidłowy login albo hasło", 401)
                if user["expires_at"] and user["expires_at"] < now_iso():
                    return self.error(handler, "Konto wygasło", 403)
                token = secrets.token_urlsafe(40)
                expires_at = dt.datetime.now(dt.timezone.utc) + dt.timedelta(days=self.session_days)
                expires = expires_at.replace(microsecond=0, tzinfo=None).isoformat() + "Z"
                con.execute(
                    "INSERT INTO sessions(id,token_hash,user_id,created_at,expires_at,last_seen_at,ip,user_agent) VALUES(?,?,?,?,?,?,?,?)",
                    (make_id("ses_"), token_hash(token), user["id"], now_iso(), expires, now_iso(), handler.client_address[0], handler.headers.get("User-Agent", "")),
                )
                self.log_event(con, user["id"], "login", "user", user["id"], f"Logowanie użytkownika {username}", handler.client_address[0])
                cookie = f"albumik_session={token}; Path=/; HttpOnly; SameSite=Lax; Max-Age={self.session_days*86400}"
                return self.response_json(handler, {"ok": True, "user": self.public_user(dict(user))}, extra_headers={"Set-Cookie": cookie})

        if path == "/api/logout" and method == "POST":
            # Collect every token the client presented (cookie and/or bearer).
            tokens = []
            cookie = SimpleCookie(handler.headers.get("Cookie", ""))
            if "albumik_session" in cookie:
                tokens.append(cookie["albumik_session"].value)
            auth = handler.headers.get("Authorization", "")
            if auth.lower().startswith("bearer "):
                tokens.append(auth.split(" ", 1)[1].strip())
            if tokens:
                with self.db() as con:
                    for presented in tokens:
                        con.execute("DELETE FROM sessions WHERE token_hash=?", (token_hash(presented),))
            return self.response_json(handler, {"ok": True}, extra_headers={"Set-Cookie": "albumik_session=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax"})

        # Everything below requires an authenticated user.
        user = self.get_user_from_request(handler)
        if not user:
            return self.error(handler, "Brak logowania", 401)

        if path == "/api/me" and method == "GET":
            return self.response_json(handler, {"ok": True, "user": self.public_user(user)})

        if path == "/api/stats" and method == "GET":
            return self.api_stats(handler, user)

        # --- users ---
        if path == "/api/users" and method == "GET":
            return self.api_list_users(handler, user)
        if path == "/api/users" and method == "POST":
            return self.api_create_user(handler, user)
        m = re.match(r"^/api/users/([^/]+)$", path)
        if m and method == "PUT":
            return self.api_update_user(handler, user, m.group(1))

        # --- folders ---
        if path == "/api/folders" and method == "GET":
            return self.api_list_folders(handler, user)
        if path == "/api/folders" and method == "POST":
            return self.api_create_folder(handler, user)
        m = re.match(r"^/api/folders/([^/]+)$", path)
        if m and method == "PUT":
            return self.api_update_folder(handler, user, m.group(1))
        if m and method == "DELETE":
            return self.api_delete_folder(handler, user, m.group(1))
        if path == "/api/folder-permissions" and method == "POST":
            return self.api_set_folder_permission(handler, user)

        # --- photos ---
        if path == "/api/photos" and method == "GET":
            return self.api_list_photos(handler, user, query)
        if path == "/api/photos/upload" and method == "POST":
            return self.api_upload(handler, user)
        m = re.match(r"^/api/photos/([^/]+)/approve$", path)
        if m and method == "POST":
            return self.api_approve_photo(handler, user, m.group(1))
        m = re.match(r"^/api/photos/([^/]+)/reject$", path)
        if m and method == "POST":
            return self.api_reject_photo(handler, user, m.group(1))
        m = re.match(r"^/api/photos/([^/]+)/move$", path)
        if m and method == "POST":
            return self.api_move_photo(handler, user, m.group(1))
        m = re.match(r"^/api/photos/([^/]+)/tags$", path)
        if m and method == "POST":
            return self.api_set_photo_tags(handler, user, m.group(1))
        m = re.match(r"^/api/photos/([^/]+)$", path)
        if m and method == "DELETE":
            return self.api_delete_photo(handler, user, m.group(1))

        # --- tags ---
        if path == "/api/tags" and method == "GET":
            return self.api_list_tags(handler, user)
        if path == "/api/tags" and method == "POST":
            return self.api_create_tag(handler, user)

        # --- audit log ---
        if path == "/api/events" and method == "GET":
            return self.api_events(handler, user)

        return self.error(handler, "Nieznany endpoint API", 404)
    except ValueError as e:
        # Malformed client input is the client's fault, not a server error.
        return self.error(handler, str(e), 400)
    except Exception as e:
        traceback.print_exc()
        return self.error(handler, "Błąd serwera: " + str(e), 500)
|
|
|
|
def public_user(self, user):
    """Return the client-safe view of a user dict (no password hash).

    Permission and ``active`` flags are converted to booleans; ``expires_at``
    is ``None`` when the key is absent.
    """
    view = {key: user[key] for key in ("id", "username", "display_name", "role")}
    for flag in ("can_create_folders", "can_upload", "can_tag", "active"):
        view[flag] = bool(user[flag])
    view["expires_at"] = user.get("expires_at")
    return view
|
|
|
|
def api_stats(self, handler, user):
    """Respond with gallery counters limited to folders visible to *user*.

    Counts approved and pending photos plus the total size of approved
    photos in visible folders, the number of visible folders and all tags.
    The active-user count is included for admins only (``None`` otherwise).
    When no folder is visible, all counters are reported as zero.
    """
    with self.db() as con:
        folder_ids = self.visible_folder_ids(con, user)
        if not folder_ids:
            zeroes = {"photos": 0, "pending": 0, "folders": 0, "tags": 0, "users": 0, "bytes": 0}
            return self.response_json(handler, {"ok": True, "stats": zeroes})

        marks = ",".join("?" * len(folder_ids))
        approved = con.execute(f"SELECT COUNT(*) c, COALESCE(SUM(size_bytes),0) b FROM photos WHERE is_deleted=0 AND folder_id IN ({marks}) AND status='approved'", folder_ids).fetchone()
        waiting = con.execute(f"SELECT COUNT(*) c FROM photos WHERE is_deleted=0 AND folder_id IN ({marks}) AND status='pending'", folder_ids).fetchone()
        tag_count = con.execute("SELECT COUNT(*) c FROM tags").fetchone()["c"]

        active_users = None
        if user["role"] == "admin":
            active_users = con.execute("SELECT COUNT(*) c FROM users WHERE active=1").fetchone()["c"]

        stats = {
            "photos": approved["c"],
            "pending": waiting["c"],
            "folders": len(folder_ids),
            "tags": tag_count,
            "users": active_users,
            "bytes": approved["b"],
        }
        return self.response_json(handler, {"ok": True, "stats": stats})
|
|
|
|
def api_list_users(self, handler, user):
    """Respond with all user accounts, newest first (admin only, else 403)."""
    is_admin = user["role"] == "admin"
    if not is_admin:
        return self.error(handler, "Tylko administrator może widzieć użytkowników", 403)
    with self.db() as con:
        rows = con.execute("SELECT * FROM users ORDER BY created_at DESC").fetchall()
    accounts = []
    for row in rows:
        accounts.append(self.public_user(dict(row)))
    return self.response_json(handler, {"ok": True, "users": accounts})
|
|
|
|
def api_create_user(self, handler, user):
    """Create a user account (admin only).

    Reads ``username``, ``password``, ``role``, ``display_name``, the
    ``can_*`` flags and ``expires_at`` from the JSON body. A random password
    is generated when none is supplied; the password is returned in the
    response so the admin can hand it over.

    Validation added over the previous version:
    * a missing/blank username is rejected with 400 (it used to be turned
      into the placeholder account name "beznazwy");
    * a non-string password is rejected with 400;
    * a duplicate username yields 409 instead of a 500 IntegrityError.
    """
    if user["role"] != "admin":
        return self.error(handler, "Tylko administrator może tworzyć konta", 403)

    data = self.parse_json(handler)
    raw_username = data.get("username")
    if not isinstance(raw_username, str) or not raw_username.strip():
        return self.error(handler, "Nazwa użytkownika jest wymagana")
    username = self.sanitize_name(raw_username).lower().replace(" ", "")

    password = data.get("password") or secrets.token_urlsafe(8)
    if not isinstance(password, str):
        return self.error(handler, "Hasło musi być tekstem")

    role = data.get("role", "guest")
    if role not in ("admin", "user", "guest"):
        return self.error(handler, "Nieprawidłowa rola")

    uid = make_id("usr_")
    try:
        with self.db() as con:
            con.execute(
                "INSERT INTO users(id,username,display_name,password_hash,role,can_create_folders,can_upload,can_tag,active,expires_at,created_at) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
                (
                    uid,
                    username,
                    self.sanitize_name(data.get("display_name") or username),
                    hash_password(password),
                    role,
                    1 if data.get("can_create_folders") else 0,
                    1 if data.get("can_upload") else 0,
                    1 if data.get("can_tag") else 0,
                    1,
                    data.get("expires_at") or None,
                    now_iso(),
                ),
            )
            self.log_event(con, user["id"], "create_user", "user", uid, f"Utworzono konto {username} ({role})", handler.client_address[0])
    except sqlite3.IntegrityError:
        # The role is validated above, so the constraint that can fail here
        # is the UNIQUE one on username.
        return self.error(handler, "Użytkownik o takiej nazwie już istnieje", 409)
    return self.response_json(handler, {"ok": True, "id": uid, "password": password})
|
|
|
|
def api_update_user(self, handler, user, uid):
    """Update selected fields of the account *uid* (admin only).

    Accepted JSON fields: ``display_name``, ``role``, ``expires_at``, the
    flags ``can_create_folders``/``can_upload``/``can_tag``/``active`` and
    ``password``. Fields not present in the body are left untouched.

    Validation added over the previous version:
    * ``role`` must be one of admin/user/guest (400) -- an invalid value
      used to hit the table CHECK constraint and surface as a 500;
    * ``display_name`` is sanitised like at account creation;
    * an empty ``expires_at`` is stored as NULL, as at account creation;
    * an unknown *uid* yields 404 instead of a silent success.
    """
    if user["role"] != "admin":
        return self.error(handler, "Tylko administrator może edytować konta", 403)

    data = self.parse_json(handler)
    assignments = []
    params = []

    if "display_name" in data:
        assignments.append("display_name=?")
        params.append(self.sanitize_name(data["display_name"]))
    if "role" in data:
        if data["role"] not in ("admin", "user", "guest"):
            return self.error(handler, "Nieprawidłowa rola")
        assignments.append("role=?")
        params.append(data["role"])
    if "expires_at" in data:
        assignments.append("expires_at=?")
        params.append(data["expires_at"] or None)

    for field in ["can_create_folders", "can_upload", "can_tag", "active"]:
        if field in data:
            assignments.append(f"{field}=?")
            params.append(1 if data[field] else 0)

    if data.get("password"):
        assignments.append("password_hash=?")
        params.append(hash_password(data["password"]))

    if not assignments:
        return self.response_json(handler, {"ok": True})

    assignments.append("updated_at=?")
    params.append(now_iso())
    params.append(uid)

    with self.db() as con:
        # Column names come from the fixed lists above, never from the request.
        updated = con.execute(f"UPDATE users SET {', '.join(assignments)} WHERE id=?", params)
        if updated.rowcount == 0:
            return self.error(handler, "Nie znaleziono użytkownika", 404)
        self.log_event(con, user["id"], "update_user", "user", uid, "Zaktualizowano konto użytkownika", handler.client_address[0])
    return self.response_json(handler, {"ok": True})
|
|
|
|
def api_list_folders(self, handler, user):
    """Respond with every non-deleted folder *user* may view, ordered by path.

    Each folder row is extended with ``permission`` (level name),
    ``can_manage`` and ``can_upload`` for the requesting user.
    """
    with self.db() as con:
        rows = con.execute("SELECT * FROM folders WHERE is_deleted=0 ORDER BY path COLLATE NOCASE").fetchall()
        listing = []
        for row in rows:
            level = self.permission_value(con, user, row["id"])
            if level < 1:
                continue  # not visible to this user
            entry = dict(row)
            entry["permission"] = VALUE_PERM.get(level, "none")
            entry["can_manage"] = level >= 3 or user["role"] == "admin"
            entry["can_upload"] = self.can_upload_to_folder(con, user, row["id"])
            listing.append(entry)
        return self.response_json(handler, {"ok": True, "folders": listing})
|
|
|
|
def api_create_folder(self, handler, user):
    """Create a folder, optionally under ``parent_id``, and respond with its id and path.

    Creating a sub-folder requires manage rights on the parent, or role
    ``user`` with ``can_create_folders`` plus view access to the parent.
    Creating a top-level folder requires admin, or role ``user`` with
    ``can_create_folders``. The creator is granted ``manage`` on the new folder.
    """
    data = self.parse_json(handler)
    parent_id = data.get("parent_id")
    name = self.sanitize_name(data.get("name", ""))
    inherit_flag = 1 if data.get("inherit_permissions", True) else 0

    with self.db() as con:
        if parent_id:
            allowed = self.can_manage_folder(con, user, parent_id) or (
                user["role"] == "user" and user["can_create_folders"] and self.can_view_folder(con, user, parent_id)
            )
            if not allowed:
                return self.error(handler, "Brak uprawnień do tworzenia podkatalogu", 403)
        else:
            allowed = user["role"] == "admin" or (user["role"] == "user" and user["can_create_folders"])
            if not allowed:
                return self.error(handler, "Brak uprawnień do tworzenia katalogu", 403)

        fid = make_id("fld_")
        # Insert with a provisional path; the real one is derived right after.
        con.execute(
            "INSERT INTO folders(id,parent_id,name,path,owner_id,created_by,inherit_permissions,created_at) VALUES(?,?,?,?,?,?,?,?)",
            (fid, parent_id, name, "/" + name, user["id"], user["id"], inherit_flag, now_iso()),
        )
        path = self.recompute_folder_path(con, fid)

        # Give the creator manage rights on the folder just created.
        grant = (fid, user["id"], make_id("fpe_"), fid, user["id"], "manage", now_iso())
        con.execute(
            "INSERT OR REPLACE INTO folder_permissions(id,folder_id,user_id,level,created_at) VALUES(COALESCE((SELECT id FROM folder_permissions WHERE folder_id=? AND user_id=?), ?),?,?,?,?)",
            grant,
        )
        self.log_event(con, user["id"], "create_folder", "folder", fid, f"Utworzono katalog {path}", handler.client_address[0])
        return self.response_json(handler, {"ok": True, "id": fid, "path": path})
|
|
|
|
def api_update_folder(self, handler, user, folder_id):
    """Rename, move or change the inheritance flag of a folder.

    Requires manage rights on the folder and, when moving, on the target
    parent. After the update the stored path of the folder and of all its
    descendants is recomputed.

    A move into the folder's own subtree is rejected with 400. Only the
    "own parent" case was checked before, so a folder could be moved below
    one of its descendants, creating a parent cycle.
    """
    data = self.parse_json(handler)
    with self.db() as con:
        if not self.can_manage_folder(con, user, folder_id):
            return self.error(handler, "Brak uprawnień do edycji katalogu", 403)

        changes = []
        params = []
        if "name" in data:
            changes.append("name=?")
            params.append(self.sanitize_name(data["name"]))
        if "inherit_permissions" in data:
            changes.append("inherit_permissions=?")
            params.append(1 if data["inherit_permissions"] else 0)
        if "parent_id" in data:
            parent_id = data["parent_id"] or None
            if parent_id == folder_id:
                return self.error(handler, "Katalog nie może być swoim rodzicem")
            if parent_id and parent_id in self.descendant_folder_ids(con, folder_id):
                return self.error(handler, "Katalog nie może zostać przeniesiony do własnego podkatalogu")
            if parent_id and not self.can_manage_folder(con, user, parent_id):
                return self.error(handler, "Brak uprawnień do katalogu docelowego", 403)
            changes.append("parent_id=?")
            params.append(parent_id)

        if not changes:
            return self.response_json(handler, {"ok": True})

        changes.append("updated_at=?")
        params.append(now_iso())
        params.append(folder_id)
        # Column names come from the fixed fragments above, never from the request.
        con.execute(f"UPDATE folders SET {', '.join(changes)} WHERE id=?", params)

        path = self.recompute_folder_path(con, folder_id)
        # Descendant paths embed this folder's name/position; refresh them too.
        for descendant in self.descendant_folder_ids(con, folder_id):
            if descendant != folder_id:
                self.recompute_folder_path(con, descendant)

        self.log_event(con, user["id"], "update_folder", "folder", folder_id, f"Zaktualizowano katalog {path}", handler.client_address[0])
        return self.response_json(handler, {"ok": True})
|
|
|
|
def api_delete_folder(self, handler, user, folder_id):
    """Soft-delete a folder (sets ``is_deleted=1``); requires manage rights.

    Only the folder's own row is flagged; sub-folders and photos are not
    modified here.
    """
    with self.db() as con:
        permitted = self.can_manage_folder(con, user, folder_id)
        if not permitted:
            return self.error(handler, "Brak uprawnień do usunięcia katalogu", 403)
        stamp = now_iso()
        con.execute("UPDATE folders SET is_deleted=1, updated_at=? WHERE id=?", (stamp, folder_id))
        self.log_event(con, user["id"], "delete_folder", "folder", folder_id, "Usunięto katalog", handler.client_address[0])
        return self.response_json(handler, {"ok": True})
|
|
|
|
def api_set_folder_permission(self, handler, user):
    """Grant, change or revoke a user's permission level on a folder (admin only).

    JSON body: ``folder_id``, ``user_id`` and ``level`` (default ``view``).
    Level ``none`` removes the grant; any other level inserts or updates it.

    Validation added over the previous version: missing ids are rejected
    with 400 and unknown folder/user ids with 404. Both used to fail on a
    database constraint and surface as a 500.
    """
    if user["role"] != "admin":
        return self.error(handler, "Tylko administrator może nadawać dostępy", 403)

    data = self.parse_json(handler)
    folder_id = data.get("folder_id")
    user_id = data.get("user_id")
    level = data.get("level", "view")
    if level not in PERM_VALUE:
        return self.error(handler, "Nieprawidłowy poziom uprawnień")
    if not folder_id or not user_id:
        return self.error(handler, "Wymagane są folder_id i user_id")

    with self.db() as con:
        if not con.execute("SELECT 1 FROM folders WHERE id=?", (folder_id,)).fetchone():
            return self.error(handler, "Nie znaleziono katalogu", 404)
        if not con.execute("SELECT 1 FROM users WHERE id=?", (user_id,)).fetchone():
            return self.error(handler, "Nie znaleziono użytkownika", 404)

        if level == "none":
            con.execute("DELETE FROM folder_permissions WHERE folder_id=? AND user_id=?", (folder_id, user_id))
        else:
            existing = con.execute("SELECT id FROM folder_permissions WHERE folder_id=? AND user_id=?", (folder_id, user_id)).fetchone()
            if existing:
                con.execute("UPDATE folder_permissions SET level=? WHERE id=?", (level, existing["id"]))
            else:
                con.execute("INSERT INTO folder_permissions(id,folder_id,user_id,level,created_at) VALUES(?,?,?,?,?)", (make_id("fpe_"), folder_id, user_id, level, now_iso()))
        self.log_event(con, user["id"], "set_permission", "folder", folder_id, f"Nadano dostęp {level} użytkownikowi {user_id}", handler.client_address[0])
    return self.response_json(handler, {"ok": True})
|
|
|
|
def api_upload(self, handler, user):
    """Store uploaded image/video files in a folder and register them.

    ``self.parse_multipart`` supplies ``fields`` (read with ``.get``) and
    ``files`` (items indexed by ``filename``, ``content_type`` and ``data``).
    Recognised fields: ``folder_id`` (required), ``tags`` (comma separated)
    and ``taken_at``.

    Behaviour:

    * 400-style error when the folder or the files are missing, 403 when
      ``self.can_upload_to_folder`` refuses the caller.
    * Uploads from guests get status ``pending`` and are written below
      ``pending/``; everyone else gets ``approved`` below ``originals/``.
    * A file is accepted when its extension is in ``IMAGE_EXT``/``VIDEO_EXT``
      or its declared content type is ``image/*``/``video/*``; other files
      are skipped silently.
    * A file whose SHA-256 and size match a non-deleted photo is reported as
      ``already_exists`` instead of being stored again.
    * Tags are applied only for non-guest uploads.

    Returns ``{"ok": True, "items": [...]}`` with one entry per stored or
    duplicate file.
    """
    fields, files = self.parse_multipart(handler)
    folder_id = fields.get("folder_id")
    tags = [t.strip() for t in (fields.get("tags") or "").split(",") if t.strip()]
    taken_at = fields.get("taken_at") or None
    if not folder_id:
        return self.error(handler, "Brak katalogu")
    if not files:
        return self.error(handler, "Brak plików")

    is_guest = user["role"] == "guest"
    created = []
    with self.db() as con:
        if not self.can_upload_to_folder(con, user, folder_id):
            return self.error(handler, "Brak uprawnień do wysyłania zdjęć w tym katalogu", 403)
        status = "pending" if is_guest else "approved"
        subdir = "pending" if is_guest else "originals"
        for f in files:
            original = self.sanitize_name(os.path.basename(f["filename"] or "plik"))
            ext = os.path.splitext(original)[1].lower()
            # The declared type may be absent; normalise once so the prefix
            # checks below cannot fail on None.
            declared = f["content_type"] or ""
            declared_lower = declared.lower()
            declared_media = declared_lower.startswith(("image/", "video/"))
            if not (ext in IMAGE_EXT or ext in VIDEO_EXT or declared_media):
                continue
            is_video = 1 if ext in VIDEO_EXT or declared_lower.startswith("video/") else 0

            content = f["data"]
            sha = hashlib.sha256(content).hexdigest()
            duplicate = con.execute(
                "SELECT id FROM photos WHERE sha256=? AND size_bytes=? AND is_deleted=0",
                (sha, len(content)),
            ).fetchone()
            if duplicate:
                created.append({"id": duplicate["id"], "status": "already_exists"})
                continue

            pid = make_id("pho_")
            today = dt.datetime.now(dt.timezone.utc)
            rel_dir = f"{subdir}/{today.year:04d}/{today.month:02d}"
            abs_dir = self.media_dir / rel_dir
            abs_dir.mkdir(parents=True, exist_ok=True)
            stored_name = f"{pid}{ext or '.bin'}"
            abs_path = abs_dir / stored_name
            rel_path = f"{rel_dir}/{stored_name}"

            # Trust the client-declared type only when it is a media type.
            # It is stored and later sent back as Content-Type, so an
            # arbitrary value (e.g. text/html on a .jpg) must not get through.
            if declared_media:
                mime_type = declared
            else:
                mime_type = mimetypes.guess_type(original)[0] or "application/octet-stream"

            stamp = now_iso()
            approved = status == "approved"
            with open(abs_path, "wb") as out:
                out.write(content)
            try:
                con.execute(
                    """
                    INSERT INTO photos(id,folder_id,file_name,original_name,stored_path,mime_type,size_bytes,sha256,is_video,taken_at,uploaded_at,status,submitted_by,approved_by,approved_at,created_at)
                    VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
                    """,
                    (
                        pid, folder_id, original, original, rel_path, mime_type,
                        len(content), sha, is_video, taken_at, stamp, status,
                        user["id"],
                        user["id"] if approved else None,
                        stamp if approved else None,
                        stamp,
                    ),
                )
                if tags and not is_guest:
                    self.assign_tags(con, pid, tags)
            except Exception:
                # Do not leave an orphaned file behind when the row could
                # not be written; the error itself still propagates.
                abs_path.unlink(missing_ok=True)
                raise
            created.append({"id": pid, "status": status})
        self.log_event(con, user["id"], "upload", "folder", folder_id, f"Wysłano {len(created)} plików. Status: {status}", handler.client_address[0])
    return self.response_json(handler, {"ok": True, "items": created})
|
|
|
|
def api_list_photos(self, handler, user, query):
    """Return photos the caller may see, filtered by the query string.

    ``query`` is a ``parse_qs``-style mapping (every value is a list; the
    first element is used). Recognised keys:

    * ``folder_id`` – restrict to one folder; with ``include_subfolders``
      other than ``"0"`` the ids from ``self.descendant_folder_ids`` are
      used. Without it, ``self.visible_folder_ids`` decides.
    * ``status`` – exact status filter (default ``approved``); ``all`` or an
      empty value disables the filter.
    * ``q`` – case-insensitive substring of file name, original name or
      folder path.
    * ``tag`` – case-insensitive exact tag name.
    * ``date_from`` / ``date_to`` – bounds on the first non-null of
      ``taken_at``, ``uploaded_at``, ``created_at``; a 10-character
      ``date_to`` is extended to the end of that day.
    * ``limit`` – at most 500; invalid or negative values fall back to 120.

    Each returned photo dict carries the row columns plus ``folder_path``,
    ``submitted_by_name``, ``tags``, ``url`` and ``can_manage``.
    """

    def first(key, default=""):
        values = query.get(key)
        return values[0] if values else default

    folder_id = first("folder_id", None)
    include_sub = first("include_subfolders", "1") != "0"
    status = first("status", "approved")
    text = (first("q") or "").strip().lower()
    tag = (first("tag") or "").strip().lower()
    date_from = first("date_from") or ""
    date_to = first("date_to") or ""

    # A malformed limit must not raise, and a negative one must not reach
    # SQLite, where a negative LIMIT means "no upper bound".
    try:
        limit = int(first("limit", 120) or 120)
    except (TypeError, ValueError):
        limit = 120
    if limit < 0:
        limit = 120
    limit = min(limit, 500)

    date_expr = "COALESCE(p.taken_at,p.uploaded_at,p.created_at)"
    with self.db() as con:
        if folder_id:
            ids = self.descendant_folder_ids(con, folder_id) if include_sub else [folder_id]
            ids = [i for i in ids if self.can_view_folder(con, user, i)]
        else:
            ids = self.visible_folder_ids(con, user)
        if not ids:
            return self.response_json(handler, {"ok": True, "photos": []})

        params = list(ids)
        marks = ",".join("?" for _ in params)
        where = ["p.is_deleted=0", f"p.folder_id IN ({marks})"]
        if status and status != "all":
            where.append("p.status=?")
            params.append(status)
        # Guests may only see approved photos or their own submissions. This
        # has to hold for every status filter, not just for "all"; otherwise
        # ?status=pending or ?status=rejected exposes other users' uploads.
        if user["role"] == "guest":
            where.append("(p.status='approved' OR p.submitted_by=?)")
            params.append(user["id"])
        if text:
            where.append("(LOWER(p.file_name) LIKE ? OR LOWER(p.original_name) LIKE ? OR LOWER(f.path) LIKE ?)")
            params += [f"%{text}%"] * 3
        if date_from:
            where.append(f"{date_expr}>=?")
            params.append(date_from)
        if date_to:
            where.append(f"{date_expr}<=?")
            params.append(date_to + "T23:59:59Z" if len(date_to) == 10 else date_to)
        if tag:
            where.append("EXISTS (SELECT 1 FROM photo_tags pt JOIN tags t ON t.id=pt.tag_id WHERE pt.photo_id=p.id AND LOWER(t.name)=?)")
            params.append(tag)

        sql = f"""
            SELECT p.*, f.path AS folder_path, u.display_name AS submitted_by_name
            FROM photos p
            JOIN folders f ON f.id=p.folder_id
            JOIN users u ON u.id=p.submitted_by
            WHERE {' AND '.join(where)}
            ORDER BY {date_expr} DESC
            LIMIT ?
        """
        params.append(limit)
        rows = con.execute(sql, params).fetchall()

        # Load the tags of all listed photos with one query instead of one
        # query per photo; ORDER BY keeps each photo's tags sorted by name.
        tags_by_photo = {}
        if rows:
            photo_marks = ",".join("?" for _ in rows)
            tag_rows = con.execute(
                "SELECT pt.photo_id AS photo_id, t.id AS id, t.name AS name, t.color AS color "
                "FROM photo_tags pt JOIN tags t ON t.id=pt.tag_id "
                f"WHERE pt.photo_id IN ({photo_marks}) ORDER BY t.name",
                [r["id"] for r in rows],
            ).fetchall()
            for t in tag_rows:
                tags_by_photo.setdefault(t["photo_id"], []).append(
                    {"id": t["id"], "name": t["name"], "color": t["color"]}
                )

        manage_by_folder = {}  # permission is per folder, so check each folder once
        photos = []
        for r in rows:
            item = dict(r)
            item["tags"] = tags_by_photo.get(r["id"], [])
            item["url"] = self.base_path + f"/media/{r['id']}/original"
            fid = r["folder_id"]
            if fid not in manage_by_folder:
                manage_by_folder[fid] = self.can_manage_folder(con, user, fid)
            item["can_manage"] = manage_by_folder[fid]
            photos.append(item)
    return self.response_json(handler, {"ok": True, "photos": photos})
|
|
|
|
def assign_tags(self, con, photo_id, names):
    """Attach the given tag names to a photo, creating missing tags.

    ``names`` is normally a list of strings. A plain string is treated as a
    comma-separated list (otherwise it would be iterated character by
    character), ``None`` as no tags, and non-string entries are skipped.
    Each name is passed through ``self.sanitize_name`` and lower-cased;
    names that end up empty are ignored. Existing tags are matched
    case-insensitively; new tags get a colour from a fixed palette.
    Links are written with ``INSERT OR IGNORE`` into ``photo_tags``.
    """
    colors = ["#f472b6", "#a78bfa", "#60a5fa", "#34d399", "#f59e0b", "#fb7185", "#22c55e", "#38bdf8"]
    if names is None:
        names = []
    elif isinstance(names, str):
        names = names.split(",")
    for name in names:
        if not isinstance(name, str):
            continue
        clean = self.sanitize_name(name).lower()
        if not clean:
            continue
        tag = con.execute("SELECT id FROM tags WHERE LOWER(name)=?", (clean,)).fetchone()
        if tag:
            tid = tag["id"]
        else:
            tid = make_id("tag_")
            # The built-in hash() of a str is salted per process, so it would
            # pick a different colour for the same name on every restart.
            # A digest keeps the name -> colour mapping stable.
            digest = hashlib.sha256(clean.encode("utf-8")).digest()
            color = colors[digest[0] % len(colors)]
            con.execute("INSERT INTO tags(id,name,color,created_at) VALUES(?,?,?,?)", (tid, clean, color, now_iso()))
        con.execute("INSERT OR IGNORE INTO photo_tags(photo_id,tag_id) VALUES(?,?)", (photo_id, tid))
|
|
|
|
def api_approve_photo(self, handler, user, photo_id):
    """Approve a photo, optionally moving it to another folder and tagging it.

    The JSON body is read only when the request has a ``Content-Length``
    header; it may contain ``folder_id`` (target folder, defaults to the
    photo's current folder) and ``tags``. The caller must be able to manage
    both the current and the target folder (403 otherwise); an unknown or
    deleted photo yields 404.

    A file stored below ``pending/`` is moved to the same relative location
    below ``originals/``. If the source file is missing the move is skipped
    and the stored path is left unchanged.
    """
    if handler.headers.get("Content-Length"):
        data = self.parse_json(handler)
    else:
        data = {}
    target_folder = data.get("folder_id")
    tags = data.get("tags") or []

    with self.db() as con:
        photo = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
        if not photo:
            return self.error(handler, "Nie znaleziono zdjęcia", 404)

        folder_id = target_folder or photo["folder_id"]
        # Both the source and the destination folder must be manageable.
        if not (self.can_manage_folder(con, user, photo["folder_id"]) and self.can_manage_folder(con, user, folder_id)):
            return self.error(handler, "Brak uprawnień do akceptacji", 403)

        rel_path = photo["stored_path"]
        if rel_path.startswith("pending/"):
            promoted = rel_path.replace("pending/", "originals/", 1)
            destination_dir = self.media_dir / os.path.dirname(promoted)
            destination_dir.mkdir(parents=True, exist_ok=True)
            try:
                shutil.move(str(self.media_dir / rel_path), str(self.media_dir / promoted))
            except FileNotFoundError:
                # Best effort: keep the old path when the file is gone.
                pass
            else:
                rel_path = promoted

        con.execute(
            "UPDATE photos SET status='approved', folder_id=?, stored_path=?, approved_by=?, approved_at=?, updated_at=? WHERE id=?",
            (folder_id, rel_path, user["id"], now_iso(), now_iso(), photo_id),
        )
        if tags:
            self.assign_tags(con, photo_id, tags)
        self.log_event(con, user["id"], "approve_photo", "photo", photo_id, "Zaakceptowano zdjęcie", handler.client_address[0])
    return self.response_json(handler, {"ok": True})
|
|
|
|
def api_reject_photo(self, handler, user, photo_id):
    """Mark a photo as rejected and store who rejected it and why.

    The JSON body is parsed only when a ``Content-Length`` header is present;
    its optional ``reason`` defaults to an empty string. Returns 404 for an
    unknown or deleted photo and 403 when the caller cannot manage the
    photo's folder.
    """
    has_body = handler.headers.get("Content-Length")
    payload = self.parse_json(handler) if has_body else {}
    reason = payload.get("reason") or ""

    reject_sql = "UPDATE photos SET status='rejected', rejected_by=?, rejected_at=?, reject_reason=?, updated_at=? WHERE id=?"
    with self.db() as con:
        photo = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
        if not photo:
            return self.error(handler, "Nie znaleziono zdjęcia", 404)
        if not self.can_manage_folder(con, user, photo["folder_id"]):
            return self.error(handler, "Brak uprawnień do odrzucenia", 403)
        con.execute(reject_sql, (user["id"], now_iso(), reason, now_iso(), photo_id))
        self.log_event(con, user["id"], "reject_photo", "photo", photo_id, "Odrzucono zdjęcie", handler.client_address[0])
    return self.response_json(handler, {"ok": True})
|
|
|
|
def api_move_photo(self, handler, user, photo_id):
    """Reassign a photo to another folder (database only, no file move).

    The JSON body must contain ``folder_id``. Responds with an error when it
    is missing, 404 when the photo does not exist or is deleted, and 403
    unless the caller can manage both the current and the target folder.
    """
    payload = self.parse_json(handler)
    target = payload.get("folder_id")
    if not target:
        return self.error(handler, "Brak katalogu docelowego")

    with self.db() as con:
        photo = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
        if not photo:
            return self.error(handler, "Nie znaleziono zdjęcia", 404)
        # Equivalent to "not source or not target", with the same short-circuit.
        if not (self.can_manage_folder(con, user, photo["folder_id"]) and self.can_manage_folder(con, user, target)):
            return self.error(handler, "Brak uprawnień do przeniesienia", 403)
        con.execute("UPDATE photos SET folder_id=?, updated_at=? WHERE id=?", (target, now_iso(), photo_id))
        message = f"Przeniesiono zdjęcie do katalogu {target}"
        self.log_event(con, user["id"], "move_photo", "photo", photo_id, message, handler.client_address[0])
    return self.response_json(handler, {"ok": True})
|
|
|
|
def api_set_photo_tags(self, handler, user, photo_id):
    """Replace the complete tag set of a photo.

    ``tags`` from the JSON body (default: empty list) becomes the new tag
    set: all existing links are deleted, then ``self.assign_tags`` is called.
    Guests are always refused; admins are always allowed; other users need a
    truthy ``can_tag`` and view access to the photo's folder. Returns 404
    for an unknown or deleted photo and 403 when tagging is not permitted.
    """
    payload = self.parse_json(handler)
    names = payload.get("tags") or []

    with self.db() as con:
        photo = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
        if not photo:
            return self.error(handler, "Nie znaleziono zdjęcia", 404)

        role = user["role"]
        if role == "guest":
            allowed = False
        elif role == "admin":
            allowed = True
        else:
            allowed = bool(user["can_tag"] and self.can_view_folder(con, user, photo["folder_id"]))
        if not allowed:
            return self.error(handler, "Brak uprawnień do tagowania", 403)

        con.execute("DELETE FROM photo_tags WHERE photo_id=?", (photo_id,))
        self.assign_tags(con, photo_id, names)
        self.log_event(con, user["id"], "tag_photo", "photo", photo_id, "Zmieniono tagi zdjęcia", handler.client_address[0])
    return self.response_json(handler, {"ok": True})
|
|
|
|
def api_delete_photo(self, handler, user, photo_id):
    """Soft-delete a photo by setting ``is_deleted=1`` on its row.

    Returns 404 when the photo does not exist or is already deleted and 403
    when the caller cannot manage the photo's folder. The stored file is not
    touched by this method.
    """
    lookup_sql = "SELECT * FROM photos WHERE id=? AND is_deleted=0"
    with self.db() as con:
        photo = con.execute(lookup_sql, (photo_id,)).fetchone()
        if not photo:
            return self.error(handler, "Nie znaleziono zdjęcia", 404)
        may_manage = self.can_manage_folder(con, user, photo["folder_id"])
        if not may_manage:
            return self.error(handler, "Brak uprawnień do usunięcia", 403)
        con.execute("UPDATE photos SET is_deleted=1, updated_at=? WHERE id=?", (now_iso(), photo_id))
        self.log_event(con, user["id"], "delete_photo", "photo", photo_id, "Usunięto zdjęcie", handler.client_address[0])
    return self.response_json(handler, {"ok": True})
|
|
|
|
def api_list_tags(self, handler, user):
    """Return every tag, ordered by name, with the number of linked photos.

    ``count`` comes from a LEFT JOIN on ``photo_tags``, so unused tags are
    listed with a count of 0. ``user`` is not consulted here.
    """
    listing_sql = "SELECT t.*, COUNT(pt.photo_id) AS count FROM tags t LEFT JOIN photo_tags pt ON pt.tag_id=t.id GROUP BY t.id ORDER BY t.name"
    with self.db() as con:
        tags = list(map(dict, con.execute(listing_sql).fetchall()))
    return self.response_json(handler, {"ok": True, "tags": tags})
|
|
|
|
def api_create_tag(self, handler, user):
    """Create a tag from the JSON body (``name`` required, ``color`` optional).

    Guests are refused with 403. The name is passed through
    ``self.sanitize_name`` and lower-cased; a missing, non-string or empty
    name is rejected with an error response instead of inserting a nameless
    tag. ``color`` defaults to ``#a78bfa``. The insert uses ``INSERT OR
    IGNORE``, so a row that violates a table constraint is skipped silently
    and the call still answers ``{"ok": True}``.
    """
    if user["role"] == "guest":
        return self.error(handler, "Gość nie może tworzyć tagów", 403)
    data = self.parse_json(handler)
    raw_name = data.get("name")
    name = self.sanitize_name(raw_name).lower() if isinstance(raw_name, str) else ""
    if not name:
        return self.error(handler, "Nazwa tagu jest wymagana")
    color = data.get("color") or "#a78bfa"
    with self.db() as con:
        con.execute(
            "INSERT OR IGNORE INTO tags(id,name,color,created_at) VALUES(?,?,?,?)",
            (make_id("tag_"), name, color, now_iso()),
        )
    return self.response_json(handler, {"ok": True})
|
|
|
|
def api_events(self, handler, user):
    """Return the 200 most recent event-log entries (admin only).

    Each entry is the ``events`` row plus ``actor_name``, taken from the
    matching user's ``display_name`` via a LEFT JOIN, so it is ``None`` when
    no user row matches. Non-admin callers receive a 403 error.
    """
    is_admin = user["role"] == "admin"
    if not is_admin:
        return self.error(handler, "Tylko administrator może widzieć dziennik", 403)
    journal_sql = "SELECT e.*, u.display_name AS actor_name FROM events e LEFT JOIN users u ON u.id=e.actor_id ORDER BY e.created_at DESC LIMIT 200"
    with self.db() as con:
        events = list(map(dict, con.execute(journal_sql).fetchall()))
    return self.response_json(handler, {"ok": True, "events": events})
|
|
|
|
def serve_media(self, handler, photo_id, variant="original"):
    """Stream a stored photo/video file to the client after access checks.

    Responses:

    * 401 – ``self.get_user_from_request`` found no user;
    * 404 – photo unknown/deleted, or the file is missing or resolves
      outside ``self.media_dir``;
    * 403 – the user cannot view the folder, or the photo is not approved
      and the user is neither its submitter nor a manager of the folder;
    * 200 – file body with ``Content-Type`` from the stored ``mime_type``
      (falling back to a guess from the file name).

    For a ``variant`` other than ``"original"`` the column
    ``<variant>_path`` is used when the row has it and it is non-empty;
    otherwise the original file is served.
    """
    user = self.get_user_from_request(handler)
    if not user:
        handler.send_error(401)
        return
    with self.db() as con:
        p = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
        if not p:
            handler.send_error(404)
            return
        if not self.can_view_folder(con, user, p["folder_id"]):
            handler.send_error(403)
            return
        if p["status"] != "approved" and not (p["submitted_by"] == user["id"] or self.can_manage_folder(con, user, p["folder_id"])):
            handler.send_error(403)
            return

    rel = p["stored_path"]
    if variant != "original":
        # The variant column may not exist in the row; fall back to the
        # original instead of raising on the lookup.
        column = f"{variant}_path"
        if column in p.keys() and p[column]:
            rel = p[column]

    root = self.media_dir.resolve()
    path = None
    if rel:
        try:
            candidate = (root / rel).resolve()
            # Compare path components, not string prefixes: a prefix test
            # would also accept a sibling such as "<media_dir>_evil/...".
            if candidate.is_relative_to(root) and candidate.is_file():
                path = candidate
        except (OSError, ValueError):
            path = None
    if path is None:
        handler.send_error(404)
        return

    ctype = p["mime_type"] or mimetypes.guess_type(str(path))[0] or "application/octet-stream"
    try:
        source = open(path, "rb")
    except OSError:
        # The file disappeared between the check above and the open.
        handler.send_error(404)
        return
    with source:
        # Size of the handle that is actually streamed, so Content-Length
        # matches the body even if the path is replaced meanwhile.
        size = os.fstat(source.fileno()).st_size
        handler.send_response(200)
        handler.send_header("Content-Type", ctype)
        handler.send_header("Content-Length", str(size))
        handler.send_header("Cache-Control", "private, max-age=86400")
        # Uploaded content must be interpreted only as the declared type.
        handler.send_header("X-Content-Type-Options", "nosniff")
        handler.end_headers()
        try:
            shutil.copyfileobj(source, handler.wfile)
        except (BrokenPipeError, ConnectionResetError):
            # The client went away mid-transfer; nothing left to send.
            pass
|
|
|
|
def serve_static(self, handler, path):
    """Serve a file from ``self.web_dir``, falling back to ``index.html``.

    ``path`` is the request path; ``""`` and ``"/"`` map to ``/index.html``.
    The path is URL-decoded and normalised. A result that still points
    upwards is answered with 403. Anything that is not an existing regular
    file inside the web directory is answered with ``index.html`` instead.
    ``index.html`` is sent with ``Cache-Control: no-cache``, other files
    with a one-hour public cache. If even the fallback cannot be read, 404
    is returned.
    """
    if path in ("", "/"):
        path = "/index.html"
    rel = posixpath.normpath(urllib.parse.unquote(path)).lstrip("/")
    if rel == ".." or rel.startswith("../"):
        handler.send_error(403)
        return

    # Resolve the root as well: comparing a resolved file path against an
    # unresolved (relative or symlinked) web_dir would never match.
    root = self.web_dir.resolve()
    file_path = root / "index.html"
    try:
        candidate = (root / rel).resolve()
        # Component-wise containment; a string prefix test would also accept
        # sibling directories sharing the same name prefix.
        if candidate.is_relative_to(root) and candidate.is_file():
            file_path = candidate
    except (OSError, ValueError):
        # e.g. an embedded NUL byte from "%00" in the URL
        pass

    try:
        data = file_path.read_bytes()
    except OSError:
        handler.send_error(404)
        return
    ctype = mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
    handler.send_response(200)
    handler.send_header("Content-Type", ctype)
    handler.send_header("Content-Length", str(len(data)))
    handler.send_header("Cache-Control", "no-cache" if file_path.name == "index.html" else "public, max-age=3600")
    handler.end_headers()
    handler.wfile.write(data)
|
|
|
|
|
|
def make_handler(app: AlbumikApp):
    """Build a ``BaseHTTPRequestHandler`` subclass bound to ``app``.

    All supported verbs (GET, POST, PUT, DELETE) go through ``route``, which
    dispatches in this order:

    1. paths starting with ``/api/`` -> ``app.handle_api``;
    2. ``GET /media/<id>/<original|thumb|preview>`` -> ``app.serve_media``;
    3. everything else -> ``app.serve_static``.

    When ``app.base_path`` is set and the request path starts with it
    followed by ``/``, that prefix is removed before routing.
    """
    media_pattern = r"^/media/([^/]+)/(original|thumb|preview)$"

    class Handler(BaseHTTPRequestHandler):
        server_version = f"Albumik/{APP_VERSION}"

        def do_GET(self):
            self.route("GET")

        def do_POST(self):
            self.route("POST")

        def do_PUT(self):
            self.route("PUT")

        def do_DELETE(self):
            self.route("DELETE")

        def route(self, method):
            """Dispatch one request to the API, media or static handler."""
            url = urllib.parse.urlparse(self.path)
            path = url.path
            prefix = app.base_path
            # Behind a reverse proxy on a sub-path: drop the prefix first.
            if prefix and path.startswith(prefix + "/"):
                path = path[len(prefix):]
            query = urllib.parse.parse_qs(url.query)

            if path.startswith("/api/"):
                return app.handle_api(self, method, path, query)
            if method == "GET":
                media = re.match(media_pattern, path)
                if media:
                    photo_id, variant = media.group(1), media.group(2)
                    return app.serve_media(self, photo_id, variant)
            return app.serve_static(self, path)

        def log_message(self, fmt, *args):
            """Write the access-log line to stdout and flush immediately."""
            client_ip = self.client_address[0]
            timestamp = self.log_date_time_string()
            line = "%s - - [%s] %s\n" % (client_ip, timestamp, fmt % args)
            sys.stdout.write(line)
            sys.stdout.flush()

    return Handler
|
|
|
|
|
|
def main():
    """Start the Albumik HTTP server and run it until interrupted.

    The configuration path comes from the ``ALBUMIK_CONFIG`` environment
    variable, else from the first command-line argument, else
    ``./config.json``. ``app.listen`` is split at the last ``:`` into host
    and port. Ctrl+C (``KeyboardInterrupt``) ends the serve loop quietly.
    """
    config_path = os.environ.get("ALBUMIK_CONFIG") or (sys.argv[1] if len(sys.argv) > 1 else "./config.json")
    app = AlbumikApp(config_path)
    host, port_s = app.listen.rsplit(":", 1)
    # The context manager calls server_close() on exit, so the listening
    # socket is released on every exit path instead of being left open.
    with ThreadingHTTPServer((host, int(port_s)), make_handler(app)) as server:
        print(f"Albumik {APP_VERSION} listening on http://{app.listen}", flush=True)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            pass
|
|
|
|
|
|
# Start the server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|