I didn’t test it, but this shouldn’t work at the moment: reblogs are expected to be removed via `POST /api/v1/statuses/:id/unreblog`, where `:id` is the id of the reblogged (original) post, not the id of the reblog itself.
322 lines
8.9 KiB
Python
Executable file
322 lines
8.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
||
|
||
import argparse
|
||
import datetime
|
||
import json
|
||
import sys
|
||
import time
|
||
|
||
import httpx
|
||
|
||
from typing import Any, Optional, TypeAlias, Union
|
||
|
||
VERSION = "0.1"
|
||
DEBUG = True
|
||
DRYRUN = False
|
||
FLUSH = True
|
||
|
||
NOW = datetime.datetime.now(datetime.timezone.utc)
|
||
|
||
QVal: TypeAlias = Union[int, str, bool]
|
||
|
||
class Config(argparse.Namespace):
    """Settings and bookkeeping state of one run.

    Every attribute annotated below is copied from the namespace passed to
    the constructor. The names returned by ephemerals() only live for the
    duration of a run; the remaining attributes are the persisting ones.
    """

    # run-time-only variables
    token: str
    error_count: int
    first_error: Optional[str]
    fatal_error: bool
    config_path: str
    # persisting variables
    api_uri: str
    user_id: str
    min_id: str
    delete_reblogs: bool
    delete_private: bool
    delete_direct: bool
    preserve_max_age: int
    preserve_contexts: list[str]
    preserve_statuses: list[str]
    cooldown_delete: int
    cooldown_fetch: int

    def __init__(self, ns: Any) -> None:
        """Copy every annotated attribute from *ns*; other attributes of *ns* are ignored.

        Raises:
            KeyError: if *ns* lacks one of the annotated attributes.
        """
        super().__init__()
        source = vars(ns)
        for k in Config.__annotations__:
            setattr(self, k, source[k])

    @staticmethod
    def ephemerals() -> set[str]:
        """Return the names of all run-time-only attributes."""
        # "first_error" is declared run-time-only above, so it belongs here too
        return {"token", "error_count", "first_error", "fatal_error", "config_path"}
|
||
|
||
# Utils
|
||
def printerr(str: str) -> None:
    """Print *str* as one line on standard error, flushing according to FLUSH."""
    print(
        str,
        flush=FLUSH,
        file=sys.stderr,
    )
|
||
|
||
def printdbg(str: str) -> None:
    """Print *str* on standard output, but only while DEBUG is enabled."""
    if not DEBUG:
        return
    print(str, flush=FLUSH)
|
||
|
||
def is_http_error(resp: httpx.Response) -> bool:
    """Tell whether the status code of *resp* lies outside the range 200..299."""
    code = resp.status_code
    return not (200 <= code < 300)
|
||
|
||
def format_http_error(resp: httpx.Response) -> str:
    """Render status code, headers and raw body of *resp* as three text lines."""
    report_lines = (
        f" Response Code: {resp.status_code}",
        f" Headers: {resp.headers}",
        f" Body: {resp.content!r}",
    )
    return "\n".join(report_lines)
|
||
|
||
def datediff(masto_date_str: str) -> float:
    """Return how many seconds the given timestamp lies before NOW.

    Args:
        masto_date_str: ISO 8601 timestamp string.

    Returns:
        NOW minus the timestamp, in seconds; negative if the timestamp is
        later than NOW.

    Raises:
        ValueError: if the string cannot be parsed as an ISO 8601 timestamp.
    """
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on
    if masto_date_str.endswith("Z"):
        masto_date_str = masto_date_str[:-1] + "+00:00"
    md = datetime.datetime.fromisoformat(masto_date_str)
    # NOW is timezone-aware, so subtracting a naive value would raise TypeError.
    # NOTE(review): a timestamp without offset is assumed to be UTC - confirm.
    if md.tzinfo is None:
        md = md.replace(tzinfo=datetime.timezone.utc)
    return (NOW - md).total_seconds()
