# API Integration Patterns
This guide covers common integration patterns for building on the MailShield API.
## Common Integration Use Cases
| Use Case | Pattern | Complexity |
|---|---|---|
| Security dashboard | Polling + caching | Medium |
| Automated alerting | Polling + comparison | Medium |
| Multi-tenant management | Bulk operations | High |
| CI/CD integration | On-demand checks | Low |
| Compliance reporting | Scheduled export | Medium |
## Building a Monitoring Dashboard

### Architecture
```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│ MailShield API  │────►│  Your Backend   │────►│  Dashboard UI   │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                 │
                                 ▼
                        ┌─────────────────┐
                        │      Cache      │
                        │ (Redis/Memory)  │
                        └─────────────────┘
```

### Implementation
python
import json
import os
from datetime import datetime, timedelta, timezone

import redis
import requests
# Root URL of the MailShield REST API; endpoint paths are appended to it.
API_BASE = "https://app.mailshield.app/api/v1"

# Bearer token taken from the environment. A missing variable raises
# KeyError as soon as this module is loaded.
API_TOKEN = os.environ["MAILSHIELD_API_TOKEN"]

# Default cache lifetime in seconds (5 minutes).
CACHE_TTL = 5 * 60

# Redis connection shared by the caching helper.
redis_client = redis.Redis(host="localhost", port=6379, db=0)

# Headers sent with every MailShield API request.
headers = {
    "Authorization": f"Bearer {API_TOKEN}",
    "Content-Type": "application/json",
}
def get_cached_or_fetch(cache_key, fetch_func, ttl=CACHE_TTL):
    """Return the value cached under ``cache_key``, fetching it on a miss.

    Args:
        cache_key: Redis key the JSON payload is stored under.
        fetch_func: Zero-argument callable that produces the value when
            nothing usable is cached. Its result must be JSON-serializable.
        ttl: Lifetime of a freshly stored entry, in seconds.

    Returns:
        The decoded cached value, or the fresh result of ``fetch_func()``.
    """
    raw = redis_client.get(cache_key)
    if not raw:
        # Miss (or empty payload): fetch, store as JSON with an expiry, and
        # hand the fresh value straight back.
        fresh = fetch_func()
        redis_client.setex(cache_key, ttl, json.dumps(fresh))
        return fresh
    return json.loads(raw)
def fetch_all_domains(timeout=10):
    """Fetch every domain and attach its security score.

    Args:
        timeout: Seconds to wait on each HTTP request. Without a timeout
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        The list of domain dicts from ``GET /domains``. Each domain whose
        score lookup succeeded gains a ``"score"`` key holding the ``data``
        payload of ``GET /domains/{id}/score``; a domain whose lookup
        returned an error status is left without that key.

    Raises:
        requests.HTTPError: If the domain listing returns an error status.
        requests.RequestException: On connection errors or timeouts.
    """
    # One session for the listing plus the per-domain score calls, so the
    # N+1 requests reuse a pooled connection instead of reconnecting.
    with requests.Session() as session:
        session.headers.update(headers)

        response = session.get(f"{API_BASE}/domains", timeout=timeout)
        response.raise_for_status()
        domains = response.json()["data"]

        # Enrich with scores
        for domain in domains:
            score_resp = session.get(
                f"{API_BASE}/domains/{domain['id']}/score",
                timeout=timeout,
            )
            # Enrichment is best-effort: a failed score lookup must not
            # drop the domain from the result.
            if score_resp.ok:
                domain["score"] = score_resp.json()["data"]

    return domains
def get_dashboard_data():
    """Return the scored domain list for the dashboard, cached when possible."""
    cache_key = "dashboard:domains"
    return get_cached_or_fetch(cache_key, fetch_all_domains)


### Dashboard Metrics to Display
| Metric | API Source | Update Frequency |
|---|---|---|
| Domain count | /domains | On change |
| Average score | /domains/{id}/score | Every 5-15 min |
| DMARC pass rate | /domains/{id}/reports/dmarc | Every 15-60 min |
| Active alerts | /domains/{id}/alerts | Every 1-5 min |
| Recent DNS changes | /domains/{id}/checks | On demand |
## Automated Alerting Integrations

