Files
albumik_ynh/backend/app.py
2026-05-01 09:06:44 +02:00

1070 lines
53 KiB
Python
Executable File

#!/usr/bin/env python3
# Albumik - lekka prywatna galeria zdjęć
# Backend bez zewnętrznych zależności: Python stdlib + SQLite.
import base64
import datetime as dt
import hashlib
import hmac
import json
import mimetypes
import os
import posixpath
import re
import secrets
import shutil
import sqlite3
import sys
import time
import traceback
import urllib.parse
import uuid
from http import HTTPStatus
from http.cookies import SimpleCookie
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
APP_VERSION = "0.1.0"
PERM_VALUE = {"none": 0, "view": 1, "upload": 2, "manage": 3, "admin": 4}
VALUE_PERM = {v: k for k, v in PERM_VALUE.items()}
IMAGE_EXT = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".heic", ".heif", ".bmp"}
VIDEO_EXT = {".mp4", ".mov", ".m4v", ".webm", ".avi", ".mkv"}
def now_iso():
return dt.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
def make_id(prefix=""):
return prefix + uuid.uuid4().hex
def hash_password(password: str, salt: bytes | None = None) -> str:
if salt is None:
salt = secrets.token_bytes(16)
dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 120_000)
return "pbkdf2_sha256$120000$" + base64.b64encode(salt).decode() + "$" + base64.b64encode(dk).decode()
def verify_password(password: str, encoded: str) -> bool:
try:
algo, rounds, salt_b64, hash_b64 = encoded.split("$", 3)
if algo != "pbkdf2_sha256":
return False
salt = base64.b64decode(salt_b64)
expected = base64.b64decode(hash_b64)
dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, int(rounds))
return hmac.compare_digest(dk, expected)
except Exception:
return False
def token_hash(token: str) -> str:
return hashlib.sha256(token.encode("utf-8")).hexdigest()
class AlbumikApp:
def __init__(self, config_path: str):
self.config_path = config_path
with open(config_path, "r", encoding="utf-8") as f:
self.config = json.load(f)
self.data_dir = Path(self.config.get("data_dir", "./data")).resolve()
self.db_path = Path(self.config.get("database", str(self.data_dir / "albumik.db"))).resolve()
self.media_dir = Path(self.config.get("media_dir", str(self.data_dir / "media"))).resolve()
self.web_dir = Path(self.config.get("web_dir", "./web")).resolve()
self.listen = self.config.get("listen", "127.0.0.1:8097")
self.base_path = self.config.get("base_path", "/").rstrip("/") or ""
self.app_name = self.config.get("app_name", "Albumik")
self.admin_user = self.config.get("admin_user", "admin")
self.admin_password = self.config.get("admin_password", "admin")
self.session_days = int(self.config.get("session_days", 14))
self.max_upload_mb = int(self.config.get("max_upload_mb", 512))
self.allow_public_register = False
for p in [self.data_dir, self.db_path.parent, self.media_dir / "originals", self.media_dir / "thumbs", self.media_dir / "previews", self.media_dir / "pending", self.data_dir / "logs"]:
p.mkdir(parents=True, exist_ok=True)
self.init_db()
def db(self):
conn = sqlite3.connect(str(self.db_path), timeout=30)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys=ON")
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
return conn
def init_db(self):
with self.db() as con:
con.executescript(
"""
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
display_name TEXT NOT NULL,
password_hash TEXT NOT NULL,
role TEXT NOT NULL CHECK(role IN ('admin','user','guest')),
can_create_folders INTEGER DEFAULT 0,
can_upload INTEGER DEFAULT 0,
can_tag INTEGER DEFAULT 0,
active INTEGER DEFAULT 1,
expires_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT
);
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
token_hash TEXT UNIQUE NOT NULL,
user_id TEXT NOT NULL,
created_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
last_seen_at TEXT,
ip TEXT,
user_agent TEXT,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS folders (
id TEXT PRIMARY KEY,
parent_id TEXT,
name TEXT NOT NULL,
path TEXT NOT NULL,
owner_id TEXT NOT NULL,
created_by TEXT NOT NULL,
inherit_permissions INTEGER DEFAULT 1,
is_deleted INTEGER DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT,
FOREIGN KEY(parent_id) REFERENCES folders(id) ON DELETE SET NULL,
FOREIGN KEY(owner_id) REFERENCES users(id),
FOREIGN KEY(created_by) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS folder_permissions (
id TEXT PRIMARY KEY,
folder_id TEXT NOT NULL,
user_id TEXT NOT NULL,
level TEXT NOT NULL CHECK(level IN ('none','view','upload','manage','admin')),
created_at TEXT NOT NULL,
UNIQUE(folder_id, user_id),
FOREIGN KEY(folder_id) REFERENCES folders(id) ON DELETE CASCADE,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS photos (
id TEXT PRIMARY KEY,
folder_id TEXT NOT NULL,
file_name TEXT NOT NULL,
original_name TEXT NOT NULL,
stored_path TEXT NOT NULL,
thumb_path TEXT,
preview_path TEXT,
mime_type TEXT NOT NULL,
size_bytes INTEGER NOT NULL,
sha256 TEXT NOT NULL,
width INTEGER,
height INTEGER,
duration_ms INTEGER,
is_video INTEGER DEFAULT 0,
taken_at TEXT,
uploaded_at TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'approved' CHECK(status IN ('pending','approved','rejected')),
submitted_by TEXT NOT NULL,
approved_by TEXT,
approved_at TEXT,
rejected_by TEXT,
rejected_at TEXT,
reject_reason TEXT,
is_deleted INTEGER DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT,
FOREIGN KEY(folder_id) REFERENCES folders(id),
FOREIGN KEY(submitted_by) REFERENCES users(id),
FOREIGN KEY(approved_by) REFERENCES users(id),
FOREIGN KEY(rejected_by) REFERENCES users(id)
);
CREATE INDEX IF NOT EXISTS idx_photos_folder ON photos(folder_id);
CREATE INDEX IF NOT EXISTS idx_photos_status ON photos(status);
CREATE INDEX IF NOT EXISTS idx_photos_taken_at ON photos(taken_at);
CREATE INDEX IF NOT EXISTS idx_photos_sha ON photos(sha256);
CREATE TABLE IF NOT EXISTS tags (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
color TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS photo_tags (
photo_id TEXT NOT NULL,
tag_id TEXT NOT NULL,
PRIMARY KEY(photo_id, tag_id),
FOREIGN KEY(photo_id) REFERENCES photos(id) ON DELETE CASCADE,
FOREIGN KEY(tag_id) REFERENCES tags(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
actor_id TEXT,
action TEXT NOT NULL,
target_type TEXT,
target_id TEXT,
message TEXT NOT NULL,
ip TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY(actor_id) REFERENCES users(id)
);
CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at);
"""
)
# Create admin if no users exist
count = con.execute("SELECT COUNT(*) AS c FROM users").fetchone()["c"]
if count == 0:
admin_id = make_id("usr_")
con.execute(
"INSERT INTO users(id,username,display_name,password_hash,role,can_create_folders,can_upload,can_tag,created_at) VALUES(?,?,?,?,?,?,?,?,?)",
(admin_id, self.admin_user, "Administrator", hash_password(self.admin_password), "admin", 1, 1, 1, now_iso()),
)
root_id = make_id("fld_")
con.execute(
"INSERT INTO folders(id,parent_id,name,path,owner_id,created_by,inherit_permissions,created_at) VALUES(?,?,?,?,?,?,?,?)",
(root_id, None, "Albumik", "/Albumik", admin_id, admin_id, 1, now_iso()),
)
self.log_event(con, admin_id, "init", "system", "albumik", "Utworzono pierwszego administratora i katalog główny", None)
else:
# Make sure there is a root folder
root = con.execute("SELECT id FROM folders WHERE parent_id IS NULL AND is_deleted=0 ORDER BY created_at LIMIT 1").fetchone()
if not root:
admin = con.execute("SELECT id FROM users WHERE role='admin' ORDER BY created_at LIMIT 1").fetchone()
if admin:
root_id = make_id("fld_")
con.execute(
"INSERT INTO folders(id,parent_id,name,path,owner_id,created_by,inherit_permissions,created_at) VALUES(?,?,?,?,?,?,?,?)",
(root_id, None, "Albumik", "/Albumik", admin["id"], admin["id"], 1, now_iso()),
)
def log_event(self, con, actor_id, action, target_type, target_id, message, ip):
con.execute(
"INSERT INTO events(id,actor_id,action,target_type,target_id,message,ip,created_at) VALUES(?,?,?,?,?,?,?,?)",
(make_id("evt_"), actor_id, action, target_type, target_id, message, ip, now_iso()),
)
def sanitize_name(self, name: str) -> str:
name = (name or "").strip().replace("/", "-").replace("\\", "-")
name = re.sub(r"[\x00-\x1f]", "", name)
return name[:120] or "Bez nazwy"
def resolve_folder_path(self, con, folder_id: str):
folder = con.execute("SELECT id,parent_id,name FROM folders WHERE id=? AND is_deleted=0", (folder_id,)).fetchone()
if not folder:
return None
names = [folder["name"]]
parent_id = folder["parent_id"]
safety = 0
while parent_id and safety < 50:
p = con.execute("SELECT id,parent_id,name FROM folders WHERE id=? AND is_deleted=0", (parent_id,)).fetchone()
if not p:
break
names.append(p["name"])
parent_id = p["parent_id"]
safety += 1
return "/" + "/".join(reversed(names))
def recompute_folder_path(self, con, folder_id: str):
path = self.resolve_folder_path(con, folder_id)
if path:
con.execute("UPDATE folders SET path=?, updated_at=? WHERE id=?", (path, now_iso(), folder_id))
return path
def get_user_from_request(self, handler):
cookie = SimpleCookie(handler.headers.get("Cookie", ""))
token = None
if "albumik_session" in cookie:
token = cookie["albumik_session"].value
auth = handler.headers.get("Authorization", "")
if auth.lower().startswith("bearer "):
token = auth.split(" ", 1)[1].strip()
if not token:
return None
th = token_hash(token)
with self.db() as con:
row = con.execute(
"""
SELECT u.* FROM sessions s
JOIN users u ON u.id=s.user_id
WHERE s.token_hash=? AND s.expires_at>? AND u.active=1
""",
(th, now_iso()),
).fetchone()
if row:
con.execute("UPDATE sessions SET last_seen_at=? WHERE token_hash=?", (now_iso(), th))
return dict(row)
return None
def folder_ancestors(self, con, folder_id: str):
ids = []
current = folder_id
safety = 0
while current and safety < 50:
row = con.execute("SELECT id,parent_id,inherit_permissions FROM folders WHERE id=? AND is_deleted=0", (current,)).fetchone()
if not row:
break
ids.append(row["id"])
if row["inherit_permissions"] == 0 and row["id"] != folder_id:
break
current = row["parent_id"]
safety += 1
return ids
def permission_value(self, con, user, folder_id: str) -> int:
if not user or not folder_id:
return 0
if user["role"] == "admin":
return 4
folder = con.execute("SELECT * FROM folders WHERE id=? AND is_deleted=0", (folder_id,)).fetchone()
if not folder:
return 0
if folder["created_by"] == user["id"] or folder["owner_id"] == user["id"]:
return 3 if user["role"] == "user" else 1
best = 0
# Direct permission first, then inherited from parents while allowed.
current = folder_id
safety = 0
while current and safety < 50:
p = con.execute("SELECT level FROM folder_permissions WHERE folder_id=? AND user_id=?", (current, user["id"])).fetchone()
if p:
best = max(best, PERM_VALUE.get(p["level"], 0))
f = con.execute("SELECT parent_id, inherit_permissions FROM folders WHERE id=? AND is_deleted=0", (current,)).fetchone()
if not f:
break
# Folder itself can inherit from parent only when inherit_permissions=1
if f["inherit_permissions"] == 0 and current != folder_id:
break
current = f["parent_id"]
safety += 1
return best
def can_view_folder(self, con, user, folder_id):
return self.permission_value(con, user, folder_id) >= 1
def can_upload_to_folder(self, con, user, folder_id):
if not user:
return False
if user["role"] == "admin":
return True
if user["role"] == "guest":
return self.permission_value(con, user, folder_id) >= 2
return bool(user["can_upload"]) and self.permission_value(con, user, folder_id) >= 2
def can_manage_folder(self, con, user, folder_id):
if not user:
return False
if user["role"] == "admin":
return True
if user["role"] == "guest":
return False
return self.permission_value(con, user, folder_id) >= 3
def visible_folder_ids(self, con, user):
rows = con.execute("SELECT id FROM folders WHERE is_deleted=0").fetchall()
return [r["id"] for r in rows if self.can_view_folder(con, user, r["id"])]
def descendant_folder_ids(self, con, root_id: str):
ids = [root_id]
queue = [root_id]
safety = 0
while queue and safety < 10000:
current = queue.pop(0)
children = con.execute("SELECT id FROM folders WHERE parent_id=? AND is_deleted=0", (current,)).fetchall()
for c in children:
ids.append(c["id"])
queue.append(c["id"])
safety += 1
return ids
def parse_json(self, handler):
length = int(handler.headers.get("Content-Length", "0") or 0)
if length <= 0:
return {}
body = handler.rfile.read(length)
return json.loads(body.decode("utf-8"))
def parse_multipart(self, handler):
content_type = handler.headers.get("Content-Type", "")
m = re.search(r"boundary=(.+)", content_type)
if not m:
raise ValueError("Brak boundary multipart")
boundary = m.group(1).strip().strip('"')
length = int(handler.headers.get("Content-Length", "0") or 0)
if length > self.max_upload_mb * 1024 * 1024:
raise ValueError(f"Plik za duży. Limit: {self.max_upload_mb} MB")
body = handler.rfile.read(length)
boundary_bytes = ("--" + boundary).encode()
fields = {}
files = []
for raw_part in body.split(boundary_bytes):
raw_part = raw_part.strip(b"\r\n")
if not raw_part or raw_part == b"--":
continue
if raw_part.endswith(b"--"):
raw_part = raw_part[:-2].strip(b"\r\n")
header_blob, sep, data = raw_part.partition(b"\r\n\r\n")
if not sep:
continue
headers_text = header_blob.decode("utf-8", "replace")
disp = ""
part_type = "application/octet-stream"
for line in headers_text.split("\r\n"):
if line.lower().startswith("content-disposition:"):
disp = line.split(":", 1)[1].strip()
if line.lower().startswith("content-type:"):
part_type = line.split(":", 1)[1].strip()
name_match = re.search(r'name="([^"]+)"', disp)
if not name_match:
continue
name = name_match.group(1)
filename_match = re.search(r'filename="([^"]*)"', disp)
if filename_match:
filename = filename_match.group(1)
files.append({"field": name, "filename": filename, "content_type": part_type, "data": data})
else:
fields[name] = data.decode("utf-8", "replace")
return fields, files
def response_json(self, handler, data, status=200, extra_headers=None):
payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
handler.send_response(status)
handler.send_header("Content-Type", "application/json; charset=utf-8")
handler.send_header("Content-Length", str(len(payload)))
handler.send_header("Cache-Control", "no-store")
if extra_headers:
for k, v in extra_headers.items():
handler.send_header(k, v)
handler.end_headers()
handler.wfile.write(payload)
def error(self, handler, message, status=400):
self.response_json(handler, {"ok": False, "error": message}, status)
def clean_expired_sessions(self):
with self.db() as con:
con.execute("DELETE FROM sessions WHERE expires_at<=?", (now_iso(),))
def handle_api(self, handler, method, path, query):
try:
if path == "/api/health":
return self.response_json(handler, {"ok": True, "name": self.app_name, "version": APP_VERSION})
if path == "/api/login" and method == "POST":
data = self.parse_json(handler)
username = (data.get("username") or "").strip()
password = data.get("password") or ""
with self.db() as con:
user = con.execute("SELECT * FROM users WHERE username=? AND active=1", (username,)).fetchone()
if not user or not verify_password(password, user["password_hash"]):
return self.error(handler, "Nieprawidłowy login albo hasło", 401)
if user["expires_at"] and user["expires_at"] < now_iso():
return self.error(handler, "Konto wygasło", 403)
token = secrets.token_urlsafe(40)
expires = (dt.datetime.utcnow() + dt.timedelta(days=self.session_days)).replace(microsecond=0).isoformat() + "Z"
con.execute(
"INSERT INTO sessions(id,token_hash,user_id,created_at,expires_at,last_seen_at,ip,user_agent) VALUES(?,?,?,?,?,?,?,?)",
(make_id("ses_"), token_hash(token), user["id"], now_iso(), expires, now_iso(), handler.client_address[0], handler.headers.get("User-Agent", "")),
)
self.log_event(con, user["id"], "login", "user", user["id"], f"Logowanie użytkownika {username}", handler.client_address[0])
cookie = f"albumik_session={token}; Path=/; HttpOnly; SameSite=Lax; Max-Age={self.session_days*86400}"
return self.response_json(handler, {"ok": True, "user": self.public_user(dict(user))}, extra_headers={"Set-Cookie": cookie})
if path == "/api/logout" and method == "POST":
cookie = SimpleCookie(handler.headers.get("Cookie", ""))
if "albumik_session" in cookie:
th = token_hash(cookie["albumik_session"].value)
with self.db() as con:
con.execute("DELETE FROM sessions WHERE token_hash=?", (th,))
return self.response_json(handler, {"ok": True}, extra_headers={"Set-Cookie": "albumik_session=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax"})
user = self.get_user_from_request(handler)
if not user:
return self.error(handler, "Brak logowania", 401)
if path == "/api/me" and method == "GET":
return self.response_json(handler, {"ok": True, "user": self.public_user(user)})
if path == "/api/stats" and method == "GET":
return self.api_stats(handler, user)
if path == "/api/users" and method == "GET":
return self.api_list_users(handler, user)
if path == "/api/users" and method == "POST":
return self.api_create_user(handler, user)
m = re.match(r"^/api/users/([^/]+)$", path)
if m and method == "PUT":
return self.api_update_user(handler, user, m.group(1))
if path == "/api/folders" and method == "GET":
return self.api_list_folders(handler, user)
if path == "/api/folders" and method == "POST":
return self.api_create_folder(handler, user)
m = re.match(r"^/api/folders/([^/]+)$", path)
if m and method == "PUT":
return self.api_update_folder(handler, user, m.group(1))
if m and method == "DELETE":
return self.api_delete_folder(handler, user, m.group(1))
if path == "/api/folder-permissions" and method == "POST":
return self.api_set_folder_permission(handler, user)
if path == "/api/photos" and method == "GET":
return self.api_list_photos(handler, user, query)
if path == "/api/photos/upload" and method == "POST":
return self.api_upload(handler, user)
m = re.match(r"^/api/photos/([^/]+)/approve$", path)
if m and method == "POST":
return self.api_approve_photo(handler, user, m.group(1))
m = re.match(r"^/api/photos/([^/]+)/reject$", path)
if m and method == "POST":
return self.api_reject_photo(handler, user, m.group(1))
m = re.match(r"^/api/photos/([^/]+)/move$", path)
if m and method == "POST":
return self.api_move_photo(handler, user, m.group(1))
m = re.match(r"^/api/photos/([^/]+)/tags$", path)
if m and method == "POST":
return self.api_set_photo_tags(handler, user, m.group(1))
m = re.match(r"^/api/photos/([^/]+)$", path)
if m and method == "DELETE":
return self.api_delete_photo(handler, user, m.group(1))
if path == "/api/tags" and method == "GET":
return self.api_list_tags(handler, user)
if path == "/api/tags" and method == "POST":
return self.api_create_tag(handler, user)
if path == "/api/events" and method == "GET":
return self.api_events(handler, user)
return self.error(handler, "Nieznany endpoint API", 404)
except Exception as e:
traceback.print_exc()
return self.error(handler, "Błąd serwera: " + str(e), 500)
def public_user(self, user):
return {
"id": user["id"],
"username": user["username"],
"display_name": user["display_name"],
"role": user["role"],
"can_create_folders": bool(user["can_create_folders"]),
"can_upload": bool(user["can_upload"]),
"can_tag": bool(user["can_tag"]),
"active": bool(user["active"]),
"expires_at": user.get("expires_at"),
}
def api_stats(self, handler, user):
with self.db() as con:
visible = self.visible_folder_ids(con, user)
if not visible:
return self.response_json(handler, {"ok": True, "stats": {"photos": 0, "pending": 0, "folders": 0, "tags": 0, "users": 0, "bytes": 0}})
ph = ",".join("?" for _ in visible)
photos = con.execute(f"SELECT COUNT(*) c, COALESCE(SUM(size_bytes),0) b FROM photos WHERE is_deleted=0 AND folder_id IN ({ph}) AND status='approved'", visible).fetchone()
pending = con.execute(f"SELECT COUNT(*) c FROM photos WHERE is_deleted=0 AND folder_id IN ({ph}) AND status='pending'", visible).fetchone()
tags = con.execute("SELECT COUNT(*) c FROM tags").fetchone()
users = con.execute("SELECT COUNT(*) c FROM users WHERE active=1").fetchone() if user["role"] == "admin" else {"c": None}
return self.response_json(handler, {"ok": True, "stats": {"photos": photos["c"], "pending": pending["c"], "folders": len(visible), "tags": tags["c"], "users": users["c"], "bytes": photos["b"]}})
def api_list_users(self, handler, user):
if user["role"] != "admin":
return self.error(handler, "Tylko administrator może widzieć użytkowników", 403)
with self.db() as con:
rows = con.execute("SELECT * FROM users ORDER BY created_at DESC").fetchall()
return self.response_json(handler, {"ok": True, "users": [self.public_user(dict(r)) for r in rows]})
def api_create_user(self, handler, user):
if user["role"] != "admin":
return self.error(handler, "Tylko administrator może tworzyć konta", 403)
data = self.parse_json(handler)
username = self.sanitize_name(data.get("username", "")).lower().replace(" ", "")
password = data.get("password") or secrets.token_urlsafe(8)
role = data.get("role", "guest")
if role not in ("admin", "user", "guest"):
return self.error(handler, "Nieprawidłowa rola")
uid = make_id("usr_")
with self.db() as con:
con.execute(
"INSERT INTO users(id,username,display_name,password_hash,role,can_create_folders,can_upload,can_tag,active,expires_at,created_at) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
(
uid,
username,
self.sanitize_name(data.get("display_name") or username),
hash_password(password),
role,
1 if data.get("can_create_folders") else 0,
1 if data.get("can_upload") else 0,
1 if data.get("can_tag") else 0,
1,
data.get("expires_at") or None,
now_iso(),
),
)
self.log_event(con, user["id"], "create_user", "user", uid, f"Utworzono konto {username} ({role})", handler.client_address[0])
return self.response_json(handler, {"ok": True, "id": uid, "password": password})
def api_update_user(self, handler, user, uid):
if user["role"] != "admin":
return self.error(handler, "Tylko administrator może edytować konta", 403)
data = self.parse_json(handler)
allowed = []
params = []
for field in ["display_name", "role", "expires_at"]:
if field in data:
allowed.append(f"{field}=?")
params.append(data[field])
for field in ["can_create_folders", "can_upload", "can_tag", "active"]:
if field in data:
allowed.append(f"{field}=?")
params.append(1 if data[field] else 0)
if data.get("password"):
allowed.append("password_hash=?")
params.append(hash_password(data["password"]))
if not allowed:
return self.response_json(handler, {"ok": True})
allowed.append("updated_at=?")
params.append(now_iso())
params.append(uid)
with self.db() as con:
con.execute(f"UPDATE users SET {', '.join(allowed)} WHERE id=?", params)
self.log_event(con, user["id"], "update_user", "user", uid, "Zaktualizowano konto użytkownika", handler.client_address[0])
return self.response_json(handler, {"ok": True})
def api_list_folders(self, handler, user):
with self.db() as con:
rows = con.execute("SELECT * FROM folders WHERE is_deleted=0 ORDER BY path COLLATE NOCASE").fetchall()
out = []
for r in rows:
perm = self.permission_value(con, user, r["id"])
if perm >= 1:
d = dict(r)
d["permission"] = VALUE_PERM.get(perm, "none")
d["can_manage"] = perm >= 3 or user["role"] == "admin"
d["can_upload"] = self.can_upload_to_folder(con, user, r["id"])
out.append(d)
return self.response_json(handler, {"ok": True, "folders": out})
def api_create_folder(self, handler, user):
data = self.parse_json(handler)
parent_id = data.get("parent_id")
name = self.sanitize_name(data.get("name", ""))
with self.db() as con:
if parent_id:
if not self.can_manage_folder(con, user, parent_id) and not (user["role"] == "user" and user["can_create_folders"] and self.can_view_folder(con, user, parent_id)):
return self.error(handler, "Brak uprawnień do tworzenia podkatalogu", 403)
else:
if user["role"] != "admin" and not (user["role"] == "user" and user["can_create_folders"]):
return self.error(handler, "Brak uprawnień do tworzenia katalogu", 403)
fid = make_id("fld_")
# temporary path; recomputed after insert
con.execute(
"INSERT INTO folders(id,parent_id,name,path,owner_id,created_by,inherit_permissions,created_at) VALUES(?,?,?,?,?,?,?,?)",
(fid, parent_id, name, "/" + name, user["id"], user["id"], 1 if data.get("inherit_permissions", True) else 0, now_iso()),
)
path = self.recompute_folder_path(con, fid)
# creator gets manage permission
con.execute(
"INSERT OR REPLACE INTO folder_permissions(id,folder_id,user_id,level,created_at) VALUES(COALESCE((SELECT id FROM folder_permissions WHERE folder_id=? AND user_id=?), ?),?,?,?,?)",
(fid, user["id"], make_id("fpe_"), fid, user["id"], "manage", now_iso()),
)
self.log_event(con, user["id"], "create_folder", "folder", fid, f"Utworzono katalog {path}", handler.client_address[0])
return self.response_json(handler, {"ok": True, "id": fid, "path": path})
def api_update_folder(self, handler, user, folder_id):
data = self.parse_json(handler)
with self.db() as con:
if not self.can_manage_folder(con, user, folder_id):
return self.error(handler, "Brak uprawnień do edycji katalogu", 403)
changes = []
params = []
if "name" in data:
changes.append("name=?")
params.append(self.sanitize_name(data["name"]))
if "inherit_permissions" in data:
changes.append("inherit_permissions=?")
params.append(1 if data["inherit_permissions"] else 0)
if "parent_id" in data:
parent_id = data["parent_id"] or None
if parent_id == folder_id:
return self.error(handler, "Katalog nie może być swoim rodzicem")
if parent_id and not self.can_manage_folder(con, user, parent_id):
return self.error(handler, "Brak uprawnień do katalogu docelowego", 403)
changes.append("parent_id=?")
params.append(parent_id)
if not changes:
return self.response_json(handler, {"ok": True})
changes.append("updated_at=?")
params.append(now_iso())
params.append(folder_id)
con.execute(f"UPDATE folders SET {', '.join(changes)} WHERE id=?", params)
path = self.recompute_folder_path(con, folder_id)
# Update descendants paths
for d in self.descendant_folder_ids(con, folder_id):
self.recompute_folder_path(con, d)
self.log_event(con, user["id"], "update_folder", "folder", folder_id, f"Zaktualizowano katalog {path}", handler.client_address[0])
return self.response_json(handler, {"ok": True})
def api_delete_folder(self, handler, user, folder_id):
with self.db() as con:
if not self.can_manage_folder(con, user, folder_id):
return self.error(handler, "Brak uprawnień do usunięcia katalogu", 403)
con.execute("UPDATE folders SET is_deleted=1, updated_at=? WHERE id=?", (now_iso(), folder_id))
self.log_event(con, user["id"], "delete_folder", "folder", folder_id, "Usunięto katalog", handler.client_address[0])
return self.response_json(handler, {"ok": True})
def api_set_folder_permission(self, handler, user):
if user["role"] != "admin":
return self.error(handler, "Tylko administrator może nadawać dostępy", 403)
data = self.parse_json(handler)
folder_id = data.get("folder_id")
user_id = data.get("user_id")
level = data.get("level", "view")
if level not in PERM_VALUE:
return self.error(handler, "Nieprawidłowy poziom uprawnień")
with self.db() as con:
if level == "none":
con.execute("DELETE FROM folder_permissions WHERE folder_id=? AND user_id=?", (folder_id, user_id))
else:
existing = con.execute("SELECT id FROM folder_permissions WHERE folder_id=? AND user_id=?", (folder_id, user_id)).fetchone()
if existing:
con.execute("UPDATE folder_permissions SET level=? WHERE id=?", (level, existing["id"]))
else:
con.execute("INSERT INTO folder_permissions(id,folder_id,user_id,level,created_at) VALUES(?,?,?,?,?)", (make_id("fpe_"), folder_id, user_id, level, now_iso()))
self.log_event(con, user["id"], "set_permission", "folder", folder_id, f"Nadano dostęp {level} użytkownikowi {user_id}", handler.client_address[0])
return self.response_json(handler, {"ok": True})
def api_upload(self, handler, user):
fields, files = self.parse_multipart(handler)
folder_id = fields.get("folder_id")
tags = [t.strip() for t in (fields.get("tags") or "").split(",") if t.strip()]
taken_at = fields.get("taken_at") or None
if not folder_id:
return self.error(handler, "Brak katalogu")
if not files:
return self.error(handler, "Brak plików")
created = []
with self.db() as con:
if not self.can_upload_to_folder(con, user, folder_id):
return self.error(handler, "Brak uprawnień do wysyłania zdjęć w tym katalogu", 403)
status = "pending" if user["role"] == "guest" else "approved"
for f in files:
original = self.sanitize_name(os.path.basename(f["filename"] or "plik"))
ext = os.path.splitext(original)[1].lower()
is_video = 1 if ext in VIDEO_EXT or f["content_type"].startswith("video/") else 0
if not (ext in IMAGE_EXT or ext in VIDEO_EXT or f["content_type"].startswith("image/") or f["content_type"].startswith("video/")):
continue
content = f["data"]
sha = hashlib.sha256(content).hexdigest()
duplicate = con.execute("SELECT id FROM photos WHERE sha256=? AND size_bytes=? AND is_deleted=0", (sha, len(content))).fetchone()
if duplicate:
created.append({"id": duplicate["id"], "status": "already_exists"})
continue
pid = make_id("pho_")
subdir = "pending" if status == "pending" else "originals"
now = dt.datetime.utcnow()
rel_dir = f"{subdir}/{now.year:04d}/{now.month:02d}"
abs_dir = self.media_dir / rel_dir
abs_dir.mkdir(parents=True, exist_ok=True)
stored_name = f"{pid}{ext or '.bin'}"
abs_path = abs_dir / stored_name
with open(abs_path, "wb") as out:
out.write(content)
rel_path = f"{rel_dir}/{stored_name}"
mime_type = f["content_type"] or mimetypes.guess_type(original)[0] or "application/octet-stream"
con.execute(
"""
INSERT INTO photos(id,folder_id,file_name,original_name,stored_path,mime_type,size_bytes,sha256,is_video,taken_at,uploaded_at,status,submitted_by,approved_by,approved_at,created_at)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""",
(pid, folder_id, original, original, rel_path, mime_type, len(content), sha, is_video, taken_at, now_iso(), status, user["id"], user["id"] if status == "approved" else None, now_iso() if status == "approved" else None, now_iso()),
)
if tags and user["role"] != "guest":
self.assign_tags(con, pid, tags)
created.append({"id": pid, "status": status})
self.log_event(con, user["id"], "upload", "folder", folder_id, f"Wysłano {len(created)} plików. Status: {status}", handler.client_address[0])
return self.response_json(handler, {"ok": True, "items": created})
def api_list_photos(self, handler, user, query):
folder_id = query.get("folder_id", [None])[0]
include_sub = query.get("include_subfolders", ["1"])[0] != "0"
status = query.get("status", ["approved"])[0]
q = (query.get("q", [""])[0] or "").strip().lower()
tag = (query.get("tag", [""])[0] or "").strip().lower()
date_from = query.get("date_from", [""])[0] or ""
date_to = query.get("date_to", [""])[0] or ""
limit = min(int(query.get("limit", [120])[0] or 120), 500)
with self.db() as con:
if folder_id:
ids = self.descendant_folder_ids(con, folder_id) if include_sub else [folder_id]
ids = [i for i in ids if self.can_view_folder(con, user, i)]
else:
ids = self.visible_folder_ids(con, user)
if not ids:
return self.response_json(handler, {"ok": True, "photos": []})
ph = ",".join("?" for _ in ids)
where = [f"p.is_deleted=0", f"p.folder_id IN ({ph})"]
params = list(ids)
if status and status != "all":
where.append("p.status=?")
params.append(status)
else:
# Guests cannot see rejected photos of others; users see all only where visible.
if user["role"] == "guest":
where.append("(p.status='approved' OR p.submitted_by=?)")
params.append(user["id"])
if q:
where.append("(LOWER(p.file_name) LIKE ? OR LOWER(p.original_name) LIKE ? OR LOWER(f.path) LIKE ?)")
params += [f"%{q}%", f"%{q}%", f"%{q}%"]
if date_from:
where.append("COALESCE(p.taken_at,p.uploaded_at,p.created_at)>=?")
params.append(date_from)
if date_to:
where.append("COALESCE(p.taken_at,p.uploaded_at,p.created_at)<=?")
params.append(date_to + "T23:59:59Z" if len(date_to) == 10 else date_to)
if tag:
where.append("EXISTS (SELECT 1 FROM photo_tags pt JOIN tags t ON t.id=pt.tag_id WHERE pt.photo_id=p.id AND LOWER(t.name)=?)")
params.append(tag)
sql = f"""
SELECT p.*, f.path AS folder_path, u.display_name AS submitted_by_name
FROM photos p
JOIN folders f ON f.id=p.folder_id
JOIN users u ON u.id=p.submitted_by
WHERE {' AND '.join(where)}
ORDER BY COALESCE(p.taken_at,p.uploaded_at,p.created_at) DESC
LIMIT ?
"""
params.append(limit)
rows = con.execute(sql, params).fetchall()
photos = []
for r in rows:
d = dict(r)
d["tags"] = [dict(x) for x in con.execute("SELECT t.id,t.name,t.color FROM photo_tags pt JOIN tags t ON t.id=pt.tag_id WHERE pt.photo_id=? ORDER BY t.name", (r["id"],)).fetchall()]
d["url"] = self.base_path + f"/media/{r['id']}/original"
d["can_manage"] = self.can_manage_folder(con, user, r["folder_id"])
photos.append(d)
return self.response_json(handler, {"ok": True, "photos": photos})
def assign_tags(self, con, photo_id, names):
colors = ["#f472b6", "#a78bfa", "#60a5fa", "#34d399", "#f59e0b", "#fb7185", "#22c55e", "#38bdf8"]
for name in names:
clean = self.sanitize_name(name).lower()
if not clean:
continue
tag = con.execute("SELECT id FROM tags WHERE LOWER(name)=?", (clean,)).fetchone()
if tag:
tid = tag["id"]
else:
tid = make_id("tag_")
color = colors[abs(hash(clean)) % len(colors)]
con.execute("INSERT INTO tags(id,name,color,created_at) VALUES(?,?,?,?)", (tid, clean, color, now_iso()))
con.execute("INSERT OR IGNORE INTO photo_tags(photo_id,tag_id) VALUES(?,?)", (photo_id, tid))
def api_approve_photo(self, handler, user, photo_id):
data = self.parse_json(handler) if handler.headers.get("Content-Length") else {}
target_folder = data.get("folder_id")
tags = data.get("tags") or []
with self.db() as con:
p = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
if not p:
return self.error(handler, "Nie znaleziono zdjęcia", 404)
folder_id = target_folder or p["folder_id"]
if not self.can_manage_folder(con, user, p["folder_id"]) or not self.can_manage_folder(con, user, folder_id):
return self.error(handler, "Brak uprawnień do akceptacji", 403)
rel_path = p["stored_path"]
if rel_path.startswith("pending/"):
new_rel = rel_path.replace("pending/", "originals/", 1)
(self.media_dir / os.path.dirname(new_rel)).mkdir(parents=True, exist_ok=True)
try:
shutil.move(str(self.media_dir / rel_path), str(self.media_dir / new_rel))
rel_path = new_rel
except FileNotFoundError:
pass
con.execute("UPDATE photos SET status='approved', folder_id=?, stored_path=?, approved_by=?, approved_at=?, updated_at=? WHERE id=?", (folder_id, rel_path, user["id"], now_iso(), now_iso(), photo_id))
if tags:
self.assign_tags(con, photo_id, tags)
self.log_event(con, user["id"], "approve_photo", "photo", photo_id, "Zaakceptowano zdjęcie", handler.client_address[0])
return self.response_json(handler, {"ok": True})
def api_reject_photo(self, handler, user, photo_id):
data = self.parse_json(handler) if handler.headers.get("Content-Length") else {}
reason = data.get("reason") or ""
with self.db() as con:
p = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
if not p:
return self.error(handler, "Nie znaleziono zdjęcia", 404)
if not self.can_manage_folder(con, user, p["folder_id"]):
return self.error(handler, "Brak uprawnień do odrzucenia", 403)
con.execute("UPDATE photos SET status='rejected', rejected_by=?, rejected_at=?, reject_reason=?, updated_at=? WHERE id=?", (user["id"], now_iso(), reason, now_iso(), photo_id))
self.log_event(con, user["id"], "reject_photo", "photo", photo_id, "Odrzucono zdjęcie", handler.client_address[0])
return self.response_json(handler, {"ok": True})
def api_move_photo(self, handler, user, photo_id):
data = self.parse_json(handler)
target = data.get("folder_id")
if not target:
return self.error(handler, "Brak katalogu docelowego")
with self.db() as con:
p = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
if not p:
return self.error(handler, "Nie znaleziono zdjęcia", 404)
if not self.can_manage_folder(con, user, p["folder_id"]) or not self.can_manage_folder(con, user, target):
return self.error(handler, "Brak uprawnień do przeniesienia", 403)
con.execute("UPDATE photos SET folder_id=?, updated_at=? WHERE id=?", (target, now_iso(), photo_id))
self.log_event(con, user["id"], "move_photo", "photo", photo_id, f"Przeniesiono zdjęcie do katalogu {target}", handler.client_address[0])
return self.response_json(handler, {"ok": True})
def api_set_photo_tags(self, handler, user, photo_id):
data = self.parse_json(handler)
names = data.get("tags") or []
with self.db() as con:
p = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
if not p:
return self.error(handler, "Nie znaleziono zdjęcia", 404)
if user["role"] == "guest" or not (user["role"] == "admin" or (user["can_tag"] and self.can_view_folder(con, user, p["folder_id"]))):
return self.error(handler, "Brak uprawnień do tagowania", 403)
con.execute("DELETE FROM photo_tags WHERE photo_id=?", (photo_id,))
self.assign_tags(con, photo_id, names)
self.log_event(con, user["id"], "tag_photo", "photo", photo_id, "Zmieniono tagi zdjęcia", handler.client_address[0])
return self.response_json(handler, {"ok": True})
def api_delete_photo(self, handler, user, photo_id):
with self.db() as con:
p = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
if not p:
return self.error(handler, "Nie znaleziono zdjęcia", 404)
if not self.can_manage_folder(con, user, p["folder_id"]):
return self.error(handler, "Brak uprawnień do usunięcia", 403)
con.execute("UPDATE photos SET is_deleted=1, updated_at=? WHERE id=?", (now_iso(), photo_id))
self.log_event(con, user["id"], "delete_photo", "photo", photo_id, "Usunięto zdjęcie", handler.client_address[0])
return self.response_json(handler, {"ok": True})
def api_list_tags(self, handler, user):
with self.db() as con:
rows = con.execute("SELECT t.*, COUNT(pt.photo_id) AS count FROM tags t LEFT JOIN photo_tags pt ON pt.tag_id=t.id GROUP BY t.id ORDER BY t.name").fetchall()
return self.response_json(handler, {"ok": True, "tags": [dict(r) for r in rows]})
def api_create_tag(self, handler, user):
if user["role"] == "guest":
return self.error(handler, "Gość nie może tworzyć tagów", 403)
data = self.parse_json(handler)
name = self.sanitize_name(data.get("name", "")).lower()
color = data.get("color") or "#a78bfa"
tid = make_id("tag_")
with self.db() as con:
con.execute("INSERT OR IGNORE INTO tags(id,name,color,created_at) VALUES(?,?,?,?)", (tid, name, color, now_iso()))
return self.response_json(handler, {"ok": True})
def api_events(self, handler, user):
if user["role"] != "admin":
return self.error(handler, "Tylko administrator może widzieć dziennik", 403)
with self.db() as con:
rows = con.execute("SELECT e.*, u.display_name AS actor_name FROM events e LEFT JOIN users u ON u.id=e.actor_id ORDER BY e.created_at DESC LIMIT 200").fetchall()
return self.response_json(handler, {"ok": True, "events": [dict(r) for r in rows]})
def serve_media(self, handler, photo_id, variant="original"):
user = self.get_user_from_request(handler)
if not user:
handler.send_error(401)
return
with self.db() as con:
p = con.execute("SELECT * FROM photos WHERE id=? AND is_deleted=0", (photo_id,)).fetchone()
if not p:
handler.send_error(404)
return
if not self.can_view_folder(con, user, p["folder_id"]):
handler.send_error(403)
return
if p["status"] != "approved" and not (p["submitted_by"] == user["id"] or self.can_manage_folder(con, user, p["folder_id"])):
handler.send_error(403)
return
rel = p["stored_path"] if variant == "original" else (p[f"{variant}_path"] or p["stored_path"])
path = (self.media_dir / rel).resolve()
if not str(path).startswith(str(self.media_dir.resolve())) or not path.exists():
handler.send_error(404)
return
ctype = p["mime_type"] or mimetypes.guess_type(str(path))[0] or "application/octet-stream"
handler.send_response(200)
handler.send_header("Content-Type", ctype)
handler.send_header("Content-Length", str(path.stat().st_size))
handler.send_header("Cache-Control", "private, max-age=86400")
handler.end_headers()
with open(path, "rb") as f:
shutil.copyfileobj(f, handler.wfile)
def serve_static(self, handler, path):
if path == "/" or path == "":
path = "/index.html"
# strip base path when app is installed in subpath
rel = posixpath.normpath(urllib.parse.unquote(path)).lstrip("/")
if rel.startswith(".."):
handler.send_error(403)
return
file_path = (self.web_dir / rel).resolve()
if not str(file_path).startswith(str(self.web_dir)) or not file_path.exists() or file_path.is_dir():
file_path = self.web_dir / "index.html"
ctype = mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
data = file_path.read_bytes()
handler.send_response(200)
handler.send_header("Content-Type", ctype)
handler.send_header("Content-Length", str(len(data)))
handler.send_header("Cache-Control", "no-cache" if file_path.name == "index.html" else "public, max-age=3600")
handler.end_headers()
handler.wfile.write(data)
def make_handler(app: AlbumikApp):
class Handler(BaseHTTPRequestHandler):
server_version = f"Albumik/{APP_VERSION}"
def do_GET(self):
self.route("GET")
def do_POST(self):
self.route("POST")
def do_PUT(self):
self.route("PUT")
def do_DELETE(self):
self.route("DELETE")
def route(self, method):
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
# If proxied on subpath, remove it for routing.
if app.base_path and path.startswith(app.base_path + "/"):
path = path[len(app.base_path):]
query = urllib.parse.parse_qs(parsed.query)
if path.startswith("/api/"):
return app.handle_api(self, method, path, query)
m = re.match(r"^/media/([^/]+)/(original|thumb|preview)$", path)
if m and method == "GET":
return app.serve_media(self, m.group(1), m.group(2))
return app.serve_static(self, path)
def log_message(self, fmt, *args):
sys.stdout.write("%s - - [%s] %s\n" % (self.client_address[0], self.log_date_time_string(), fmt % args))
sys.stdout.flush()
return Handler
def main():
config_path = os.environ.get("ALBUMIK_CONFIG") or (sys.argv[1] if len(sys.argv) > 1 else "./config.json")
app = AlbumikApp(config_path)
host, port_s = app.listen.rsplit(":", 1)
server = ThreadingHTTPServer((host, int(port_s)), make_handler(app))
print(f"Albumik {APP_VERSION} listening on http://{app.listen}", flush=True)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()