Creating Modules
This guide walks through creating a custom detection module.
Prerequisites
- Understanding of the batch format
- HTTP server capability in your language of choice
- Access to submit findings via the callback API
Step 1: Create HTTP Server
Your module needs two endpoints:
# Example in Python (Flask)
from flask import Flask, request
import gzip
import json
app = Flask(__name__)

@app.route('/health')
def health():
    # Liveness probe: the platform only needs a 200 response here.
    return {'status': 'ok'}
@app.route('/ingest', methods=['POST'])
def ingest():
    """Receive one gzip-compressed NDJSON batch from the platform."""
    headers = request.headers
    batch_id = headers.get('X-Batch-Id')
    server_id = headers.get('X-Server-Id')
    session_id = headers.get('X-Session-Id')
    # Payload is gzip-compressed NDJSON: one JSON packet per line.
    raw = gzip.decompress(request.data)
    lines = raw.decode().strip().split('\n')
    packets = [json.loads(line) for line in lines]
    # Acknowledge quickly; detection work should not block the response.
    process_batch_async(batch_id, server_id, session_id, packets)
    return {'ok': True}
if __name__ == '__main__':
    # Run the example server locally on port 8080.
    app.run(port=8080)

Step 2: Implement Detection Logic
Group packets by player and analyze:
def process_batch_async(batch_id, server_id, session_id, packets):
# Group by player
by_player = {}
for packet in packets:
if packet['type'] == '_BATCH_META':
continue
uuid = packet['player']['uuid']
if uuid not in by_player:
by_player[uuid] = []
by_player[uuid].append(packet)
# Analyze each player
findings = []
for uuid, player_packets in by_player.items():
player_findings = analyze_player(uuid, player_packets)
findings.extend(player_findings)
# Submit findings
if findings:
submit_findings(batch_id, server_id, session_id, findings)Step 3: Analyze Packets
Example speed check:
def analyze_player(uuid, packets):
findings = []
# Filter movement packets
movements = [p for p in packets if p['type'] in [
'PLAYER_POSITION',
'PLAYER_POSITION_AND_ROTATION'
]]
# Check for speed violations
for i in range(1, len(movements)):
prev = movements[i-1]
curr = movements[i]
dx = curr['data']['x'] - prev['data']['x']
dz = curr['data']['z'] - prev['data']['z']
distance = (dx**2 + dz**2) ** 0.5
# Maximum vanilla sprint speed is ~0.28 blocks/tick
# With 50ms between packets, max is ~0.56
if distance > 0.7 and curr['data'].get('on_ground', True):
# Determine severity based on how egregious the violation is
if distance > 2.0:
severity = 'critical'
elif distance > 1.2:
severity = 'high'
elif distance > 0.9:
severity = 'medium'
else:
severity = 'low'
findings.append({
'player_uuid': uuid,
'detector_name': 'speed_ground',
'detector_version': '1.0.0',
'severity': severity,
'title': f'Speed violation: {distance:.2f} blocks/tick',
'description': 'Player moving faster than allowed threshold on ground',
'evidence_json': {
'distance': distance,
'expected_max': 0.56
}
})
return findingsStep 4: Submit Findings
Send findings to the API callback:
import requests
def submit_findings(batch_id, server_id, session_id, findings):
response = requests.post(
'https://api.asyncanticheat.com/callbacks/findings',
headers={
'Authorization': f'Bearer {MODULE_CALLBACK_TOKEN}',
'Content-Type': 'application/json'
},
json={
'server_id': server_id,
'session_id': session_id,
'batch_id': batch_id,
'findings': findings
}
)
response.raise_for_status()Step 5: Register Module
Register your module with the API:
# Register the module so the platform starts forwarding batches to base_url.
curl -X POST "https://api.asyncanticheat.com/servers/my-server/modules" \
  -H "Authorization: Bearer $INGEST_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-speed-check",
    "base_url": "https://my-module.example.com",
    "enabled": true,
    "transform": "raw_ndjson_gz"
  }'

