Integrating with External Systems
Overview
OpenCHS provides multiple integration methods for external systems to interact with the platform, including REST APIs, webhooks, and data import/export capabilities. This document guides developers on integrating their systems with OpenCHS.
Integration Methods
1. REST API Integration
Direct API access for real-time operations
2. Webhook Events
Push notifications for system events
3. Data Import/Export
Bulk data transfer via CSV/JSON
4. Database Integration
Direct database access (read-only recommended)
REST API Integration
Authentication
All API requests require authentication using session-based auth or API keys.
Session-Based (for web applications):
# Step 1: Request OTP
curl -X POST https://your-domain.com/helpline/api/sendOTP \
-H "Content-Type: application/json" \
-d '{"addr_addr": "user@example.com", "addr_type": "email"}'
# Step 2: Verify OTP
curl -X POST https://your-domain.com/helpline/api/verifyOTP \
-H "Content-Type: application/json" \
-d '{"addr_addr": "user@example.com", "otp": "123456"}'
# Response includes HELPLINE_SESSION_ID cookieAPI Key (for system integrations):
# Request with API key header
curl -X GET https://your-domain.com/helpline/api/cases \
-H "X-API-Key: your-api-key-here"Common Integration Patterns
Pattern 1: Case Creation from External System
External CRM or ticketing system creating cases in OpenCHS:
// Node.js example
const axios = require('axios');
class OpenCHSIntegration {
constructor(baseUrl, apiKey) {
this.client = axios.create({
baseURL: baseUrl,
headers: {
'X-API-Key': apiKey,
'Content-Type': 'application/json'
}
});
}
async createCase(caseData) {
try {
const response = await this.client.post('/helpline/api/cases', {
title: caseData.title,
description: caseData.description,
category: caseData.category,
priority: caseData.priority,
reporter_phone: caseData.reporterPhone,
reporter_email: caseData.reporterEmail
});
return {
success: true,
caseId: response.data.id,
caseNumber: response.data.case_number
};
} catch (error) {
console.error('Failed to create case:', error.message);
return {
success: false,
error: error.message
};
}
}
async getCaseStatus(caseId) {
const response = await this.client.get(`/helpline/api/cases/${caseId}`);
return response.data;
}
async updateCase(caseId, updates) {
const response = await this.client.put(
`/helpline/api/cases/${caseId}`,
updates
);
return response.data;
}
}
// Usage
const openchs = new OpenCHSIntegration(
'https://helpline.example.com',
'your-api-key'
);
// Create case from your system
const result = await openchs.createCase({
title: 'Child safety concern from hotline',
description: 'Caller reported concern about child welfare',
category: 'abuse',
priority: 'high',
reporterPhone: '+254700123456',
reporterEmail: 'reporter@example.com'
});
console.log('Created case:', result.caseNumber);Pattern 2: Syncing Cases to External System
Pull cases from OpenCHS to your system:
# Python example
import requests
from datetime import datetime, timedelta
class OpenCHSSync:
def __init__(self, base_url, api_key):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({'X-API-Key': api_key})
def get_cases_since(self, since_date):
"""Get all cases created/updated since a specific date"""
response = self.session.get(
f'{self.base_url}/helpline/api/cases',
params={
'updated_since': since_date.isoformat(),
'limit': 100
}
)
response.raise_for_status()
return response.json()
def sync_to_external_system(self, external_api):
"""Sync recent cases to external CRM"""
# Get cases from last hour
since_date = datetime.now() - timedelta(hours=1)
cases = self.get_cases_since(since_date)
for case in cases:
# Transform to external format
external_case = {
'external_id': case['case_number'],
'title': case['title'],
'status': case['status'],
'priority': case['priority'],
'created_at': case['created_at']
}
# Create or update in external system
external_api.upsert_case(external_case)
# Usage
sync = OpenCHSSync('https://helpline.example.com', 'your-api-key')
sync.sync_to_external_system(your_crm_api)Pattern 3: Real-Time Status Updates
Monitor case status changes:
import time
class CaseMonitor:
def __init__(self, openchs_client):
self.client = openchs_client
self.last_check = datetime.now()
def poll_for_updates(self, callback):
"""Poll for case updates every minute"""
while True:
try:
# Get cases updated since last check
cases = self.client.get_cases_since(self.last_check)
for case in cases:
# Trigger callback for each update
callback(case)
self.last_check = datetime.now()
time.sleep(60) # Poll every minute
except Exception as e:
print(f"Error polling updates: {e}")
time.sleep(60)
# Usage
def handle_case_update(case):
print(f"Case {case['case_number']} updated to {case['status']}")
# Send notification, update dashboard, etc.
monitor = CaseMonitor(openchs)
monitor.poll_for_updates(handle_case_update)AI Service Integration
Processing Audio Files
External systems can submit audio files for AI processing:
import requests
class AIServiceClient:
def __init__(self, ai_service_url):
self.base_url = ai_service_url
def transcribe_and_analyze(self, audio_file_path, language='sw'):
"""Submit audio for complete AI pipeline"""
with open(audio_file_path, 'rb') as audio_file:
response = requests.post(
f'{self.base_url}/audio/process',
files={'audio': audio_file},
data={
'language': language,
'include_translation': 'true',
'include_classification': 'true'
}
)
return response.json()
def check_processing_status(self, task_id):
"""Check status of async processing"""
response = requests.get(
f'{self.base_url}/audio/task/{task_id}'
)
return response.json()
# Usage
ai_client = AIServiceClient('http://localhost:8123')
# Process audio file
result = ai_client.transcribe_and_analyze('call_recording.wav', 'sw')
print(f"Transcript: {result['transcript']}")
print(f"Translation: {result['translation']}")
print(f"Risk Level: {result['classification']['priority']}")Streaming Real-Time Results
import requests
def process_with_streaming(audio_path):
"""Get real-time updates as audio is processed"""
with open(audio_path, 'rb') as audio:
response = requests.post(
'http://localhost:8123/audio/process-stream',
files={'audio': audio},
data={'language': 'sw'},
stream=True
)
for line in response.iter_lines():
if line:
event = json.loads(line)
if event['type'] == 'transcription_progress':
print(f"Transcription: {event['progress']}%")
elif event['type'] == 'transcription_complete':
print(f"Text: {event['data']['text']}")
elif event['type'] == 'classification_complete':
print(f"Risk: {event['data']['risk_level']}")Data Import/Export
CSV Import
Import cases from CSV file:
<?php
// import_cases.php
require_once 'vendor/autoload.php';
class CaseImporter {
private $db;
public function importFromCSV($filepath) {
$file = fopen($filepath, 'r');
// Skip header row
fgetcsv($file);
$imported = 0;
$errors = [];
while (($row = fgetcsv($file)) !== FALSE) {
try {
$this->createCase([
'case_number' => $row[0],
'title' => $row[1],
'description' => $row[2],
'category' => $row[3],
'priority' => $row[4],
'reporter_phone' => $row[5]
]);
$imported++;
} catch (Exception $e) {
$errors[] = "Row {$row[0]}: {$e->getMessage()}";
}
}
fclose($file);
return [
'imported' => $imported,
'errors' => $errors
];
}
private function createCase($data) {
// Validation
if (empty($data['title'])) {
throw new Exception('Title is required');
}
// Insert into database
$sql = "INSERT INTO kase (case_number, title, description, category, priority, reporter_phone, created_at)
VALUES (?, ?, ?, ?, ?, ?, NOW())";
$stmt = $this->db->prepare($sql);
$stmt->execute([
$data['case_number'],
$data['title'],
$data['description'],
$data['category'],
$data['priority'],
$data['reporter_phone']
]);
}
}
// Usage
$importer = new CaseImporter($db);
$result = $importer->importFromCSV('cases.csv');
echo "Imported: {$result['imported']} cases\n";
if (!empty($result['errors'])) {
echo "Errors:\n";
foreach ($result['errors'] as $error) {
echo " - $error\n";
}
}
?>JSON Export
Export cases to JSON:
<?php
// export_cases.php
class CaseExporter {
private $db;
public function exportToJSON($filters = []) {
$cases = $this->getCases($filters);
$json = json_encode($cases, JSON_PRETTY_PRINT);
// Save to file
$filename = 'cases_export_' . date('Y-m-d_His') . '.json';
file_put_contents($filename, $json);
return $filename;
}
private function getCases($filters) {
$sql = "SELECT k.*, u.username as assigned_to
FROM kase k
LEFT JOIN auth u ON k.assigned_user_id = u.id
WHERE 1=1";
$params = [];
if (!empty($filters['status'])) {
$sql .= " AND k.status = ?";
$params[] = $filters['status'];
}
if (!empty($filters['created_after'])) {
$sql .= " AND k.created_at >= ?";
$params[] = $filters['created_after'];
}
$stmt = $this->db->prepare($sql);
$stmt->execute($params);
return $stmt->fetchAll(PDO::FETCH_ASSOC);
}
}
// Usage
$exporter = new CaseExporter($db);
$filename = $exporter->exportToJSON([
'status' => 'resolved',
'created_after' => '2025-01-01'
]);
echo "Exported to: $filename\n";
?>Database Integration (Read-Only)
For analytics and reporting, external systems can access the database directly:
Read-Only User Setup
-- Create read-only user for external system
CREATE USER 'external_readonly'@'%' IDENTIFIED BY 'secure_password';
-- Grant SELECT only on specific tables
GRANT SELECT ON helpline.kase TO 'external_readonly'@'%';
GRANT SELECT ON helpline.contact TO 'external_readonly'@'%';
GRANT SELECT ON helpline.kase_activity TO 'external_readonly'@'%';
GRANT SELECT ON helpline.fact_* TO 'external_readonly'@'%';
GRANT SELECT ON helpline.mv_* TO 'external_readonly'@'%';
FLUSH PRIVILEGES;Python Database Connection
import pymysql
import pandas as pd
class OpenCHSDatabase:
def __init__(self, host, database, user, password):
self.connection = pymysql.connect(
host=host,
database=database,
user=user,
password=password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def get_cases_dataframe(self, status=None):
"""Get cases as pandas DataFrame for analysis"""
query = "SELECT * FROM kase"
if status:
query += f" WHERE status = '{status}'"
return pd.read_sql(query, self.connection)
def get_daily_metrics(self, days=30):
"""Get daily metrics for reporting"""
query = f"""
SELECT * FROM fact_case_daily
WHERE metric_date >= DATE_SUB(CURDATE(), INTERVAL {days} DAY)
ORDER BY metric_date DESC
"""
return pd.read_sql(query, self.connection)
# Usage
db = OpenCHSDatabase(
host='helpline.example.com',
database='helpline',
user='external_readonly',
password='secure_password'
)
# Get all open cases
open_cases = db.get_cases_dataframe(status='open')
print(f"Open cases: {len(open_cases)}")
# Get metrics for dashboard
metrics = db.get_daily_metrics(days=7)
print(metrics[['metric_date', 'new_cases', 'resolved_cases']])Best Practices
Security
- Use API Keys: Don't embed credentials in code
- HTTPS Only: Always use secure connections
- Rate Limiting: Implement retry logic with exponential backoff
- Error Handling: Gracefully handle API errors
- Logging: Log all integration activities for debugging
Performance
- Batch Operations: Use batch APIs when available
- Pagination: Handle large datasets with pagination
- Caching: Cache frequently accessed data
- Async Processing: Use async for long-running operations
- Connection Pooling: Reuse database connections
Data Integrity
- Validation: Validate data before sending
- Idempotency: Handle duplicate requests gracefully
- Transactions: Use database transactions for consistency
- Error Recovery: Implement retry mechanisms
- Monitoring: Monitor integration health and errors
Example: Robust Integration Class
import requests
import time
from functools import wraps
def retry_on_failure(max_retries=3, delay=1):
"""Decorator for retry logic"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
time.sleep(delay * (2 ** attempt)) # Exponential backoff
return None
return wrapper
return decorator
class RobustOpenCHSClient:
def __init__(self, base_url, api_key):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
'X-API-Key': api_key,
'Content-Type': 'application/json'
})
@retry_on_failure(max_retries=3, delay=2)
def create_case(self, case_data):
"""Create case with retry logic"""
# Validate data
self._validate_case_data(case_data)
response = self.session.post(
f'{self.base_url}/helpline/api/cases',
json=case_data,
timeout=30
)
response.raise_for_status()
return response.json()
def _validate_case_data(self, data):
"""Validate case data before submission"""
required_fields = ['title', 'category', 'priority']
for field in required_fields:
if field not in data:
raise ValueError(f"Missing required field: {field}")
if data['priority'] not in ['low', 'medium', 'high', 'critical']:
raise ValueError(f"Invalid priority: {data['priority']}")Troubleshooting
Common Issues
Authentication Failures:
# Check API key validity
curl -X GET https://your-domain.com/helpline/api/health \
-H "X-API-Key: your-api-key"
# Response should be 200 OKConnection Timeouts:
# Increase timeout for slow networks
response = requests.post(
url,
json=data,
timeout=60 # 60 seconds
)Rate Limiting:
# Handle rate limit errors
try:
response = client.post(url, json=data)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
retry_after = int(e.response.headers.get('Retry-After', 60))
time.sleep(retry_after)
# Retry requestFor additional support, contact the OpenCHS development team or consult the API Reference.