wither/wither.py

322 lines
8.9 KiB
Python
Raw Normal View History

2025-08-01 00:00:00 +00:00
#!/usr/bin/env python3
import argparse
2025-08-01 00:00:00 +00:00
import datetime
2025-08-01 00:00:00 +00:00
import json
2025-08-01 00:00:00 +00:00
import sys
import time
2025-08-01 00:00:00 +00:00
import httpx
2025-08-01 00:00:00 +00:00
from typing import Any, Optional, TypeAlias, Union
2025-08-01 00:00:00 +00:00
# Tool version; interpolated into the User-Agent header built in purge().
VERSION = "0.1"
2025-08-01 00:00:00 +00:00
# When true, printdbg() writes its messages; otherwise they are dropped.
DEBUG = True
# When true, delete_status() only logs the id and sends no DELETE request.
DRYRUN = False
# Passed as the flush= argument of print() by printerr() and printdbg().
FLUSH = True
2025-08-01 00:00:00 +00:00
# Timezone-aware (UTC) reference time, captured once when the module is loaded;
# datediff() measures status ages against it.
NOW = datetime.datetime.now(datetime.timezone.utc)
# Value types used for the query parameters sent with the status fetch.
QVal: TypeAlias = Union[int, str, bool]
2025-08-01 00:00:00 +00:00
class Config(argparse.Namespace):
    """Typed settings container built from the parsed command line.

    Every attribute annotated below is copied from the namespace handed to
    ``__init__``. The annotations double as the list of known setting names:
    other code iterates ``Config.__annotations__`` to decide which keys exist.
    """

    # run-time-only variables (never written to the config file)
    token: str
    error_count: int
    first_error: Optional[str]
    fatal_error: bool
    config_path: str
    # persisting variables (stored in and restored from the config file)
    api_uri: str
    user_id: str
    min_id: str
    delete_reblogs: bool
    delete_private: bool
    delete_direct: bool
    preserve_max_age: int
    preserve_contexts: list[str]
    preserve_statuses: list[str]
    cooldown_delete: int
    cooldown_fetch: int

    def __init__(self, ns: Any) -> None:
        """Copy every annotated attribute from ``ns``.

        Raises:
            KeyError: if ``ns`` lacks one of the annotated attributes.
        """
        for k in Config.__annotations__:
            setattr(self, k, ns.__dict__[k])

    @staticmethod
    def ephemerals() -> set[str]:
        """Return the names of the run-time-only attributes.

        ``first_error`` belongs here as well: it is bookkeeping for the
        current run and must not be taken over from a stored config.
        """
        return {
            "token",
            "error_count",
            "first_error",
            "fatal_error",
            "config_path",
        }
# Utils
def printerr(str: str) -> None:
    """Print the message to standard error, flushing as configured by FLUSH."""
    stream = sys.stderr
    print(str, file=stream, flush=FLUSH)
2025-08-01 00:00:00 +00:00
def printdbg(str: str) -> None:
    """Print the message to standard output, but only while DEBUG is enabled."""
    if not DEBUG:
        return
    print(str, flush=FLUSH)
2025-08-01 00:00:00 +00:00
def is_http_error(resp: httpx.Response) -> bool:
    """Return True when the response status code lies outside 200..299."""
    code = resp.status_code
    return not (200 <= code < 300)
def format_http_error(resp: httpx.Response) -> str:
    """Render status code, headers and raw body of a response for error logs."""
    parts = (
        f" Response Code: {resp.status_code}",
        f" Headers: {resp.headers}",
        f" Body: {resp.content!r}",
    )
    return "\n".join(parts)
def datediff(masto_date_str: str, now: Optional[datetime.datetime] = None) -> float:
    """Return how many seconds the given timestamp lies before the reference time.

    Args:
        masto_date_str: ISO 8601 timestamp as found in a status' date fields.
        now: reference time to measure against; defaults to the module-level
            ``NOW`` captured at start-up.

    Returns:
        ``reference - timestamp`` in seconds (negative for future timestamps).

    Raises:
        ValueError: if the string is not a parseable ISO 8601 timestamp.
    """
    text = masto_date_str.strip()
    # datetime.fromisoformat() only understands the "Z" suffix since
    # Python 3.11; spell it as an explicit UTC offset so older versions work.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    parsed = datetime.datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        # Subtracting a naive from an aware datetime raises TypeError.
        # NOTE(review): treating offset-less timestamps as UTC is an assumption.
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    reference = NOW if now is None else now
    return (reference - parsed).total_seconds()