|
||
|
||
# Work part
|
||
def delete_status(client: httpx.Client, id: str) -> bool:
    """Delete a single status via ``DELETE /api/v1/statuses/{id}``.

    Args:
        client: HTTP client used to send the request.
        id: id of the status to delete.

    Returns:
        True if the server answered with a 2xx code (or DRYRUN is set, in
        which case nothing is sent); False if the request could not be
        performed or was answered with any other status code.
    """
    # NOTE: reblogs are not handled here. They are expected to be undone via
    # POST /api/v1/statuses/:id/unreblog using the id of the reblogged post.
    printdbg(f"Deleting {id}...")

    if DRYRUN:
        return True

    try:
        resp = client.delete(f"/api/v1/statuses/{id}")
    except httpx.HTTPError as e:
        # A transport problem (timeout, connection reset, ...) is reported like
        # any other failed deletion, so the caller can record it and the run
        # still gets to save its progress.
        printerr(f"Failed to delete status {id}!\n {e!r}")
        return False

    # TODO: handle rate-limit response

    if is_http_error(resp):
        printerr(f"Failed to delete status {id}!\n{format_http_error(resp)}")
        return False

    return True
|
||
|
||
def filter_statuses(config: Config, statuses: list[dict[str, Any]]) -> tuple[bool, str, list[str]]:
    """
    Filters statuses according to config.
    Returns a tuple with:
      - whether to continue fetching more pages after this
      - min_id to continue after successfully processing everything
        (either in the following next fetch or a future rerun)
        (if not all deletion ids were successfully processed, use those instead for next run)
      - ids of statuses in need of deletion, ordered as in the original API response
    """
    new_min = None
    del_ids = []
    cont = True

    # Build the lookup sets once per page instead of scanning the lists per status
    protected_ids = set(config.preserve_statuses)
    protected_contexts = set(config.preserve_contexts)

    for stat in statuses:
        id = stat.get("id")
        if id is None:
            continue

        #date = stat.get("edited_at") or stat.get("created_at")
        date = stat.get("created_at")
        is_reblog = stat.get("reblog") is not None
        visibility = stat.get("visibility")
        pleroma = stat.get("pleroma")
        # "pleroma" is an optional extension object; ignore it unless it is a dict
        context = pleroma.get("context") if isinstance(pleroma, dict) else None

        if isinstance(date, str) and datediff(date) < config.preserve_max_age:
            cont = False
            printdbg(f"Reached too new posts at {id}; finishing")
            break

        # Even though fetches with a min_id parameter return all posts immediately newer than this,
        # the resulting, N oldest, items are sorted by youngest-first
        if new_min is None:
            new_min = id

        # The config attribute is "delete_reblogs"; "delete_reblog" does not exist
        if is_reblog and not config.delete_reblogs:
            printdbg(f"Keep {id} because reblog")
            continue
        if visibility == "private" and not config.delete_private:
            printdbg(f"Keep {id} because private")
            continue
        if visibility == "direct" and not config.delete_direct:
            printdbg(f"Keep {id} because direct")
            continue
        if id in protected_ids:
            printdbg(f"Keep {id} because protected id")
            continue
        if context is not None and context in protected_contexts:
            printdbg(f"Keep {id} because protected context ({context})")
            continue

        del_ids.append(id)

    printdbg(f"Got {len(statuses)} in page, deleting {len(del_ids)}...")

    if new_min is not None:
        min_id = new_min
    else:
        # Nothing in this page was old enough (or the page was empty):
        # keep the current position and stop paging.
        min_id = config.min_id
        cont = False

    return cont, min_id, del_ids
