Skip to content

API Integration Patterns

This guide covers common integration patterns for building on the MailShield API.

Common Integration Use Cases

| Use Case | Pattern | Complexity |
|------------------------|----------------------|------------|
| Security dashboard | Polling + caching | Medium |
| Automated alerting | Polling + comparison | Medium |
| Multi-tenant management | Bulk operations | High |
| CI/CD integration | On-demand checks | Low |
| Compliance reporting | Scheduled export | Medium |

Building a Monitoring Dashboard

Architecture

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  MailShield API │────►│  Your Backend   │────►│  Dashboard UI   │
└─────────────────┘     └─────────────────┘     └─────────────────┘


                        ┌─────────────────┐
                        │     Cache       │
                        │  (Redis/Memory) │
                        └─────────────────┘

Implementation

python
import os
import json
import redis
import requests
from datetime import datetime, timedelta

# Base URL for all MailShield v1 API calls.
API_BASE = "https://app.mailshield.app/api/v1"
# Read at import time; raises KeyError immediately if the env var is unset.
API_TOKEN = os.environ["MAILSHIELD_API_TOKEN"]
CACHE_TTL = 300  # 5 minutes

# Local Redis instance used as the dashboard cache.
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Shared auth headers sent with every API request.
headers = {
    "Authorization": f"Bearer {API_TOKEN}",
    "Content-Type": "application/json"
}

def get_cached_or_fetch(cache_key, fetch_func, ttl=CACHE_TTL):
    """Return the value for *cache_key*, fetching and caching on a miss.

    On a cache miss, calls *fetch_func*, stores its JSON-serialized
    result in Redis with an expiry of *ttl* seconds, and returns it.
    """
    hit = redis_client.get(cache_key)
    if hit:
        return json.loads(hit)

    fresh = fetch_func()
    redis_client.setex(cache_key, ttl, json.dumps(fresh))
    return fresh

def fetch_all_domains():
    """Return every domain from the API, enriched with its score when available.

    Makes one list call plus one score call per domain; a failed score
    lookup simply leaves that domain without a "score" key (best-effort).
    """
    listing = requests.get(f"{API_BASE}/domains", headers=headers)
    listing.raise_for_status()
    domains = listing.json()["data"]

    for entry in domains:
        score_resp = requests.get(
            f"{API_BASE}/domains/{entry['id']}/score", headers=headers
        )
        if score_resp.ok:
            entry["score"] = score_resp.json()["data"]

    return domains

def get_dashboard_data():
    """Return the dashboard's domain list, served from cache when fresh."""
    return get_cached_or_fetch("dashboard:domains", fetch_all_domains)

Dashboard Metrics to Display

| Metric | API Source | Update Frequency |
|--------------------|------------------------------|------------------|
| Domain count | /domains | On change |
| Average score | /domains/{id}/score | Every 5-15 min |
| DMARC pass rate | /domains/{id}/reports/dmarc | Every 15-60 min |
| Active alerts | /domains/{id}/alerts | Every 1-5 min |
| Recent DNS changes | /domains/{id}/checks | On demand |

Automated Alerting Integrations

Alert Detection Pattern

python
class AlertMonitor:
    """Tracks domain scores across polls and emits alerts on regressions.

    Keeps an in-memory map of the last observed score per domain id and
    compares it against the current poll. State is process-local, so a
    restart loses history and the first poll afterwards never alerts.
    """

    def __init__(self):
        # domain_id -> last observed numeric score (or None if unknown)
        self.previous_states = {}

    def check_for_changes(self):
        """Compare current state to previous and alert on changes.

        Returns:
            list of "score_drop" alert dicts for every domain whose
            score fell by 10 or more points since the last poll.
        """
        alerts = []

        domains = fetch_all_domains()
        for domain in domains:
            domain_id = domain["id"]
            current_score = domain.get("score", {}).get("score")

            if domain_id in self.previous_states:
                previous_score = self.previous_states[domain_id]

                # Fix: compare against None explicitly. A legitimate
                # score of 0 is falsy, so the original truthiness test
                # suppressed alerts for a drop all the way to zero.
                if previous_score is not None and current_score is not None:
                    if previous_score - current_score >= 10:
                        alerts.append({
                            "type": "score_drop",
                            "domain": domain["domain"],
                            "previous": previous_score,
                            "current": current_score
                        })

            self.previous_states[domain_id] = current_score

        return alerts

    def check_dmarc_failures(self, domain_id, threshold=0.05):
        """Alert if the DMARC failure rate exceeds *threshold*.

        Args:
            domain_id: MailShield domain id.
            threshold: failure-rate fraction (0.05 == 5%).

        Returns:
            A "high_failure_rate" alert dict, or None when the API call
            fails or the failure rate is within bounds. Requires more
            than 100 messages in the window to avoid noise on tiny
            samples.
        """
        response = requests.get(
            f"{API_BASE}/domains/{domain_id}/reports/dmarc",
            headers=headers,
            params={"startDate": (datetime.now() - timedelta(days=1)).isoformat() + "Z"}
        )

        if not response.ok:
            # Best-effort: callers treat None as "no data", not an error.
            return None

        data = response.json()["data"]
        summary = data.get("summary", {})
        total = summary.get("totalMessages", 0)
        pass_rate = summary.get("passRate", 100)

        # pass_rate below (100 - threshold%) means the failure rate
        # exceeded the configured threshold.
        if total > 100 and pass_rate < (100 - threshold * 100):
            return {
                "type": "high_failure_rate",
                "total_messages": total,
                "pass_rate": pass_rate,
                "failure_rate": 100 - pass_rate
            }

        return None