2025-08-01 00:00:00 +00:00
# Work part
2025-08-01 00:00:00 +00:00
def delete_status(client: httpx.Client, id: str) -> bool:
    """Delete one status through the API and report whether that succeeded.

    While DRYRUN is set nothing is sent and the call counts as a success.
    A failing response is logged to stderr and reported as False.
    """
    printdbg(f"Deleting {id}...")
    if DRYRUN:
        return True

    resp = client.delete(f"/api/v1/statuses/{id}")
    # TODO: handle rate-limit response
    failed = is_http_error(resp)
    if failed:
        printerr(f"Failed to delete status {id}!\n{format_http_error(resp)}")
    return not failed
def filter_statuses(config: Config, statuses: list[dict[str, Any]]) -> tuple[bool, str, list[str]]:
    """
    Filters statuses according to config.

    Returns a tuple with:
    - whether to continue fetching more pages after this
    - min_id to continue after successfully processing everything
      (either in the following next fetch or a future rerun)
      (if not all deletion ids were successfully processed, use those instead for next run)
    - ids of statuses in need of deletion, ordered as in the original API response
    """
    new_min = None
    del_ids = []
    cont = True
    for stat in statuses:
        status_id = stat.get("id")
        if status_id is None:
            continue

        # Age is measured from creation only; "edited_at" is not consulted.
        date = stat.get("created_at")
        is_reblog = stat.get("reblog") is not None
        visibility = stat.get("visibility")
        pleroma = stat.get("pleroma")
        # Only look up the context when the extension data is really a mapping.
        context = pleroma.get("context") if isinstance(pleroma, dict) else None

        if isinstance(date, str) and datediff(date) < config.preserve_max_age:
            cont = False
            printdbg(f"Reached too new posts at {status_id}; finishing")
            break

        # Even though fetches with a min_id parameter return all posts immediately newer than this,
        # the resulting, N oldest, items are sorted by youngest-first
        if new_min is None:
            new_min = status_id

        # The attribute is named "delete_reblogs" (plural) on Config.
        if is_reblog and not config.delete_reblogs:
            printdbg(f"Keep {status_id} because reblog")
            continue
        if visibility == "private" and not config.delete_private:
            printdbg(f"Keep {status_id} because private")
            continue
        if visibility == "direct" and not config.delete_direct:
            printdbg(f"Keep {status_id} because direct")
            continue
        # XXX: Convert preserves to set early-on for faster lookups here
        if status_id in config.preserve_statuses:
            printdbg(f"Keep {status_id} because protected id")
            continue
        if context is not None and context in config.preserve_contexts:
            printdbg(f"Keep {status_id} because protected context ({context})")
            continue

        del_ids.append(status_id)

    printdbg(f"Got {len(statuses)} in page, deleting {len(del_ids)}...")

    if new_min is not None:
        min_id = new_min
    else:
        # Nothing old enough was seen: keep the current position and stop.
        min_id = config.min_id
        cont = False

    return cont, min_id, del_ids
def fetch_statuses(config: Config, client: httpx.Client) -> Optional[list[dict[str, Any]]]:
    """Fetch the next page of the account's statuses after ``config.min_id``.

    Sleeps for ``config.cooldown_fetch`` seconds before the request. Returns
    the decoded JSON list, or None (after logging to stderr) when the request
    fails, the body cannot be decoded, or the decoded value is not a list.
    """
    query: dict[str, QVal] = {"min_id": config.min_id, "limit": 80}
    if not config.delete_reblogs:
        query["exclude_reblogs"] = True

    time.sleep(config.cooldown_fetch)
    printdbg(f"Fetching batch of statuses using {query}")
    resp = client.get(f"/api/v1/accounts/{config.user_id}/statuses", params=query)
    if is_http_error(resp):
        printerr(f"Fatal error on status fetch!\n{format_http_error(resp)}")
        return None
    # TODO: handle rate-limit response

    try:
        payload = json.loads(resp.content)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        printerr(f"Failed to decode status response; abort!\n{e}")
        return None

    if not isinstance(payload, list):
        printerr("Received data format for account statuses is not a list; abort!")
        return None
    return payload