|
||
|
||
def fetch_statuses(config: Config, client: httpx.Client) -> Optional[list[dict[str, Any]]]:
    """Fetch the next page of the account's statuses after ``config.min_id``.

    Sleeps for ``config.cooldown_fetch`` seconds before sending the request.
    Reblogs are excluded server-side unless ``config.delete_reblogs`` is set.

    Args:
        config: provides min_id, user_id, delete_reblogs and cooldown_fetch.
        client: HTTP client used to send the request.

    Returns:
        The decoded JSON list of statuses, or None if the request failed, was
        answered with a non-2xx code, or the body is not a JSON list.
    """
    params: dict[str, QVal] = {"min_id": config.min_id, "limit": 80}
    if not config.delete_reblogs:
        params["exclude_reblogs"] = True

    time.sleep(config.cooldown_fetch)
    printdbg(f"Fetching batch of statuses using {params}")

    try:
        resp = client.get(f"/api/v1/accounts/{config.user_id}/statuses", params=params)
    except httpx.HTTPError as e:
        # Transport problems are reported through the None return value like
        # every other fetch failure instead of escaping as an exception.
        printerr(f"Fatal error on status fetch!\n {e!r}")
        return None

    if is_http_error(resp):
        printerr(f"Fatal error on status fetch!\n{format_http_error(resp)}")
        return None

    # TODO: handle rate-limit response

    # Only the decoding itself can raise the exceptions handled here
    try:
        statuses = json.loads(resp.content)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        printerr(f"Failed to decode status response; abort!\n{e}")
        return None

    if not isinstance(statuses, list):
        printerr("Received data format for account statuses is not a list; abort!")
        return None

    return statuses
|
||
|
||
def process_next_page(config: Config, client: httpx.Client) -> tuple[Config, bool]:
    """Fetch one page of statuses, delete what the filter selects, advance min_id.

    Args:
        config: run state; fatal_error, error_count, first_error and min_id
            are updated in place.
        client: HTTP client used for all requests.

    Returns:
        The (same) config object and a flag telling whether another page
        should be processed.
    """
    stats = fetch_statuses(config, client)
    if stats is None:
        config.fatal_error = True
        # A failed fetch is fatal: returning True here would make the caller
        # loop forever, retrying the same request.
        return config, False

    cont, next_min, del_ids = filter_statuses(config, stats)

    for status_id in del_ids:
        time.sleep(config.cooldown_delete)
        if delete_status(client, status_id):
            continue
        config.error_count += 1
        # Remember only the first failure; prune_ephemeral_vars() uses it as
        # the min_id to store.
        if config.first_error is None:
            config.first_error = status_id

    config.min_id = next_min
    return config, cont
|
||
|
||
def purge(config: Config) -> Config:
    """Process page after page until process_next_page() says to stop.

    A KeyboardInterrupt ends the loop early without being re-raised, so the
    state reached so far is still returned. The HTTP client is closed in
    every case.
    """
    request_headers = {
        "Authorization": config.token,
        "User-Agent": f"wither (v{VERSION}) - cli tool deleting old fedi posts"
    }
    client = httpx.Client(headers=request_headers, base_url=config.api_uri, http2=True)

    try:
        keep_going = True
        while keep_going:
            config, keep_going = process_next_page(config, client)
    except KeyboardInterrupt:
        # still save new min_id position on Ctrl+C
        pass
    finally:
        client.close()

    return config
|
||
|
||
# Config stuff
|
||
def prune_ephemeral_vars(config: Config) -> Config:
    """Strip the run-time-only attributes from *config* in place and return it.

    If a deletion failed during the run, min_id is first set to the id of
    the first failed status.
    """
    failed_at = config.first_error
    if failed_at is not None:
        config.min_id = failed_at

    for name in ("token", "error_count", "config_path", "first_error", "fatal_error"):
        delattr(config, name)

    return config
|
||
|
||
def store_config(config: Config) -> None:
    """Write the persisting part of *config* as JSON to ``config.config_path``.

    The run-time-only attributes are removed from *config* in the process.
    """
    # read the path first: pruning deletes config_path along with the rest
    target = config.config_path
    persisted = vars(prune_ephemeral_vars(config))

    with open(target, mode="w", encoding="utf-8") as out:
        json.dump(persisted, out, indent=2, ensure_ascii=False)
|
||
|
||
def assert_config(config: Config) -> None:
    """Validate *config* before any work starts.

    Raises:
        ValueError: if token, api_uri or user_id is None or the empty string,
            or if delete_reblogs is enabled.
    """
    def is_blank(value: Any) -> bool:
        if value is None:
            return True
        return value == ""

    required = (
        ("token", "Unset token!"),
        ("api_uri", "Unset api uri!"),
        ("user_id", "Unset user id!"),
    )
    for attr, complaint in required:
        if is_blank(getattr(config, attr)):
            raise ValueError(complaint)

    if config.delete_reblogs:
        raise ValueError("Deleting/Unreblogging reblogs not yet implemented!")
