wither/wither.py
Oneric 959f4b43d6 Mark deleting reblogs as broken
I didn’t test it, but this shouldn’t work atm
since reblogs are expected to be removed via
POST /api/v1/statuses/:id/unreblog
with the id being from the reblogged post, not
the id of the reblog itself
2025-08-01 00:00:00 +00:00

322 lines
8.9 KiB
Python
Executable file
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import datetime
import json
import sys
import time
import httpx
from typing import Any, Optional, TypeAlias, Union
# Tool version; embedded in the User-Agent header built in purge()
VERSION = "0.1"
# When true, printdbg() prints its messages to stdout; otherwise it is silent
DEBUG = True
# When true, delete_status() only logs the id and sends no DELETE request
DRYRUN = False
# Passed as flush= to every print() call in printerr()/printdbg()
FLUSH = True
# Timezone-aware (UTC) timestamp taken once at import time; datediff() measures ages against it
NOW = datetime.datetime.now(datetime.timezone.utc)
# Value types used for the HTTP query parameters built in fetch_statuses()
QVal: TypeAlias = Union[int, str, bool]
class Config(argparse.Namespace):
    """
    Settings and run-time state of a wither run.

    The annotated attributes below define the complete set of known keys:
    __init__ copies exactly these from the parsed argparse namespace, and
    apply_stored_config iterates over them when merging a stored config file.
    """

    # run-time-only variables
    token: str
    error_count: int
    first_error: Optional[str]
    fatal_error: bool
    config_path: str
    # persisting variables
    api_uri: str
    user_id: str
    min_id: str
    delete_reblogs: bool
    delete_private: bool
    delete_direct: bool
    preserve_max_age: int
    preserve_contexts: list[str]
    preserve_statuses: list[str]
    cooldown_delete: int
    cooldown_fetch: int

    def __init__(self, ns: Any) -> None:
        """
        Copy every annotated attribute from `ns`.

        Raises KeyError if `ns` lacks one of the annotated attributes;
        attributes of `ns` that are not annotated here are ignored.
        """
        super().__init__()
        source = ns.__dict__
        for k in Config.__annotations__:
            setattr(self, k, source[k])

    @staticmethod
    def ephemerals() -> set[str]:
        """Return the names of the run-time-only attributes, which must never be loaded from a config file."""
        # "first_error" is run-time-only state as well; it has to be listed
        # here so a stored value can never overwrite the current run's state
        return {"token", "error_count", "first_error", "fatal_error", "config_path"}
# Utils
def printerr(str: str) -> None:
    """Print `str` as one line to standard error, flushing according to FLUSH."""
    print(str, flush=FLUSH, file=sys.stderr)
def printdbg(str: str) -> None:
    """Print `str` to standard output, but only while DEBUG is enabled."""
    if not DEBUG:
        return
    print(str, flush=FLUSH)
def is_http_error(resp: httpx.Response) -> bool:
    """Return True for every response whose status code lies outside 200..299."""
    code = resp.status_code
    return not (200 <= code < 300)
def format_http_error(resp: httpx.Response) -> str:
    """Render status code, headers and raw body of `resp` as a three-line report."""
    report_lines = (
        f" Response Code: {resp.status_code}",
        f" Headers: {resp.headers}",
        f" Body: {resp.content!r}",
    )
    return "\n".join(report_lines)
def datediff(masto_date_str: str) -> float:
    """
    Return the age, in seconds, of an ISO-8601 timestamp relative to NOW.

    The result is negative for timestamps lying after NOW.
    Raises ValueError if the string is not a parseable ISO-8601 timestamp.
    """
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on;
    # spell the UTC designator out as an offset so older interpreters parse it too
    if masto_date_str.endswith(("Z", "z")):
        masto_date_str = masto_date_str[:-1] + "+00:00"
    md = datetime.datetime.fromisoformat(masto_date_str)
    if md.tzinfo is None:
        # NOW is timezone-aware, and subtracting a naive datetime from it raises TypeError.
        # NOTE(review): assumes timestamps without an offset are meant as UTC — confirm
        md = md.replace(tzinfo=datetime.timezone.utc)
    return (NOW - md).total_seconds()