Slack Integration

python
import requests

# Slack incoming-webhook URL; raises KeyError at import time if unset.
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]

def send_slack_alert(alert):
    """Post an alert to the configured Slack incoming webhook.

    Handles "score_drop" and "high_failure_rate" alerts; any other
    alert type is silently ignored.
    """
    kind = alert["type"]

    if kind == "score_drop":
        detail = (
            f"*{alert['domain']}* score dropped from "
            f"*{alert['previous']}* to *{alert['current']}*"
        )
        payload = {
            "text": "⚠️ Security Score Drop",
            "blocks": [
                {
                    "type": "section",
                    "text": {"type": "mrkdwn", "text": detail},
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "View in MailShield"},
                            "url": f"https://app.mailshield.app/domains/{alert['domain']}",
                        }
                    ],
                },
            ],
        }
    elif kind == "high_failure_rate":
        detail = (
            f"DMARC failure rate is *{alert['failure_rate']:.1f}%* "
            f"({alert['total_messages']} messages)"
        )
        payload = {
            "text": "🚨 High DMARC Failure Rate",
            "blocks": [
                {
                    "type": "section",
                    "text": {"type": "mrkdwn", "text": detail},
                }
            ],
        }
    else:
        return

    requests.post(SLACK_WEBHOOK_URL, json=payload)

Microsoft Teams Integration

python
# Microsoft Teams webhook URL; raises KeyError at import time if unset.
TEAMS_WEBHOOK_URL = os.environ["TEAMS_WEBHOOK_URL"]

def send_teams_alert(alert):
    """Post a score-drop alert card to the Microsoft Teams webhook.

    Only "score_drop" alerts are supported; all other alert types are
    ignored without sending anything.
    """
    if alert["type"] != "score_drop":
        return

    delta = alert['previous'] - alert['current']
    card = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "themeColor": "FF9800",
        "summary": f"Security Score Drop: {alert['domain']}",
        "sections": [{
            "activityTitle": f"⚠️ Score Drop: {alert['domain']}",
            "facts": [
                {"name": "Previous Score", "value": str(alert['previous'])},
                {"name": "Current Score", "value": str(alert['current'])},
                {"name": "Change", "value": f"-{delta}"},
            ],
        }],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "View in MailShield",
            "targets": [{"os": "default", "uri": "https://app.mailshield.app"}],
        }],
    }

    requests.post(TEAMS_WEBHOOK_URL, json=card)

Pagination and Large Dataset Handling

Paginated Fetching

python
def fetch_all_paginated(endpoint, params=None):
    """Fetch every page of a paginated endpoint and return the combined list.

    Pages through with limit/offset query parameters; stops when the
    reported total is reached or the API returns an empty page.

    Args:
        endpoint: API path starting with "/", e.g. "/domains".
        params: optional extra query params. "limit" and "offset" are
            overwritten; the caller's dict is not mutated.

    Returns:
        list of all items across all pages.

    Raises:
        requests.HTTPError: on any non-2xx response.
    """
    all_data = []
    # Fix: copy so we don't write limit/offset into the caller's dict.
    params = dict(params) if params else {}
    params["limit"] = 100  # Max page size
    params["offset"] = 0

    while True:
        response = requests.get(
            f"{API_BASE}{endpoint}",
            headers=headers,
            params=params
        )
        response.raise_for_status()
        result = response.json()

        data = result["data"]
        all_data.extend(data)

        # Fix: an empty page can never advance the offset, so bail out
        # even if the reported total claims more rows — otherwise this
        # loop would spin forever re-requesting the same offset.
        if not data:
            break

        # Check if there are more pages
        pagination = result.get("pagination", {})
        total = pagination.get("total", len(data))

        if params["offset"] + len(data) >= total:
            break

        params["offset"] += len(data)

    return all_data