### Alert Detection Pattern
python
class AlertMonitor:
    """Detect score drops and DMARC failure spikes across polling runs.

    The last known score of every domain is kept on the instance, so the
    same instance must be reused for consecutive checks; a fresh instance
    has nothing to compare against.
    """

    def __init__(self):
        # Maps domain id -> last score observed for that domain.
        self.previous_states = {}

    def check_for_changes(self, domains=None, min_drop=10):
        """Compare current scores with the previous run and report drops.

        Args:
            domains: Domain dicts shaped like the result of
                ``fetch_all_domains`` (``id``, ``domain`` and an optional
                ``score`` dict). Fetched from the API when omitted.
            min_drop: Smallest score decrease that produces an alert.

        Returns:
            A list of alert dicts with keys ``type`` (``"score_drop"``),
            ``domain``, ``previous`` and ``current``.
        """
        if domains is None:
            domains = fetch_all_domains()

        alerts = []
        for domain in domains:
            domain_id = domain["id"]
            current_score = (domain.get("score") or {}).get("score")

            # No score this round (lookup failed): keep the old baseline so
            # a real drop is still detected on the next successful poll.
            if current_score is None:
                continue

            previous_score = self.previous_states.get(domain_id)
            # Compare against None rather than truthiness: 0 is a valid
            # score, and a drop to 0 is the case most worth alerting on.
            if (
                previous_score is not None
                and previous_score - current_score >= min_drop
            ):
                alerts.append({
                    "type": "score_drop",
                    "domain": domain["domain"],
                    "previous": previous_score,
                    "current": current_score,
                })

            self.previous_states[domain_id] = current_score

        return alerts

    def check_dmarc_failures(self, domain_id, threshold=0.05, timeout=10):
        """Alert if the DMARC failure rate of the last day exceeds a threshold.

        Args:
            domain_id: Identifier used in the ``/domains/{id}/reports/dmarc``
                path.
            threshold: Maximum tolerated failure rate as a fraction
                (``0.05`` means 5%).
            timeout: Seconds to wait for the HTTP request.

        Returns:
            A ``high_failure_rate`` alert dict, or ``None`` when the request
            returned an error status, the sample is 100 messages or fewer,
            or the pass rate is within the threshold.
        """
        # The trailing "Z" declares UTC, so the timestamp has to be built
        # from UTC time rather than the host's local clock.
        since = datetime.now(timezone.utc) - timedelta(days=1)
        response = requests.get(
            f"{API_BASE}/domains/{domain_id}/reports/dmarc",
            headers=headers,
            params={"startDate": since.strftime("%Y-%m-%dT%H:%M:%SZ")},
            timeout=timeout,
        )
        if not response.ok:
            return None

        summary = response.json()["data"].get("summary", {})
        total = summary.get("totalMessages", 0)
        # NOTE(review): passRate is treated as a 0-100 percentage, matching
        # the default of 100 below - confirm against the API reference.
        pass_rate = summary.get("passRate", 100)

        if total > 100 and pass_rate < (100 - threshold * 100):
            return {
                "type": "high_failure_rate",
                "domain_id": domain_id,
                "total_messages": total,
                "pass_rate": pass_rate,
                "failure_rate": 100 - pass_rate,
            }
        return None


### Slack Integration
python
import requests
# Incoming-webhook URL of the target Slack channel, read from the
# environment (KeyError at load time if unset).
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]


def send_slack_alert(alert, timeout=10):
    """Send an alert to the Slack channel behind ``SLACK_WEBHOOK_URL``.

    Args:
        alert: Alert dict with a ``type`` of ``"score_drop"`` (needs
            ``domain``, ``previous``, ``current``) or ``"high_failure_rate"``
            (needs ``failure_rate``, ``total_messages``; ``domain_id`` is
            shown when present).
        timeout: Seconds to wait for the webhook call, so a stalled
            connection cannot block the caller forever.

    Returns:
        True if Slack answered with a success status; False if the alert
        type is not supported or Slack rejected the message.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    alert_type = alert["type"]

    if alert_type == "score_drop":
        message = {
            "text": "⚠️ Security Score Drop",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*{alert['domain']}* score dropped from "
                                f"*{alert['previous']}* to *{alert['current']}*",
                    },
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "View in MailShield"},
                            "url": f"https://app.mailshield.app/domains/{alert['domain']}",
                        }
                    ],
                },
            ],
        }
    elif alert_type == "high_failure_rate":
        # Name the affected domain when the alert carries one; otherwise the
        # message cannot be acted on in a multi-domain account.
        subject = f" for `{alert['domain_id']}`" if "domain_id" in alert else ""
        message = {
            "text": "🚨 High DMARC Failure Rate",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"DMARC failure rate{subject} is "
                                f"*{alert['failure_rate']:.1f}%* "
                                f"({alert['total_messages']} messages)",
                    },
                }
            ],
        }
    else:
        return False

    response = requests.post(SLACK_WEBHOOK_URL, json=message, timeout=timeout)
    return response.ok


### Microsoft Teams Integration
python
# Incoming-webhook URL of the target Teams channel, read from the
# environment (KeyError at load time if unset).
TEAMS_WEBHOOK_URL = os.environ["TEAMS_WEBHOOK_URL"]


def send_teams_alert(alert, timeout=10):
    """Send a ``score_drop`` alert to the Microsoft Teams channel.

    Args:
        alert: Alert dict; only ``type == "score_drop"`` (with ``domain``,
            ``previous`` and ``current``) is rendered.
        timeout: Seconds to wait for the webhook call.

    Returns:
        True if Teams answered with a success status; False if the alert
        type is not supported or Teams rejected the card.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    # Guard first: without it any other alert type would reach the POST
    # with no card defined.
    if alert["type"] != "score_drop":
        return False

    card = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "themeColor": "FF9800",
        "summary": f"Security Score Drop: {alert['domain']}",
        "sections": [{
            "activityTitle": f"⚠️ Score Drop: {alert['domain']}",
            "facts": [
                {"name": "Previous Score", "value": str(alert['previous'])},
                {"name": "Current Score", "value": str(alert['current'])},
                {"name": "Change", "value": f"-{alert['previous'] - alert['current']}"},
            ],
        }],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "View in MailShield",
            "targets": [{"os": "default", "uri": "https://app.mailshield.app"}],
        }],
    }

    response = requests.post(TEAMS_WEBHOOK_URL, json=card, timeout=timeout)
    return response.ok


