init
This commit is contained in:
96
porkbun-refresher.py
Normal file
96
porkbun-refresher.py
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
import http.client
|
||||||
|
from types import MappingProxyType
|
||||||
|
import typing as t
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Need to define the PRIMARY_DOMAIN, otherwise it will fail
|
||||||
|
PRIMARY_DOMAIN: t.Final = os.environ["PRIMARY_DOMAIN"]
|
||||||
|
PORKBUN_DOMAIN: t.Final = os.environ.get("PORKBUN_DOMAIN" , "api.porkbun.com")
|
||||||
|
REFERENTIAL_DOMAIN: t.Final = os.environ.get("REFERENTIAL_DOMAIN", "ifconfig.me")
|
||||||
|
PORKBUN_DNS_PREFIX: t.Final = os.environ.get("PORKBUN_DNS_PREFIX", "/api/json/v3/dns")
|
||||||
|
SECRET_FILE: t.Final = os.environ.get("SECRET_FILE", "/run/agenix/porkbun")
|
||||||
|
|
||||||
|
with open(SECRET_FILE) as f:
|
||||||
|
KEYS: t.Final = MappingProxyType(dict(x.strip().split("=") for x in f.readlines()))
|
||||||
|
|
||||||
|
|
||||||
|
def get_current_ip(domain: str):
|
||||||
|
conn = http.client.HTTPSConnection(domain)
|
||||||
|
conn.request(method="GET", url="/ip")
|
||||||
|
res = conn.getresponse()
|
||||||
|
data = res.read().decode()
|
||||||
|
if res.status != 200:
|
||||||
|
raise RuntimeError(f"couldn't get ip addr: {res.status}: {res.reason}")
|
||||||
|
conn.close()
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
class PorkbunRecord(t.TypedDict):
|
||||||
|
id: str
|
||||||
|
name: str
|
||||||
|
type: str
|
||||||
|
content: str
|
||||||
|
ttl: str
|
||||||
|
prio: str
|
||||||
|
notes: str
|
||||||
|
|
||||||
|
|
||||||
|
class PorkbunResponse(t.TypedDict):
|
||||||
|
status: str
|
||||||
|
cloudflare: t.Literal["enabled", "disabled"]
|
||||||
|
records: list[PorkbunRecord]
|
||||||
|
|
||||||
|
|
||||||
|
def get_porkbun_record(domain: str) -> PorkbunResponse:
|
||||||
|
body = KEYS.copy()
|
||||||
|
conn = http.client.HTTPSConnection(PORKBUN_DOMAIN)
|
||||||
|
conn.request(
|
||||||
|
method="POST",
|
||||||
|
url=f"{PORKBUN_DNS_PREFIX}/retrieveByNameType/{domain}/A/",
|
||||||
|
body=json.dumps(body),
|
||||||
|
)
|
||||||
|
res = conn.getresponse()
|
||||||
|
data = res.read().decode()
|
||||||
|
if res.status != 200:
|
||||||
|
raise RuntimeError(f"couldn't get ip addr: {res.status}: {res.reason}")
|
||||||
|
conn.close()
|
||||||
|
return json.loads(data)
|
||||||
|
|
||||||
|
|
||||||
|
def set_porkbun_record(domain: str, ip_addr: str, id: str):
|
||||||
|
body = KEYS.copy()
|
||||||
|
body.update(type="A", content=ip_addr)
|
||||||
|
conn = http.client.HTTPSConnection(PORKBUN_DOMAIN)
|
||||||
|
conn.request(
|
||||||
|
method="POST",
|
||||||
|
url=f"{PORKBUN_DNS_PREFIX}/edit/{domain}/{id}",
|
||||||
|
body=json.dumps(body),
|
||||||
|
)
|
||||||
|
res = conn.getresponse()
|
||||||
|
data = res.read().decode()
|
||||||
|
if res.status != 200:
|
||||||
|
raise RuntimeError(f"couldn't set ip addr: {res.status}: {res.reason}")
|
||||||
|
conn.close()
|
||||||
|
return json.loads(data)
|
||||||
|
|
||||||
|
|
||||||
|
def main(domain: str):
|
||||||
|
current_ip = get_current_ip(REFERENTIAL_DOMAIN)
|
||||||
|
|
||||||
|
if (porkbun_record := get_porkbun_record(domain=domain))["status"] != "SUCCESS" or len(
|
||||||
|
porkbun_record["records"]
|
||||||
|
) != 1:
|
||||||
|
raise RuntimeError("Something went very wrong")
|
||||||
|
|
||||||
|
if (porkbun_ip := porkbun_record["records"][0]["content"]) == current_ip:
|
||||||
|
print("the ip hasn't changed. Nothing left to be done...")
|
||||||
|
else:
|
||||||
|
print(f"Current IP: {current_ip} Porkbun IP: {porkbun_ip}. Updating record...")
|
||||||
|
id = porkbun_record["records"][0]["id"]
|
||||||
|
set_porkbun_record(domain=domain, ip_addr=current_ip, id=id)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main(PRIMARY_DOMAIN)
|
||||||
|
|
||||||
Reference in New Issue
Block a user