2025-08-01 00:00:00 +00:00
def process_next_page(config: Config, client: httpx.Client) -> tuple[Config, bool]:
    """Fetch one page of statuses and delete those selected by the filters.

    Returns:
        The (mutated) config and a flag telling the caller whether another
        page should be processed.

    Side effects on ``config``:
        - ``fatal_error`` is set when the page could not be fetched.
        - ``error_count`` is incremented per failed deletion, and
          ``first_error`` records the id of the first failure seen.
        - ``min_id`` advances to the position reported by the filter step.
    """
    stats = fetch_statuses(config, client)
    if stats is None:
        config.fatal_error = True
        # A failed fetch must end the run; asking the caller to continue
        # would repeat the same failing request indefinitely.
        return config, False

    cont, next_min, del_ids = filter_statuses(config, stats)
    for status_id in del_ids:
        time.sleep(config.cooldown_delete)
        if delete_status(client, status_id):
            continue
        config.error_count += 1
        if config.first_error is None:
            config.first_error = status_id

    # Only advance once the whole page was attempted, so an interrupted page
    # is fetched again on the next run.
    config.min_id = next_min
    return config, cont
2025-08-01 00:00:00 +00:00
def purge(config: Config) -> Config:
    """Process page after page of statuses until told to stop.

    An HTTP client is created from the config's token and API base URI and
    closed again in every case. Ctrl+C ends the loop quietly so the caller
    still receives the config with the position reached so far.
    """
    request_headers = {
        "Authorization": config.token,
        "User-Agent": f"wither (v{VERSION}) - cli tool deleting old fedi posts"
    }
    client = httpx.Client(headers=request_headers, base_url=config.api_uri, http2=True)

    try:
        keep_going = True
        while keep_going:
            config, keep_going = process_next_page(config, client)
    except KeyboardInterrupt:
        # still save new min_id position on Ctrl+C
        pass
    finally:
        client.close()

    return config
# Config stuff
def prune_ephemeral_vars(config: Config) -> Config:
    """Strip the run-time-only attributes from ``config`` in place.

    If a deletion failed during the run, ``min_id`` is first reset to the id
    recorded in ``first_error``. The same (mutated) object is returned.
    """
    failed_at = config.first_error
    if failed_at is not None:
        config.min_id = failed_at

    for name in ("token", "error_count", "config_path", "first_error", "fatal_error"):
        delattr(config, name)

    return config
def store_config(config: Config) -> None:
    """Write the persistent settings as UTF-8 JSON to ``config.config_path``.

    The run-time-only attributes are pruned from ``config`` itself before
    dumping, so the passed object is modified.
    """
    # Remember the target first; pruning removes config_path from the object.
    target = config.config_path
    persistent = prune_ephemeral_vars(config).__dict__
    with open(target, "w", encoding="utf-8") as f:
        json.dump(persistent, f, indent=2, ensure_ascii=False)
def assert_config(config: Config) -> None:
    """Validate the settings needed before any request is made.

    Raises:
        ValueError: if token, api uri or user id is None or empty, or if
            reblog deletion is requested (not implemented yet).
    """
    required = (
        ("token", "Unset token!"),
        ("api_uri", "Unset api uri!"),
        ("user_id", "Unset user id!"),
    )
    for attr, message in required:
        value = getattr(config, attr)
        if value is None or value == "":
            raise ValueError(message)

    if config.delete_reblogs:
        raise ValueError("Deleting/Unreblogging reblogs not yet implemented!")
