Public Example Source Code
import mimetypes
mimetypes.add_type('video/mp2t', '.ts')
mimetypes.add_type('application/vnd.apple.mpegurl', '.m3u8')
import os
import eventlet
eventlet.monkey_patch(socket=True, select=True, time=True, thread=False)
from flask import Flask, send_from_directory, jsonify
from flask_socketio import SocketIO, send
from datetime import datetime
from zoneinfo import ZoneInfo
import threading
import time
import json
import requests
import hid
# ==================== Flask + SocketIO ====================
app = Flask(__name__, static_url_path='', static_folder='.')
app.config['SECRET_KEY'] = 'YOUR_SECRET_KEY_HERE'
socketio = SocketIO(app, async_mode='eventlet', cors_allowed_origins='*')
# ==================== Paths ====================
CHAT_LOG = '/path/to/chat.log'
CHAT_HISTORY_LINES = 7
LAST_FEED_LOG = '/path/to/kaspa_incoming.log'
LAST_FEED_LINES = 4
IMG_DIR = "/path/to/img"
WATCHMYBIRDS_DIR = "/path/to/output"
VIEWS_LOG = '/path/to/views.log'
TX_FOLDER = "/path/to/txs"
SEEN_UTXO_FILE = "/path/to/seen_utxos.json"
PAID_AMOUNT_FILE = "/path/to/paid_amount.json"
TELEGRAM_STATUS_FILE = "/path/to/telegram_status.json"
# ==================== Telegram ====================
TELEGRAM_BOT_TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
CHAT_ID = -1000000000000
# ==================== Kaspa ====================
ADDRESS = "YOUR_KASPA_ADDRESS"
UTXO_API = f"https://api.kaspa.org/addresses/{ADDRESS}/utxos"
POLL_INTERVAL = 5
# ==================== Relay ====================
VENDOR_ID = 0x0000
PRODUCT_ID = 0x0000
def open_relay():
try:
device = hid.device()
device.open(VENDOR_ID, PRODUCT_ID)
print("Relay connected")
return device
except Exception as e:
print("Relay not found:", e)
return None
def relay_on(device):
if device:
device.write([0x00, 0xFF, 0x01, 0x01])
def relay_off(device):
if device:
device.write([0x00, 0xFD, 0x01])
device_relay = open_relay()
# ==================== Helpers ====================
def read_last_lines(file_path, n=5):
if not os.path.exists(file_path):
return []
with open(file_path, 'r', encoding='utf-8') as f:
lines = [l.strip() for l in f if l.strip()]
return lines[-n:]
def load_seen_utxos():
if os.path.exists(SEEN_UTXO_FILE):
with open(SEEN_UTXO_FILE, "r") as f:
return set(json.load(f))
return set()
def save_seen_utxos(s):
with open(SEEN_UTXO_FILE, "w") as f:
json.dump(sorted(list(s)), f)
# ---------- paid amount ----------
def load_paid_amount():
if os.path.exists(PAID_AMOUNT_FILE):
with open(PAID_AMOUNT_FILE, "r") as f:
return float(json.load(f).get("amount", 0))
return 0.0
def save_paid_amount(amount):
with open(PAID_AMOUNT_FILE, "w") as f:
json.dump({"amount": round(amount, 8)}, f)
# ---------- telegram status ----------
def save_telegram_status(link):
with open(TELEGRAM_STATUS_FILE, "w") as f:
json.dump({
"link": link,
"created": int(time.time())
}, f)
def load_telegram_status():
if not os.path.exists(TELEGRAM_STATUS_FILE):
return None
with open(TELEGRAM_STATUS_FILE, "r") as f:
data = json.load(f)
if time.time() - data["created"] > 60:
return None
return data["link"]
# ---------- create invite link ----------
def create_telegram_invite():
expire = int(time.time()) + 7 * 24 * 60 * 60
r = requests.post(
f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/createChatInviteLink",
json={"chat_id": CHAT_ID, "expire_date": expire},
timeout=10
).json()
if r.get("ok"):
return r["result"]["invite_link"]
print("Telegram API error:", r)
return None
# ==================== Page view counting ====================
def log_page_view():
today = datetime.now().strftime('%Y-%m-%d')
counts = {}
if os.path.exists(VIEWS_LOG):
with open(VIEWS_LOG, 'r', encoding='utf-8') as f:
for line in f:
parts = line.strip().split()
if len(parts) == 2:
counts[parts[0]] = int(parts[1])
counts[today] = counts.get(today, 0) + 1
with open(VIEWS_LOG, 'w', encoding='utf-8') as f:
for date, count in sorted(counts.items()):
f.write(f"{date} {count}\n")
# ==================== ROUTES ====================
@app.route('/')
def index():
log_page_view()
return send_from_directory('.', 'index.html')
@app.route('/gallery.html')
def gallery_page():
return send_from_directory('.', 'gallery.html')
@app.route('/news.html')
def news_page():
return send_from_directory('.', 'news.html')
@app.route('/watchmybirds/<path:filename>')
def watchmybirds_files(filename):
return send_from_directory(WATCHMYBIRDS_DIR, filename)
@app.route('/howitworks.html')
def howitworks_page():
return send_from_directory('.', 'howitworks.html')
@app.route('/photos')
def list_photos():
photos = []
if os.path.exists(WATCHMYBIRDS_DIR):
for date_dir in sorted(os.listdir(WATCHMYBIRDS_DIR), reverse=True):
full_date_dir = os.path.join(WATCHMYBIRDS_DIR, date_dir)
if not os.path.isdir(full_date_dir):
continue
for f in os.listdir(full_date_dir):
if f.lower().endswith('.jpg') and '_original' in f.lower():
full_path = os.path.join(full_date_dir, f)
sortkey = os.path.getmtime(full_path)
parts = f.split('_')
bird = (parts[2] + " " + parts[3]) if len(parts) >= 4 else f
photos.append({
"path": f"/watchmybirds/{date_dir}/{f}",
"sortkey": sortkey,
"bird": bird
})
photos.sort(key=lambda x: x["sortkey"], reverse=True)
filtered = []
last_seen = {}
for p in photos:
bird = p["bird"]
t = p["sortkey"]
if bird in last_seen and abs(t - last_seen[bird]) < 180:
continue
last_seen[bird] = t
filtered.append(p)
if len(filtered) >= 300:
break
return jsonify(filtered)
@app.route('/img/<path:filename>')
def img_files(filename):
return send_from_directory(IMG_DIR, filename)
@app.route('/telegram.html')
def telegram_page():
return send_from_directory(".", "telegram.html")
@app.route('/telegram-link')
def telegram_link():
link = load_telegram_status()
if link:
return jsonify({"link": link})
return jsonify({})
# ==================== SocketIO ====================
@socketio.on('connect')
def handle_connect():
for msg in read_last_lines(CHAT_LOG, CHAT_HISTORY_LINES):
send(msg)
socketio.emit('last_feed', read_last_lines(LAST_FEED_LOG, LAST_FEED_LINES))
@socketio.on("message")
def handle_message(msg):
try:
user = msg.get("user", "anon")
text = msg.get("text", "")
except:
user = "anon"
text = str(msg)
ts = datetime.now(ZoneInfo("Europe/Prague")).strftime("%Y-%m-%d %H:%M")
line = f"[{ts}] {user}: {text}"
print("💬", line)
with open(CHAT_LOG, "a", encoding="utf-8") as f:
f.write(line + "\n")
socketio.send(line)
# ==================== Kaspa watcher ====================
def kaspa_watcher():
seen = load_seen_utxos()
os.makedirs(TX_FOLDER, exist_ok=True)
while True:
try:
utxos = requests.get(UTXO_API, timeout=15).json()
for u in utxos:
out = u["outpoint"]
utxo_id = f"{out['transactionId']}:{out['index']}"
if utxo_id in seen:
continue
amount = int(u["utxoEntry"]["amount"])
kas = amount / 1e8
ts = datetime.now(ZoneInfo("Europe/Prague")).strftime("%Y-%m-%d %H:%M:%S")
with open(LAST_FEED_LOG, "a") as f:
f.write(f"{ts} | {utxo_id} | {kas:.8f} KAS\n")
total = load_paid_amount() + kas
save_paid_amount(total)
print(f"[{ts}] +{kas:.8f} KAS (total {total:.8f})")
if 4.5 <= kas <= 9.5:
link = create_telegram_invite()
if link:
save_telegram_status(link)
print("Telegram invite created (visible 60s)")
if kas >= 9.6:
print("Relay ON for 5 seconds")
relay_on(device_relay)
threading.Thread(
target=lambda: (time.sleep(5), relay_off(device_relay)),
daemon=True
).start()
seen.add(utxo_id)
save_seen_utxos(seen)
except Exception as e:
print("Kaspa error:", e)
time.sleep(POLL_INTERVAL)
threading.Thread(target=kaspa_watcher, daemon=True).start()
# ==================== START ====================
if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', port=5000)