|
||
|
||
def apply_stored_config(config: Config, newvals: dict[str, Any]) -> Config:
    """Overwrite persisting attributes of *config* with the values in *newvals*.

    Run-time-only attributes (see Config.ephemerals()) are never taken from
    *newvals*; keys that are missing or map to None leave *config* untouched.

    Returns:
        The same *config* object, updated in place.
    """
    ephemeral = Config.ephemerals()
    for k in Config.__annotations__:
        if k in ephemeral:
            continue
        stored = newvals.get(k)
        if stored is not None:
            setattr(config, k, stored)
    return config
|
||
|
||
def load_config(config: Config) -> Config:
    """Merge the JSON file at ``config.config_path`` into *config*, if it exists.

    Returns:
        The same *config* object; unchanged when the file does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    try:
        # store_config() writes UTF-8 with ensure_ascii=False, so read it back
        # as UTF-8 instead of relying on the locale's default encoding
        with open(config.config_path, "r", encoding="utf-8") as cf:
            conf_stored = json.load(cf)
    except FileNotFoundError:
        # nothing stored yet; keep the values already present in config
        return config

    apply_stored_config(config, conf_stored)
    return config
|
||
|
||
def _parse_bool_arg(value: str) -> bool:
    """Convert a command-line word such as "true", "no" or "1" into a bool."""
    word = value.strip().lower()
    if word in ("1", "true", "t", "yes", "y", "on"):
        return True
    # the empty string stays False, as it was with plain bool()
    if word in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_cli() -> Config:
    """Parse the command line and merge in values from the config file.

    Returns:
        A Config built from the command-line arguments, with error_count,
        first_error and fatal_error initialised, and then passed through
        load_config().
    """
    parser = argparse.ArgumentParser(
        prog="wither",
        description="""
            Deletes old fedi posts with additional cirteria.
            State is stored in a config file to facilitate easy reruns in the future.
            However, the access token is NEVER stored and always needs to be supplied explicitly.
            Options specified in an existing config file take precedence over values specified on the command line,
            thus it usually doesn’t make sense to specify anything but token while using a file.
            Instead edit the values in the config file directly.
        """
    )
    parser.add_argument("-t", "--token", type=str, required=True)
    parser.add_argument("--api_uri", type=str)
    parser.add_argument("--user_id", type=str)
    parser.add_argument("--min_id", type=str, default="0")
    # type=bool would turn every non-empty string, including "False", into True
    parser.add_argument("--delete_reblogs", type=_parse_bool_arg, default=True)
    parser.add_argument("--delete_private", type=_parse_bool_arg, default=True)
    parser.add_argument("--delete_direct", type=_parse_bool_arg, default=True)
    parser.add_argument("--preserve_max_age", type=int, default=7776000)
    parser.add_argument("--preserve_contexts", action="append", default=[])
    parser.add_argument("--preserve_statuses", action="append", default=[])
    parser.add_argument("--cooldown_delete", type=int, default=7)
    parser.add_argument("--cooldown_fetch", type=int, default=20)
    parser.add_argument("config_path", type=str)
    ns = parser.parse_args()

    # bookkeeping attributes that Config expects but the parser does not provide
    ns.error_count = 0
    ns.first_error = None
    ns.fatal_error = False

    config = Config(ns)

    return load_config(config)
|
||
|
||
# Main
|
||
def run() -> int:
    """Run the whole tool once and return the process exit code.

    Returns:
        1 if a fatal error occurred, 2 if only non-fatal errors occurred,
        0 otherwise.
    """
    printdbg(f"Starting withering at {NOW}")
    config = parse_cli()
    assert_config(config)
    outcome = purge(config)
    printdbg(f"Concluded with {outcome.error_count} non-fatal errors and fatal_error={outcome.fatal_error}")

    # Decide on the exit code first: store_config() removes the error fields
    if outcome.fatal_error:
        exit_code = 1
    else:
        exit_code = 2 if outcome.error_count > 0 else 0

    store_config(outcome)
    return exit_code
|
||
|
||
|
||
if __name__ == "__main__":
    # hand run()'s return value to the interpreter as the exit status
    raise SystemExit(run())
|