2025-08-01 00:00:00 +00:00
def apply_stored_config(config: Config, newvals: dict[str, Any]) -> Config:
    """Overwrite persistent settings on ``config`` with stored values.

    Only names annotated on Config are considered; run-time-only names and
    keys that are missing or None in ``newvals`` leave the config untouched.
    """
    skipped = Config.ephemerals()
    for key in Config.__annotations__:
        if key in skipped:
            continue
        stored = newvals.get(key)
        if stored is not None:
            setattr(config, key, stored)
    return config
def load_config(config: Config) -> Config:
    """Merge settings from the file at ``config.config_path`` into ``config``.

    A missing file is not an error: the config is returned unchanged.

    Raises:
        ValueError: if the file's top-level JSON value is not an object
            (json.JSONDecodeError, raised for malformed JSON, is a subclass).
    """
    try:
        # Read with the same encoding store_config() writes with; the platform
        # default may differ and the file can contain non-ASCII characters.
        with open(config.config_path, "r", encoding="utf-8") as cf:
            conf_stored = json.load(cf)
    except FileNotFoundError:
        return config

    if not isinstance(conf_stored, dict):
        raise ValueError("Stored config is not a JSON object!")
    apply_stored_config(config, conf_stored)
    return config
def _parse_bool_arg(value: str) -> bool:
    """Convert a command line word into a bool.

    Plain ``bool`` treats every non-empty string, including "False", as
    True. The empty string stays False to match the former behaviour.
    """
    word = value.strip().lower()
    if word in ("1", "true", "t", "yes", "y", "on"):
        return True
    if word in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_cli() -> Config:
    """Parse the command line and merge in an existing config file.

    Returns:
        A Config built from the arguments, with the run-time counters reset
        and then overlaid with the values stored at ``config_path`` (stored
        values win over command line values).
    """
    parser = argparse.ArgumentParser(
        prog="wither",
        description="""
            Deletes old fedi posts with additional cirteria.
            State is stored in a config file to facilitate easy reruns in the future.
            However, the access token is NEVER stored and always needs to be supplied explicitly.
            Options specified in an existing config file take precedence over values specified on the command line,
            thus it usually doesnt make sense to specify anything but token while using a file.
            Instead edit the values in the config file directly.
        """
    )
    parser.add_argument("-t", "--token", type=str, required=True)
    parser.add_argument("--api_uri", type=str)
    parser.add_argument("--user_id", type=str)
    parser.add_argument("--min_id", type=str, default="0")
    parser.add_argument("--delete_reblogs", type=_parse_bool_arg, default=True)
    parser.add_argument("--delete_private", type=_parse_bool_arg, default=True)
    parser.add_argument("--delete_direct", type=_parse_bool_arg, default=True)
    parser.add_argument("--preserve_max_age", type=int, default=7776000)
    parser.add_argument("--preserve_contexts", action="append", default=[])
    parser.add_argument("--preserve_statuses", action="append", default=[])
    parser.add_argument("--cooldown_delete", type=int, default=7)
    parser.add_argument("--cooldown_fetch", type=int, default=20)
    parser.add_argument("config_path", type=str)

    ns = parser.parse_args()
    # Run-time bookkeeping is not a CLI option; start every run from scratch.
    ns.error_count = 0
    ns.first_error = None
    ns.fatal_error = False

    config = Config(ns)
    return load_config(config)
# Main
2025-08-01 00:00:00 +00:00
def run() -> int:
    """Run one complete purge and return the process exit code.

    Exit codes: 1 after a fatal error, 2 when at least one deletion failed,
    0 otherwise. The config is written back in all three cases.
    """
    printdbg(f"Starting withering at {NOW}")

    config = parse_cli()
    assert_config(config)
    config_new = purge(config)
    printdbg(f"Concluded with {config_new.error_count} non-fatal errors and fatal_error={config_new.fatal_error}")

    # Work out the exit code before storing: storing strips the counters.
    ec = 1 if config_new.fatal_error else (2 if config_new.error_count > 0 else 0)

    store_config(config_new)
    return ec
2025-08-01 00:00:00 +00:00
2025-08-01 00:00:00 +00:00
# Script entry point: the value returned by run() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(run())