# Work part
def delete_status(client: httpx.Client, id: str) -> bool:
    """
    Delete the status `id` via DELETE /api/v1/statuses/:id.

    Returns True on success (and always in DRYRUN mode, where no request is sent),
    False if the request could not be performed or the server answered with a
    non-2xx status. Failures are reported on stderr.
    """
    printdbg(f"Deleting {id}...")
    if DRYRUN:
        return True
    try:
        resp = client.delete(f"/api/v1/statuses/{id}")
    except httpx.HTTPError as e:
        # Timeouts, connection resets and the like: report them as an ordinary
        # failed deletion so the run goes on and the position still gets stored
        printerr(f"Failed to delete status {id}!\n Error: {e!r}")
        return False
    # TODO: handle rate-limit response
    if is_http_error(resp):
        printerr(f"Failed to delete status {id}!\n{format_http_error(resp)}")
        return False
    return True
def filter_statuses(config: Config, statuses: list[dict[str, Any]]) -> tuple[bool, str, list[str]]:
    """
    Filters statuses according to config.

    Returns a tuple with:
    - whether to continue fetching more pages after this
    - min_id to continue after successfully processing everything
      (either in the following next fetch or a future rerun)
      (if not all deletion ids were successfully processed, use those instead for next run)
    - ids of statuses in need of deletion, ordered as in the original API response
    """
    new_min = None
    del_ids = []
    cont = True
    # Build the lookup sets once per page so the membership tests below are cheap
    protected_ids = set(config.preserve_statuses)
    protected_contexts = set(config.preserve_contexts)

    for stat in statuses:
        status_id = stat.get("id")
        if status_id is None:
            continue

        # Age is judged by creation time only; "edited_at" is deliberately not consulted
        date = stat.get("created_at")
        is_reblog = stat.get("reblog") is not None
        visibility = stat.get("visibility")
        pleroma = stat.get("pleroma")
        context = None if pleroma is None else pleroma.get("context")

        # NOTE(review): pages are iterated youngest-first (see the comment below), so
        # stopping at the first too-new status also leaves every older status of this
        # page untouched until a later run — confirm this is intended
        if isinstance(date, str) and datediff(date) < config.preserve_max_age:
            cont = False
            printdbg(f"Reached too new posts at {status_id}; finishing")
            break

        # Even though fetches with a min_id parameter return all posts immediately newer than this,
        # the resulting, N oldest, items are sorted by youngest-first
        if new_min is None:
            new_min = status_id

        if is_reblog and not config.delete_reblogs:
            printdbg(f"Keep {status_id} because reblog")
            continue
        if visibility == "private" and not config.delete_private:
            printdbg(f"Keep {status_id} because private")
            continue
        if visibility == "direct" and not config.delete_direct:
            printdbg(f"Keep {status_id} because direct")
            continue
        if status_id in protected_ids:
            printdbg(f"Keep {status_id} because protected id")
            continue
        if context is not None and context in protected_contexts:
            printdbg(f"Keep {status_id} because protected context ({context})")
            continue

        del_ids.append(status_id)

    printdbg(f"Got {len(statuses)} in page, deleting {len(del_ids)}...")

    if new_min is not None:
        min_id = new_min
    else:
        # Nothing old enough in this page (or the page was empty):
        # keep the current position and stop paging
        min_id = config.min_id
        cont = False
    return cont, min_id, del_ids