Streaming Large Reports

For very large datasets, process in chunks:

python
def process_reports_in_chunks(domain_id, start_date, chunk_days=7):
    """Yield DMARC report payloads for *domain_id* in time-based windows.

    Walks from *start_date* up to now in windows of *chunk_days* days
    and yields the "data" payload for each window whose request
    succeeds. Failed requests are skipped silently (best-effort
    streaming).
    """
    window_start = start_date
    horizon = datetime.now()

    while window_start < horizon:
        window_end = min(window_start + timedelta(days=chunk_days), horizon)

        resp = requests.get(
            f"{API_BASE}/domains/{domain_id}/reports/dmarc",
            headers=headers,
            params={
                "startDate": window_start.isoformat() + "Z",
                "endDate": window_end.isoformat() + "Z",
            },
        )
        if resp.ok:
            yield resp.json()["data"]

        window_start = window_end

Error Handling and Retry Strategies

Robust API Client

python
import time
from functools import wraps

class APIError(Exception):
    """Raised when the API returns a non-2xx response.

    Attributes:
        status_code: HTTP status code of the failed response.
        message: error message extracted from the response body.
    """

    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        super().__init__(f"API Error {status_code}: {message}")


def retry_with_backoff(max_retries=3, base_delay=1):
    """Decorator retrying server-side APIErrors with exponential backoff.

    Makes up to *max_retries* attempts, sleeping base_delay * 2**attempt
    seconds between attempts. Client errors (4xx) other than 429 are
    raised immediately — retrying them cannot succeed. After the final
    failed attempt the last error is re-raised.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except APIError as e:
                    last_exception = e

                    # Don't retry client errors (4xx) except rate limits
                    if 400 <= e.status_code < 500 and e.status_code != 429:
                        raise

                    # Fix: skip the backoff sleep after the final
                    # attempt — the original slept up to
                    # base_delay * 2**(max_retries-1) seconds before
                    # re-raising an error it would never retry.
                    if attempt < max_retries - 1:
                        time.sleep(base_delay * (2 ** attempt))

            raise last_exception

        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def api_request(method, endpoint, **kwargs):
    """Make an API request with retry logic and return the "data" payload.

    Args:
        method: HTTP verb, e.g. "GET".
        endpoint: API path starting with "/".
        **kwargs: passed through to requests.request (params, json, ...).

    Returns:
        The decoded "data" field of the JSON response body.

    Raises:
        APIError: for any non-2xx response.
    """
    url = f"{API_BASE}{endpoint}"
    response = requests.request(method, url, headers=headers, **kwargs)

    if not response.ok:
        # Fix: error bodies are not guaranteed to be JSON (e.g. an HTML
        # 502 page from a proxy). The original's response.json() raised
        # a decode error here, masking the real HTTP failure.
        try:
            error_data = response.json().get("error", {})
        except ValueError:
            error_data = {}
        raise APIError(
            response.status_code,
            error_data.get("message", "Unknown error")
        )

    return response.json()["data"]

Rate Limit Handling

python
class RateLimitedClient:
    """Client-side throttle: at most *requests_per_minute* calls per rolling minute."""

    def __init__(self, requests_per_minute=100):
        self.requests_per_minute = requests_per_minute
        # Timestamps (time.time()) of requests made within the last minute.
        self.request_times = []

    def _wait_if_needed(self):
        """Block until a request slot is free in the rolling 60-second window."""
        now = time.time()
        # Prune timestamps that have aged out of the window.
        self.request_times = [stamp for stamp in self.request_times if now - stamp < 60]

        if len(self.request_times) < self.requests_per_minute:
            return

        # Window is full: sleep until the oldest entry is over a minute old.
        remaining = 60 - (now - self.request_times[0])
        if remaining > 0:
            time.sleep(remaining)

    def request(self, method, endpoint, **kwargs):
        """Make a rate-limited request via api_request (inherits its retries)."""
        self._wait_if_needed()
        self.request_times.append(time.time())
        return api_request(method, endpoint, **kwargs)

Example: Custom Reporting Pipeline

Weekly Security Report Generator

python
from datetime import datetime, timedelta
from jinja2 import Template

# Jinja2 template (renders Markdown). Expects the context built in
# generate_weekly_report: generated_at, start_date, end_date, domains,
# average_score, at_risk_count, recommendations.
REPORT_TEMPLATE = """
# Weekly Email Security Report
Generated: {{ generated_at }}
Period: {{ start_date }} to {{ end_date }}