## Pagination and Large Dataset Handling
### Paginated Fetching
python
def fetch_all_paginated(endpoint, params=None, timeout=10):
    """Fetch all pages of a paginated endpoint.

    Args:
        endpoint: Path appended to ``API_BASE`` (for example ``"/domains"``).
        params: Extra query parameters. The mapping is copied, so the
            caller's dict is never modified; ``limit`` and ``offset`` are
            always set by this function.
        timeout: Seconds to wait on each page request.

    Returns:
        A single list with the ``data`` items of every page, in order.

    Raises:
        requests.HTTPError: If any page returns an error status.
        requests.RequestException: On connection errors or timeouts.
    """
    query = dict(params or {})
    query["limit"] = 100  # Max page size
    query["offset"] = 0

    all_data = []
    while True:
        response = requests.get(
            f"{API_BASE}{endpoint}",
            headers=headers,
            params=query,
            timeout=timeout,
        )
        response.raise_for_status()
        result = response.json()

        page = result["data"]
        all_data.extend(page)

        # Without pagination metadata the first page is taken as complete.
        total = result.get("pagination", {}).get("total", len(page))

        # An empty page cannot advance the offset; stop instead of asking
        # for the same page forever when `total` overstates the data.
        if not page or query["offset"] + len(page) >= total:
            break
        query["offset"] += len(page)

    return all_data


### Streaming Large Reports
For very large datasets, process in chunks:
python
def process_reports_in_chunks(domain_id, start_date, chunk_days=7, timeout=30):
    """Yield DMARC report data in consecutive time-based chunks.

    Args:
        domain_id: Identifier used in the ``/domains/{id}/reports/dmarc``
            path.
        start_date: Start of the period. A naive datetime is interpreted as
            UTC (it is sent with a ``Z`` suffix); an aware one is converted
            to UTC.
        chunk_days: Length of each chunk in days; must be positive.
        timeout: Seconds to wait on each chunk request.

    Yields:
        The ``data`` payload of each chunk whose request succeeded. Chunks
        whose request returned an error status are skipped.

    Raises:
        ValueError: If ``chunk_days`` is not positive (raised when iteration
            starts, since this is a generator).
        requests.RequestException: On connection errors or timeouts.
    """
    if chunk_days <= 0:
        # A zero or negative step would never advance past start_date.
        raise ValueError("chunk_days must be positive")

    # Work in aware UTC throughout so the "Z" suffix is truthful and aware
    # and naive inputs can both be compared with "now".
    if start_date.tzinfo is None:
        current_start = start_date.replace(tzinfo=timezone.utc)
    else:
        current_start = start_date.astimezone(timezone.utc)
    end_date = datetime.now(timezone.utc)

    step = timedelta(days=chunk_days)
    stamp = "%Y-%m-%dT%H:%M:%SZ"

    while current_start < end_date:
        chunk_end = min(current_start + step, end_date)
        response = requests.get(
            f"{API_BASE}/domains/{domain_id}/reports/dmarc",
            headers=headers,
            params={
                "startDate": current_start.strftime(stamp),
                "endDate": chunk_end.strftime(stamp),
            },
            timeout=timeout,
        )
        # Best-effort: a failed chunk is skipped rather than aborting the
        # whole export, so callers may see gaps in the period.
        if response.ok:
            yield response.json()["data"]
        current_start = chunk_end