def fetch_statuses(config: Config, client: httpx.Client) -> Optional[list[dict[str, Any]]]:
    """
    Fetch the next page (up to 80 entries) of the account's statuses after config.min_id.

    Sleeps for config.cooldown_fetch seconds before sending the request.
    Returns the decoded list of statuses, or None if the request failed, the
    server answered with a non-2xx status, or the body is not a JSON list.
    Every failure is reported on stderr.
    """
    params: dict[str, QVal] = {"min_id": config.min_id, "limit": 80}
    if not config.delete_reblogs:
        params["exclude_reblogs"] = True

    time.sleep(config.cooldown_fetch)
    printdbg(f"Fetching batch of statuses using {params}")
    try:
        resp = client.get(f"/api/v1/accounts/{config.user_id}/statuses", params=params)
    except httpx.HTTPError as e:
        # Timeouts, connection failures, ...: treat like any other fatal fetch
        # error instead of letting the exception abort the run without saving state
        printerr(f"Fatal error on status fetch!\n Error: {e!r}")
        return None
    if is_http_error(resp):
        printerr(f"Fatal error on status fetch!\n{format_http_error(resp)}")
        return None
    # TODO: handle rate-limit response

    try:
        statuses = json.loads(resp.content)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        printerr(f"Failed to decode status response; abort!\n{e}")
        return None
    if not isinstance(statuses, list):
        printerr("Received data format for account statuses is not a list; abort!")
        return None
    return statuses
def process_next_page(config: Config, client: httpx.Client) -> tuple[Config, bool]:
    """
    Fetch one page of statuses, delete what the filters select and advance config.min_id.

    Returns the (in-place updated) config and whether the caller should go on
    with another page. A failed fetch sets config.fatal_error and stops paging;
    failed deletions only bump config.error_count, and the id of the first
    failure is remembered in config.first_error.
    """
    stats = fetch_statuses(config, client)
    if stats is None:
        config.fatal_error = True
        # Must be False: the caller loops for as long as this flag is true,
        # so reporting True here would retry the failing fetch forever
        return config, False

    cont, next_min, del_ids = filter_statuses(config, stats)
    for status_id in del_ids:
        time.sleep(config.cooldown_delete)
        if delete_status(client, status_id):
            continue
        config.error_count += 1
        if config.first_error is None:
            config.first_error = status_id

    config.min_id = next_min
    return config, cont
def purge(config: Config) -> Config:
    """
    Process page after page until there is nothing left to do, a fatal error
    occurred, or the user interrupts with Ctrl+C.

    config.token is sent verbatim as the Authorization header.
    Returns the config carrying the updated position and error state.
    """
    client = httpx.Client(
        headers={
            "Authorization": config.token,
            "User-Agent": f"wither (v{VERSION}) - cli tool deleting old fedi posts",
        },
        base_url=config.api_uri,
        http2=True,
    )
    try:
        while True:
            config, cont = process_next_page(config, client)
            # Also stop on fatal errors, regardless of the continue flag;
            # otherwise a failing fetch would be retried endlessly
            if not cont or config.fatal_error:
                break
    except KeyboardInterrupt:
        # still save new min_id position on Ctrl+C
        pass
    finally:
        client.close()
    return config
# Config stuff
def prune_ephemeral_vars(config: Config) -> Config:
    """
    Strip all run-time-only attributes from `config` (in place) and return it.

    If a deletion failed during the run, min_id is first reset to the id of
    the first failure.
    """
    failed_at = config.first_error
    if failed_at is not None:
        config.min_id = failed_at

    for name in ("token", "error_count", "config_path", "first_error", "fatal_error"):
        delattr(config, name)
    return config
def store_config(config: Config) -> None:
    """
    Write the persistent part of `config` as JSON to config.config_path.

    The run-time-only attributes are removed from `config` itself in the
    process, so the target path is read before pruning.
    """
    target = config.config_path
    persistent = prune_ephemeral_vars(config).__dict__
    with open(target, mode="w", encoding="utf-8") as out:
        json.dump(persistent, out, indent=2, ensure_ascii=False)
def assert_config(config: Config) -> None:
    """
    Validate the effective configuration before any request is made.

    Raises ValueError if token, api_uri or user_id is None or an empty string,
    or if delete_reblogs is enabled (which is not implemented yet).
    """
    required = (
        ("token", "Unset token!"),
        ("api_uri", "Unset api uri!"),
        ("user_id", "Unset user id!"),
    )
    for attr, complaint in required:
        value = getattr(config, attr)
        if value is None or value == "":
            raise ValueError(complaint)

    if config.delete_reblogs:
        raise ValueError("Deleting/Unreblogging reblogs not yet implemented!")