## Summary
- Domains Monitored: {{ domains | length }}
- Average Score: {{ average_score | round(1) }}
- Domains at Risk (Score < 70): {{ at_risk_count }}

## Domain Details

| Domain | Score | Grade | DMARC Policy | Pass Rate |
|--------|-------|-------|--------------|-----------|
{% for d in domains %}
| {{ d.domain }} | {{ d.score or 'N/A' }} | {{ d.grade or 'N/A' }} | {{ d.dmarc_policy or 'none' }} | {{ d.pass_rate or 'N/A' }}% |
{% endfor %}

## Recommendations

{% for rec in recommendations %}
- {{ rec }}
{% endfor %}
"""

def generate_weekly_report():
    """Generate the weekly security report as rendered Markdown text.

    Fetches all domains, enriches each with its last-7-day DMARC pass
    rate and flattened score details, computes summary metrics, and
    renders REPORT_TEMPLATE.

    Returns:
        str: the rendered Markdown report.
    """
    end_date = datetime.now()
    start_date = end_date - timedelta(days=7)

    # Fetch all domains with details
    domains = fetch_all_domains()

    # Enrich with DMARC data (best effort: a failed fetch leaves
    # pass_rate as None rather than aborting the whole report).
    for domain in domains:
        try:
            reports = api_request(
                "GET",
                f"/domains/{domain['id']}/reports/dmarc",
                params={"startDate": start_date.isoformat() + "Z"}
            )
            summary = reports.get("summary", {})
            domain["pass_rate"] = summary.get("passRate")
        except Exception:
            # Fix: the original bare `except:` also swallowed
            # SystemExit and KeyboardInterrupt; catch Exception only.
            domain["pass_rate"] = None

        # Flatten the nested score object into top-level fields.
        score_data = domain.get("score", {})
        domain["score"] = score_data.get("score")
        domain["grade"] = score_data.get("grade")

    # Calculate metrics. Fix: compare against None so a legitimate
    # score of 0 is averaged and counted as at-risk — truthiness
    # silently dropped it before.
    scores = [d["score"] for d in domains if d["score"] is not None]
    average_score = sum(scores) / len(scores) if scores else 0
    at_risk_count = len(
        [d for d in domains if d["score"] is not None and d["score"] < 70]
    )

    # Generate recommendations
    recommendations = []
    for d in domains:
        if d["score"] is not None and d["score"] < 70:
            recommendations.append(f"{d['domain']}: Review DNS configuration")
        if d.get("dmarc_policy") == "none":
            recommendations.append(f"{d['domain']}: Move toward DMARC enforcement")

    # Render report
    template = Template(REPORT_TEMPLATE)
    report = template.render(
        generated_at=datetime.now().isoformat(),
        start_date=start_date.strftime("%Y-%m-%d"),
        end_date=end_date.strftime("%Y-%m-%d"),
        domains=domains,
        average_score=average_score,
        at_risk_count=at_risk_count,
        recommendations=recommendations[:10]  # Top 10
    )

    return report

Scheduled Execution

python
# Using APScheduler
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

# Fix: the monitor must persist across runs. The original constructed a
# fresh AlertMonitor inside check_alerts_job, so previous_states was
# always empty and a score-drop alert could never fire.
monitor = AlertMonitor()

@scheduler.scheduled_job('cron', day_of_week='mon', hour=9)
def weekly_report_job():
    """Run every Monday at 9 AM: generate and deliver the weekly report."""
    report = generate_weekly_report()
    # Send via email, save to file, upload to S3, etc.
    # NOTE(review): send_report_email is not defined in this guide —
    # supply your own delivery implementation.
    send_report_email(report)

# For continuous monitoring
@scheduler.scheduled_job('interval', minutes=5)
def check_alerts_job():
    """Check for alerts every 5 minutes and forward them to Slack."""
    alerts = monitor.check_for_changes()
    for alert in alerts:
        send_slack_alert(alert)

# Blocks the current thread and runs jobs until the process exits.
scheduler.start()

Next Steps

Monitor and secure your email domains.