## Error Handling and Retry Strategies
### Robust API Client
python
import time
from functools import wraps
class APIError(Exception):
    """Error response from the API.

    Attributes:
        status_code: HTTP status code of the failed response.
        message: Error message reported for the failure.
    """

    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        super().__init__(f"API Error {status_code}: {message}")


def retry_with_backoff(max_retries=3, base_delay=1):
    """Decorator for retry with exponential backoff.

    The wrapped function is retried when it raises ``APIError`` with a
    rate-limit (429) or non-4xx status. Other 4xx errors are raised at once
    because repeating the same request cannot fix them.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated
            as 1 so the wrapped function is always called at least once.
        base_delay: Seconds slept after the first failure; the delay
            doubles after every further failure.

    Returns:
        A decorator that adds the retry behaviour to a function.
    """
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except APIError as e:
                    # Don't retry client errors (4xx) except rate limits
                    retryable = (
                        e.status_code == 429
                        or not 400 <= e.status_code < 500
                    )
                    # On the last attempt raise straight away: sleeping
                    # first would only delay the failure.
                    if not retryable or attempt == attempts - 1:
                        raise
                    # Exponential backoff
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator
@retry_with_backoff(max_retries=3)
def api_request(method, endpoint, **kwargs):
    """Make an API request with retry logic.

    Args:
        method: HTTP method name passed to ``requests.request``.
        endpoint: Path appended to ``API_BASE``.
        **kwargs: Extra arguments forwarded to ``requests.request``. A
            10-second ``timeout`` is applied unless the caller sets one.

    Returns:
        The ``data`` member of the JSON response body.

    Raises:
        APIError: If the response has an error status. The message comes
            from ``error.message`` in the body when available, otherwise
            it is ``"Unknown error"``.
        requests.RequestException: On connection errors or timeouts.
    """
    # Never wait forever on a stalled connection.
    kwargs.setdefault("timeout", 10)

    url = f"{API_BASE}{endpoint}"
    response = requests.request(method, url, headers=headers, **kwargs)

    if not response.ok:
        # Error bodies are not guaranteed to be JSON (an HTML page from a
        # proxy, for example). Failing to parse one must still surface as
        # APIError so the retry decorator can act on the status code.
        try:
            payload = response.json()
        except ValueError:
            payload = None
        error_data = payload.get("error") if isinstance(payload, dict) else None
        if isinstance(error_data, dict):
            message = error_data.get("message", "Unknown error")
        else:
            message = "Unknown error"
        raise APIError(response.status_code, message)

    return response.json()["data"]