def apply_stored_config(config: Config, newvals: dict[str, Any]) -> Config:
    """
    Overwrite the persistent attributes of `config` with the values from `newvals`.

    Only keys annotated on Config are considered. Run-time-only keys and keys
    that are absent or null in `newvals` leave `config` untouched.
    Returns the same (in-place updated) config object.
    """
    ephemeral = Config.ephemerals()
    for k in Config.__annotations__:
        if k in ephemeral:
            continue
        stored = newvals.get(k)
        # Explicit None test: falsy values such as False, 0 or [] are valid settings
        if stored is not None:
            setattr(config, k, stored)
    return config
def load_config(config: Config) -> Config:
    """
    Merge the JSON config file at config.config_path, if it exists, into `config`.

    A missing file is not an error; `config` is returned unchanged in that case.
    A file that exists but holds invalid JSON raises json.JSONDecodeError.
    """
    try:
        # store_config() writes UTF-8 with ensure_ascii=False, so read the file
        # back as UTF-8 rather than with the platform's locale encoding
        with open(config.config_path, "r", encoding="utf-8") as cf:
            conf_stored = json.load(cf)
    except FileNotFoundError:
        return config
    return apply_stored_config(config, conf_stored)
def _parse_bool(value: str) -> bool:
    """Convert a command-line word such as "true", "no" or "0" into a bool."""
    word = value.strip().lower()
    if word in ("1", "true", "t", "yes", "y", "on"):
        return True
    # The empty string stays falsy; it used to be the only way to pass False
    if word in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_cli() -> Config:
    """
    Parse the command line and merge in the stored config file.

    Values present in the config file override the command-line values
    (see load_config); the token is only ever taken from the command line.
    """
    parser = argparse.ArgumentParser(
        prog="wither",
        description="""
            Deletes old fedi posts with additional cirteria.
            State is stored in a config file to facilitate easy reruns in the future.
            However, the access token is NEVER stored and always needs to be supplied explicitly.
            Options specified in an existing config file take precedence over values specified on the command line,
            thus it usually doesnt make sense to specify anything but token while using a file.
            Instead edit the values in the config file directly.
        """
    )
    parser.add_argument("-t", "--token", type=str, required=True)
    parser.add_argument("--api_uri", type=str)
    parser.add_argument("--user_id", type=str)
    parser.add_argument("--min_id", type=str, default="0")
    # type=bool would turn every non-empty string — including "False" — into True,
    # hence the explicit parser
    parser.add_argument("--delete_reblogs", type=_parse_bool, default=True)
    parser.add_argument("--delete_private", type=_parse_bool, default=True)
    parser.add_argument("--delete_direct", type=_parse_bool, default=True)
    # Default age limit: 7776000 seconds = 90 days
    parser.add_argument("--preserve_max_age", type=int, default=7776000)
    parser.add_argument("--preserve_contexts", action="append", default=[])
    parser.add_argument("--preserve_statuses", action="append", default=[])
    parser.add_argument("--cooldown_delete", type=int, default=7)
    parser.add_argument("--cooldown_fetch", type=int, default=20)
    parser.add_argument("config_path", type=str)

    ns = parser.parse_args()
    # Run-time-only state that Config expects but that is not a CLI option
    ns.error_count = 0
    ns.first_error = None
    ns.fatal_error = False

    config = Config(ns)
    return load_config(config)
# Main
def run() -> int:
    """
    Execute one complete wither run and return the process exit code.

    Exit codes: 1 after a fatal error, 2 if at least one non-fatal error
    occurred, 0 otherwise. The config is stored in all three cases.
    """
    printdbg(f"Starting withering at {NOW}")
    initial = parse_cli()
    assert_config(initial)
    config_new = purge(initial)
    printdbg(f"Concluded with {config_new.error_count} non-fatal errors and fatal_error={config_new.fatal_error}")

    # Decide the exit code first: storing the config strips the error fields
    if config_new.fatal_error:
        exit_code = 1
    else:
        exit_code = 2 if config_new.error_count > 0 else 0

    store_config(config_new)
    return exit_code
# Script entry point: the return value of run() becomes the process exit status
if __name__ == "__main__":
    raise SystemExit(run())