Best Practices
Performance
- Process batches asynchronously
- Use streaming JSON parsers for large batches
- Batch multiple findings in one callback
Accuracy
- Account for network latency and timing variations
- Use violation levels (VL) that accumulate over time
- Test against known legitimate players
Reliability
- Implement health checks properly
- Handle malformed packets gracefully
- Log errors for debugging
Example: Complete Speed Module
from flask import Flask, request
import gzip
import json
import requests
import threading
import os
app = Flask(__name__)

# Callback token is required; fail fast at startup if it is missing.
MODULE_CALLBACK_TOKEN = os.environ['MODULE_CALLBACK_TOKEN']
# API base URL can be overridden (e.g. for a staging environment).
API_URL = os.environ.get('API_URL', 'https://api.asyncanticheat.com')
@app.route('/health')
def health():
    # Health check used by the platform to verify the module is up.
    return {'status': 'ok'}
@app.route('/ingest', methods=['POST'])
def ingest():
    """Accept a compressed batch and hand it off to a worker thread."""
    headers = request.headers
    batch_id = headers.get('X-Batch-Id')
    server_id = headers.get('X-Server-Id')
    session_id = headers.get('X-Session-Id')
    payload = gzip.decompress(request.data)
    # Return immediately; detection work happens off the request thread.
    worker = threading.Thread(
        target=process_batch,
        args=(batch_id, server_id, session_id, payload)
    )
    worker.start()
    return {'ok': True}
def process_batch(batch_id, server_id, session_id, data):
    """Parse an NDJSON batch, run detections, and report findings.

    Runs on a worker thread (see ingest); `data` is the decompressed
    NDJSON payload, one JSON packet per line.
    """
    packets = [json.loads(line) for line in data.decode().strip().split('\n')]
    # Group by player, skipping batch metadata records (no player field).
    by_player = {}
    for p in packets:
        if p['type'] == '_BATCH_META':
            continue
        uuid = p['player']['uuid']
        by_player.setdefault(uuid, []).append(p)
    # Analyze each player's stream independently.
    findings = []
    for uuid, player_packets in by_player.items():
        findings.extend(check_speed(uuid, player_packets))
    # Submit all findings in one callback.
    if findings:
        response = requests.post(
            f'{API_URL}/callbacks/findings',
            headers={
                'Authorization': f'Bearer {MODULE_CALLBACK_TOKEN}',
                'Content-Type': 'application/json'
            },
            json={
                'batch_id': batch_id,
                'server_id': server_id,
                # Fix: include session_id in the payload, matching the
                # Step 4 example — it was accepted here but never sent.
                'session_id': session_id,
                'findings': findings
            }
        )
        # Surface callback failures instead of silently ignoring them
        # (consistent with submit_findings in Step 4).
        response.raise_for_status()
def check_speed(uuid, packets):
    """Flag ground movement faster than the vanilla sprint ceiling."""
    results = []
    # Any position-bearing packet type qualifies as a movement sample.
    moves = [pkt for pkt in packets if 'POSITION' in pkt['type']]
    for before, after in zip(moves, moves[1:]):
        delta_x = after['data']['x'] - before['data']['x']
        delta_z = after['data']['z'] - before['data']['z']
        horizontal = (delta_x ** 2 + delta_z ** 2) ** 0.5
        # Ignore slow movement and airborne samples (falling is legal).
        if horizontal <= 0.7 or not after['data'].get('on_ground', True):
            continue
        # Severity scales with how far past the threshold the player moved.
        if horizontal > 2.0:
            level = 'critical'
        elif horizontal > 1.2:
            level = 'high'
        elif horizontal > 0.9:
            level = 'medium'
        else:
            level = 'low'
        results.append({
            'player_uuid': uuid,
            'detector_name': 'speed_ground',
            'detector_version': '1.0.0',
            'severity': level,
            'title': f'Speed: {horizontal:.2f} b/tick',
            'description': 'Movement speed exceeded threshold',
            'evidence_json': {'distance': horizontal}
        })
    return results
if __name__ == '__main__':
    # Bind to all interfaces so the platform can reach /ingest and /health.
    app.run(host='0.0.0.0', port=8080)

Last updated on