### Rate Limit Handling
python
class RateLimitedClient:
    """Client-side throttle that caps requests over a sliding 60-second window."""

    def __init__(self, requests_per_minute=100):
        """Create a client allowing ``requests_per_minute`` calls per window.

        Raises:
            ValueError: If ``requests_per_minute`` is not positive.
        """
        if requests_per_minute <= 0:
            raise ValueError("requests_per_minute must be positive")
        self.requests_per_minute = requests_per_minute
        # Monotonic timestamps of the requests made within the last minute.
        self.request_times = []

    def _recent(self, now):
        """Return the recorded timestamps that are less than a minute old."""
        return [t for t in self.request_times if now - t < 60]

    def _wait_if_needed(self):
        """Wait if we're approaching rate limit."""
        # The monotonic clock is immune to wall-clock adjustments, which
        # could otherwise produce negative or very long sleeps.
        now = time.monotonic()
        self.request_times = self._recent(now)

        if len(self.request_times) >= self.requests_per_minute:
            # Wait until oldest request is >1 minute old
            sleep_time = 60 - (now - self.request_times[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
            # Drop whatever expired while sleeping so the window is accurate
            # before the next request is recorded.
            self.request_times = self._recent(time.monotonic())

    def request(self, method, endpoint, **kwargs):
        """Make rate-limited request.

        Waits for a free slot, records the request time, and returns the
        result of ``api_request(method, endpoint, **kwargs)``.
        """
        self._wait_if_needed()
        self.request_times.append(time.monotonic())
        return api_request(method, endpoint, **kwargs)


## Example: Custom Reporting Pipeline
### Weekly Security Report Generator
python
from datetime import datetime, timedelta
from jinja2 import Template
# Jinja2 template for the weekly report, rendered as Markdown.
# - `-%}` on the `for` tags strips the newline after the tag; without it
#   every row is preceded by a blank line, which breaks the Markdown table.
# - Score and pass rate are tested with `is number` so a value of 0 is shown
#   as 0 and a missing pass rate renders as "N/A" rather than "N/A%".
REPORT_TEMPLATE = """
# Weekly Email Security Report
Generated: {{ generated_at }}
Period: {{ start_date }} to {{ end_date }}
## Summary
- Domains Monitored: {{ domains | length }}
- Average Score: {{ average_score | round(1) }}
- Domains at Risk (Score < 70): {{ at_risk_count }}
## Domain Details
| Domain | Score | Grade | DMARC Policy | Pass Rate |
|--------|-------|-------|--------------|-----------|
{% for d in domains -%}
| {{ d.domain }} | {{ d.score if d.score is number else 'N/A' }} | {{ d.grade or 'N/A' }} | {{ d.dmarc_policy or 'none' }} | {% if d.pass_rate is number %}{{ d.pass_rate }}%{% else %}N/A{% endif %} |
{% endfor %}
## Recommendations
{% for rec in recommendations -%}
- {{ rec }}
{% endfor %}
"""
def generate_weekly_report():
    """Generate weekly security report.

    Fetches all domains with their scores, adds the DMARC pass rate of the
    last seven days to each, derives summary metrics and recommendations,
    and renders ``REPORT_TEMPLATE``.

    Returns:
        The rendered report as a string.
    """
    # Timestamps are sent with a "Z" suffix, so they must be built from UTC.
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=7)

    # Fetch all domains with details
    domains = fetch_all_domains()

    # Enrich with DMARC data
    for domain in domains:
        try:
            reports = api_request(
                "GET",
                f"/domains/{domain['id']}/reports/dmarc",
                params={"startDate": start_date.strftime("%Y-%m-%dT%H:%M:%SZ")},
            )
            domain["pass_rate"] = reports.get("summary", {}).get("passRate")
        except Exception:
            # Best-effort per domain: one failing lookup must not sink the
            # whole report. Unlike a bare `except`, this still lets
            # KeyboardInterrupt and SystemExit stop the job.
            domain["pass_rate"] = None

        # Extract score details (flattens the score dict into two fields)
        score_data = domain.get("score") or {}
        domain["score"] = score_data.get("score")
        domain["grade"] = score_data.get("grade")

    # Calculate metrics. Scores are compared with None, not truthiness:
    # a score of 0 is real data and the clearest at-risk case.
    scores = [d["score"] for d in domains if d["score"] is not None]
    average_score = sum(scores) / len(scores) if scores else 0
    at_risk = [d for d in domains if d["score"] is not None and d["score"] < 70]
    at_risk_count = len(at_risk)

    # Generate recommendations
    recommendations = []
    for d in domains:
        if d["score"] is not None and d["score"] < 70:
            recommendations.append(f"{d['domain']}: Review DNS configuration")
        if d.get("dmarc_policy") == "none":
            recommendations.append(f"{d['domain']}: Move toward DMARC enforcement")

    # Render report
    template = Template(REPORT_TEMPLATE)
    report = template.render(
        generated_at=datetime.now(timezone.utc).isoformat(),
        start_date=start_date.strftime("%Y-%m-%d"),
        end_date=end_date.strftime("%Y-%m-%d"),
        domains=domains,
        average_score=average_score,
        at_risk_count=at_risk_count,
        recommendations=recommendations[:10],  # First 10 only
    )
    return report


### Scheduled Execution
python
# Using APScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
# Scheduler instance that owns every job registered below.
scheduler = BlockingScheduler()


@scheduler.scheduled_job("cron", day_of_week="mon", hour=9)
def weekly_report_job():
    """Build the weekly report and deliver it; runs every Monday at 9 AM."""
    # Delivery is interchangeable: email here, but saving to a file or
    # uploading to S3 fits the same spot.
    # NOTE(review): send_report_email is not defined in this guide - supply
    # your own implementation.
    send_report_email(generate_weekly_report())
# One monitor for the lifetime of the process. AlertMonitor compares each
# poll with the scores it stored on the previous one, so creating a new
# instance per run would leave it with nothing to compare against.
alert_monitor = AlertMonitor()


# For continuous monitoring
@scheduler.scheduled_job("interval", minutes=5)
def check_alerts_job():
    """Check for alerts every 5 minutes and forward each one to Slack."""
    for alert in alert_monitor.check_for_changes():
        send_slack_alert(alert)


# Blocks the current thread and runs the registered jobs until stopped.
scheduler.start()


## Next Steps
- Code Examples - Ready-to-use code snippets
- API Authentication - Setup and token management
- Rate Limiting - Understanding limits
- API Errors - Error handling reference