Zero-Trust Security Architecture: Securing Modern Cloud Applications
Implement comprehensive zero-trust security principles in your cloud infrastructure to protect against evolving cyber threats and ensure compliance.
The Security Paradigm Shift
The traditional "castle and moat" security model is fundamentally broken in today's cloud-native world. With remote work, multi-cloud deployments, and microservices architectures, the network perimeter has dissolved. Enter Zero-Trust Architecture: a security model that assumes breach and verifies every request.
This guide walks through a zero-trust security architecture for modern cloud applications in five steps: identity and access management, network segmentation, data protection, continuous monitoring, and policy as code.
Understanding Zero-Trust Principles
Zero-trust security is built on three core principles:
Never Trust, Always Verify
Every user, device, and application must be authenticated and authorized before accessing resources, regardless of their location or network.
Least Privilege Access
Grant users and services only the minimum access required to perform their functions, and regularly review and adjust permissions.
Assume Breach
Design your security architecture assuming that attackers are already inside your network, and implement controls to detect and contain threats.
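To make the three principles concrete, here is a minimal sketch of a deny-by-default access check. The `AccessRequest` and `Policy` types and their fields are hypothetical names chosen for illustration; they are not part of any library or of the services built later in this guide.

```python
from dataclasses import dataclass, field


@dataclass
class AccessRequest:
    """Hypothetical request shape used only for this illustration."""
    user_id: str
    authenticated: bool   # identity verified for this request
    device_trusted: bool  # device posture verified for this request
    action: str           # e.g. "read" or "write"
    resource: str


@dataclass
class Policy:
    # Explicit grants: (user_id, resource) -> set of permitted actions.
    grants: dict = field(default_factory=dict)

    def is_allowed(self, req: AccessRequest) -> bool:
        # Never trust, always verify: network location is never consulted,
        # only the identity and device checks made for this request.
        if not (req.authenticated and req.device_trusted):
            return False
        # Least privilege: only explicitly granted actions are permitted.
        allowed_actions = self.grants.get((req.user_id, req.resource), set())
        # Assume breach: anything not explicitly granted is denied.
        return req.action in allowed_actions


policy = Policy(grants={("alice", "reports"): {"read"}})
read_req = AccessRequest("alice", True, True, "read", "reports")
write_req = AccessRequest("alice", True, True, "write", "reports")
untrusted_req = AccessRequest("alice", True, False, "read", "reports")

print(policy.is_allowed(read_req))       # True
print(policy.is_allowed(write_req))      # False
print(policy.is_allowed(untrusted_req))  # False
```

The point of the sketch is the default: a request is refused unless every check passes and a grant exists, rather than allowed unless something blocks it.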
Zero-Trust Architecture Components
Step 1: Identity and Access Management (IAM)
Let's start with implementing robust identity management using OAuth 2.0 and OpenID Connect:
# auth_service.py
import logging
import os
from datetime import datetime, timedelta

import jwt
import redis
from flask import Flask, request, jsonify
from flask_oidc import OpenIDConnect

app = Flask(__name__)
app.config.update({
    # Read the signing key from the environment instead of hard-coding it
    'SECRET_KEY': os.environ['SECRET_KEY'],
    'OIDC_CLIENT_SECRETS': 'client_secrets.json',
    'OIDC_ID_TOKEN_COOKIE_SECURE': True,
    'OIDC_REQUIRE_VERIFIED_EMAIL': True,
    'OIDC_SCOPES': ['openid', 'email', 'profile'],
})

oidc = OpenIDConnect(app)
redis_client = redis.Redis(host='redis', port=6379, db=0)
logger = logging.getLogger(__name__)
class ZeroTrustAuthenticator:
    def __init__(self):
        self.risk_factors = {
            'location': 0.2,
            'device': 0.3,
            'behavior': 0.3,
            'time': 0.2
        }

    def calculate_risk_score(self, user_context):
        """Calculate risk score based on various factors"""
        risk_score = 0.0
        # Location-based risk
        if user_context.get('location') not in self.get_trusted_locations(user_context['user_id']):
            risk_score += self.risk_factors['location']
        # Device-based risk
        if user_context.get('device_fingerprint') not in self.get_trusted_devices(user_context['user_id']):
            risk_score += self.risk_factors['device']
        # Behavioral analysis
        if self.detect_anomalous_behavior(user_context):
            risk_score += self.risk_factors['behavior']
        # Time-based risk
        if self.is_unusual_access_time(user_context):
            risk_score += self.risk_factors['time']
        return min(risk_score, 1.0)
    def get_trusted_locations(self, user_id):
        """Get trusted locations for user"""
        key = f"trusted_locations:{user_id}"
        locations = redis_client.smembers(key)
        return [loc.decode() for loc in locations]

    def get_trusted_devices(self, user_id):
        """Get trusted devices for user"""
        key = f"trusted_devices:{user_id}"
        devices = redis_client.smembers(key)
        return [device.decode() for device in devices]

    def detect_anomalous_behavior(self, user_context):
        """Detect unusual user behavior patterns"""
        user_id = user_context['user_id']
        current_behavior = {
            'access_patterns': user_context.get('access_patterns', []),
            'resource_usage': user_context.get('resource_usage', {}),
            'api_calls': user_context.get('api_calls', [])
        }
        # Compare with historical behavior
        historical_key = f"behavior_profile:{user_id}"
        historical_data = redis_client.hgetall(historical_key)
        # Simple anomaly detection (in production, use ML models)
        if not historical_data:
            return False
        # Check for unusual resource access
        typical_resources = set(historical_data.get(b'typical_resources', b'').decode().split(','))
        current_resources = set(user_context.get('accessed_resources', []))
        if len(current_resources - typical_resources) > 3:
            return True
        return False

    def is_unusual_access_time(self, user_context):
        """Check if access time is unusual for user"""
        current_hour = datetime.now().hour
        user_id = user_context['user_id']
        # Get typical access hours
        key = f"access_hours:{user_id}"
        typical_hours = redis_client.smembers(key)
        typical_hours = [int(hour) for hour in typical_hours]
        # Consider unusual if outside typical hours
        return current_hour not in typical_hours
authenticator = ZeroTrustAuthenticator()


@app.route('/authenticate', methods=['POST'])
@oidc.require_login
def authenticate():
    """Main authentication endpoint with zero-trust evaluation"""
    try:
        user_info = oidc.user_getinfo(['sub', 'email', 'name'])
        # request.json is None when the body is missing or not JSON
        body = request.get_json(silent=True) or {}
        user_context = {
            'user_id': user_info['sub'],
            'email': user_info['email'],
            # X-Forwarded-For is client-controlled unless a trusted proxy sets it
            'location': request.headers.get('X-Forwarded-For'),
            'device_fingerprint': request.headers.get('X-Device-Fingerprint'),
            'user_agent': request.headers.get('User-Agent'),
            'accessed_resources': body.get('resources', []),
            'access_patterns': body.get('access_patterns', [])
        }
        # Calculate risk score
        risk_score = authenticator.calculate_risk_score(user_context)
        # Determine authentication requirements
        auth_decision = {
            'authenticated': True,
            'risk_score': risk_score,
            'requires_mfa': risk_score > 0.3,
            'requires_device_verification': risk_score > 0.5,
            'requires_admin_approval': risk_score > 0.8,
            'session_duration': 3600 if risk_score < 0.3 else 1800,  # Shorter sessions for higher risk
            'allowed_resources': determine_allowed_resources(user_info, risk_score)
        }
        # Generate JWT token with risk-based claims
        token = generate_jwt_token(user_info, auth_decision)
        # Log authentication event
        log_auth_event(user_context, auth_decision)
        return jsonify({
            'access_token': token,
            'token_type': 'Bearer',
            'expires_in': auth_decision['session_duration'],
            'risk_score': risk_score,
            'additional_verification_required': auth_decision['requires_mfa']
        })
    except Exception as e:
        logger.error(f"Authentication error: {str(e)}")
        return jsonify({'error': 'Authentication failed'}), 401
def generate_jwt_token(user_info, auth_decision):
    """Generate JWT token with zero-trust claims"""
    payload = {
        'sub': user_info['sub'],
        'email': user_info['email'],
        'name': user_info['name'],
        'risk_score': auth_decision['risk_score'],
        'allowed_resources': auth_decision['allowed_resources'],
        'requires_mfa': auth_decision['requires_mfa'],
        'iat': datetime.utcnow(),
        'exp': datetime.utcnow() + timedelta(seconds=auth_decision['session_duration'])
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')


def determine_allowed_resources(user_info, risk_score):
    """Determine resources user can access based on risk score"""
    base_resources = get_user_base_resources(user_info['sub'])
    if risk_score > 0.7:
        # High risk: only basic resources
        return [r for r in base_resources if r.get('sensitivity') == 'low']
    elif risk_score > 0.4:
        # Medium risk: exclude sensitive resources
        return [r for r in base_resources if r.get('sensitivity') in ['low', 'medium']]
    else:
        # Low risk: all assigned resources
        return base_resources


def get_user_base_resources(user_id):
    """Get base resources assigned to user"""
    # In production, this would query your authorization system
    key = f"user_resources:{user_id}"
    resources = redis_client.hgetall(key)
    return [{'name': k.decode(), 'sensitivity': v.decode()} for k, v in resources.items()]


def log_auth_event(user_context, auth_decision):
    """Log authentication event for monitoring"""
    event = {
        'timestamp': datetime.utcnow().isoformat(),
        'user_id': user_context['user_id'],
        'email': user_context['email'],
        'location': user_context['location'],
        'risk_score': auth_decision['risk_score'],
        'requires_mfa': auth_decision['requires_mfa'],
        'session_duration': auth_decision['session_duration']
    }
    # Send to security monitoring system
    logger.info(f"Authentication event: {event}")


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, ssl_context='adhoc')
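To see how the factor weights translate into authentication requirements, the following sketch isolates the scoring and threshold logic from the service above as two pure functions. The weights and thresholds are copied from the code; the function names `risk_score` and `auth_requirements` are introduced here only for illustration.

```python
# Factor weights and thresholds mirror the auth service above.
WEIGHTS = {"location": 0.2, "device": 0.3, "behavior": 0.3, "time": 0.2}


def risk_score(triggered):
    """Sum the weights of the triggered factors in a fixed order, capped at 1.0."""
    return min(sum(w for name, w in WEIGHTS.items() if name in triggered), 1.0)


def auth_requirements(score):
    """Map a risk score to the same requirements the endpoint computes."""
    return {
        "requires_mfa": score > 0.3,
        "requires_device_verification": score > 0.5,
        "requires_admin_approval": score > 0.8,
        "session_duration": 3600 if score < 0.3 else 1800,
    }


# Walk through a few combinations of triggered risk factors.
for factors in [set(), {"device"}, {"device", "behavior"}, set(WEIGHTS)]:
    score = risk_score(factors)
    print(sorted(factors), auth_requirements(score))
```

One thing this makes visible: an unrecognized device on its own scores exactly 0.3, which shortens the session but does not trigger MFA because the comparison is a strict `> 0.3`. If a new device should always require MFA, the threshold or the weight would need adjusting.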
Step 2: Network Segmentation with Service Mesh
Implement network segmentation using Istio service mesh:
# istio-security-config.yaml
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: production
spec:
  mtls:
    mode: STRICT
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: zero-trust-policy
  namespace: production
spec:
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/production/sa/web-service"]
    to:
    - operation:
        methods: ["GET", "POST"]
    when:
    - key: request.headers[authorization]
      values: ["Bearer *"]
  - from:
    - source:
        principals: ["cluster.local/ns/production/sa/api-service"]
    to:
    - operation:
        methods: ["GET", "POST", "PUT", "DELETE"]
    when:
    # Istio conditions match strings (exact, prefix, suffix, presence), not
    # numeric ranges, and custom.risk_score is not a supported key. This
    # sketch assumes the token issuer adds a hypothetical risk_level claim
    # and that a RequestAuthentication policy validates the JWT.
    - key: request.auth.claims[risk_level]
      values: ["low"]  # Only allow low-risk requests
---
# Network policy for additional layer of security
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: zero-trust-network-policy
  namespace: production
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    # kubernetes.io/metadata.name is set automatically on namespaces in
    # current Kubernetes versions; a plain "name" label only matches if
    # you added it yourself.
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: istio-system
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: production
    - podSelector:
        matchLabels:
          app: authorized-service
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: kube-system
    ports:
    - protocol: TCP
      port: 53
    - protocol: UDP
      port: 53
  - to:
    - podSelector:
        matchLabels:
          app: database
    ports:
    - protocol: TCP
      port: 5432
Step 3: Data Classification and Protection
Implement data classification and encryption:
# data_protection.py
import base64
import hashlib
import json
import os
import re
from enum import Enum

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC


class DataClassification(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"


class DataProtectionService:
    def __init__(self):
        self.encryption_keys = {}
        self.load_encryption_keys()

    def load_encryption_keys(self):
        """Load encryption keys for different data classifications"""
        for classification in DataClassification:
            name = classification.value.upper()
            # Fail fast instead of falling back to a shared default key
            password = os.environ[f'ENCRYPTION_KEY_{name}'].encode()
            # The salt must stay the same across restarts; a fresh random salt
            # changes the derived key and makes existing ciphertext unreadable.
            # ENCRYPTION_SALT_* is an assumed variable name for this sketch.
            salt = os.environ[f'ENCRYPTION_SALT_{name}'].encode()
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                salt=salt,
                iterations=100000,
            )
            key = base64.urlsafe_b64encode(kdf.derive(password))
            self.encryption_keys[classification] = Fernet(key)
    def classify_data(self, data):
        """Automatically classify data based on content analysis"""
        data_str = json.dumps(data) if isinstance(data, dict) else str(data)
        # PII patterns
        pii_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',  # Credit card
            # [A-Za-z], not [A-Z|a-z]: a "|" inside a character class is a literal pipe
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'  # Email
        ]
        # Financial patterns
        financial_patterns = [
            r'\$\d+(?:,\d{3})*(?:\.\d{2})?',  # Money amounts
            r'\b(?:salary|income|revenue|profit)\b'  # Financial terms
        ]
        # Check for sensitive patterns
        for pattern in pii_patterns:
            if re.search(pattern, data_str, re.IGNORECASE):
                return DataClassification.RESTRICTED
        for pattern in financial_patterns:
            if re.search(pattern, data_str, re.IGNORECASE):
                return DataClassification.CONFIDENTIAL
        # Check for internal keywords
        internal_keywords = ['password', 'secret', 'key', 'token', 'internal']
        if any(keyword in data_str.lower() for keyword in internal_keywords):
            return DataClassification.INTERNAL
        return DataClassification.PUBLIC
    def encrypt_data(self, data, classification=None):
        """Encrypt data based on its classification"""
        if classification is None:
            classification = self.classify_data(data)
        # Serialize data
        if isinstance(data, (dict, list)):
            data_bytes = json.dumps(data).encode()
        else:
            data_bytes = str(data).encode()
        # Encrypt with appropriate key
        cipher = self.encryption_keys[classification]
        encrypted_data = cipher.encrypt(data_bytes)
        return {
            'encrypted_data': base64.b64encode(encrypted_data).decode(),
            'classification': classification.value,
            'data_hash': hashlib.sha256(data_bytes).hexdigest()
        }

    def decrypt_data(self, encrypted_package, user_clearance_level):
        """Decrypt data if user has appropriate clearance"""
        classification = DataClassification(encrypted_package['classification'])
        # Check access permissions
        if not self.check_access_permission(user_clearance_level, classification):
            raise PermissionError(f"Insufficient clearance for {classification.value} data")
        # Decrypt data
        cipher = self.encryption_keys[classification]
        encrypted_data = base64.b64decode(encrypted_package['encrypted_data'])
        decrypted_bytes = cipher.decrypt(encrypted_data)
        # Verify integrity
        data_hash = hashlib.sha256(decrypted_bytes).hexdigest()
        if data_hash != encrypted_package['data_hash']:
            raise ValueError("Data integrity check failed")
        # Parse data
        try:
            return json.loads(decrypted_bytes.decode())
        except json.JSONDecodeError:
            return decrypted_bytes.decode()

    def check_access_permission(self, user_clearance, data_classification):
        """Check if user has permission to access data"""
        clearance_hierarchy = {
            DataClassification.PUBLIC: 0,
            DataClassification.INTERNAL: 1,
            DataClassification.CONFIDENTIAL: 2,
            DataClassification.RESTRICTED: 3
        }
        user_level = clearance_hierarchy.get(user_clearance, -1)
        required_level = clearance_hierarchy[data_classification]
        return user_level >= required_level
# Usage example
protection_service = DataProtectionService()

# Example API endpoint with data protection
from flask import Flask, request, jsonify
from functools import wraps

app = Flask(__name__)


def require_clearance(required_level):
    """Decorator to enforce data access clearance"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Extract user clearance from JWT token
            auth_header = request.headers.get('Authorization')
            if not auth_header:
                return jsonify({'error': 'Authorization required'}), 401
            try:
                token = auth_header.split(' ')[1]
                # Verify and decode JWT (implement your JWT verification)
                user_clearance = get_user_clearance_from_token(token)
            except Exception:
                return jsonify({'error': 'Authentication failed'}), 401
            if not protection_service.check_access_permission(user_clearance, required_level):
                return jsonify({'error': 'Insufficient clearance'}), 403
            # Call the view outside the try block so its own errors are not
            # reported as authentication failures
            return f(*args, **kwargs)
        return decorated_function
    return decorator
@app.route('/api/sensitive-data', methods=['GET'])
@require_clearance(DataClassification.CONFIDENTIAL)
def get_sensitive_data():
    """API endpoint that requires confidential clearance"""
    # Your sensitive data logic here
    sensitive_data = {
        'user_financial_info': {
            'salary': '$150,000',
            'account_balance': '$50,000'
        }
    }
    # Encrypt before returning
    encrypted_data = protection_service.encrypt_data(sensitive_data)
    return jsonify(encrypted_data)


def get_user_clearance_from_token(token):
    """Extract user clearance level from JWT token"""
    # Placeholder: returns CONFIDENTIAL for any token without verifying it.
    # Replace with real JWT verification and claim extraction before use.
    return DataClassification.CONFIDENTIAL
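The `get_user_clearance_from_token` stub above leaves verification unimplemented. As a rough sketch of what that step involves, the following code verifies an HS256-signed token and reads a `clearance_level` claim using only the standard library. The helper names are hypothetical. The `clearance_level` claim is the one the Step 5 policy reads, but the Step 1 token does not currently include it, so the issuer would need to add it. In a real service a maintained JWT library is the better choice; this only shows the checks involved.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(claims: dict, secret: bytes) -> str:
    """Build a compact HS256 token (used here only to produce sample input)."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url_encode(json.dumps(claims).encode())
    signature = hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url_encode(signature)}"


def clearance_from_token(token: str, secret: bytes) -> str:
    """Return the clearance_level claim after checking signature and expiry."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        signature = _b64url_decode(signature_b64)
    except ValueError as exc:  # wrong segment count, bad base64, bad JSON
        raise PermissionError("Malformed token") from exc
    # Pin the algorithm so a token cannot choose how it is verified.
    if not isinstance(header, dict) or header.get("alg") != "HS256":
        raise PermissionError("Unexpected algorithm")
    expected = hmac.new(secret, f"{header_b64}.{payload_b64}".encode(), hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, signature):
        raise PermissionError("Invalid signature")
    claims = json.loads(_b64url_decode(payload_b64))
    if claims.get("exp", 0) <= time.time():
        raise PermissionError("Token expired")
    # A missing claim falls back to the lowest clearance (least privilege).
    return claims.get("clearance_level", "public")


secret = b"demo-secret-for-illustration-only"
token = sign_hs256({"sub": "user123", "clearance_level": "confidential",
                    "exp": time.time() + 300}, secret)
print(clearance_from_token(token, secret))  # confidential
```

The returned string can be mapped to the enum with `DataClassification(value)`, since the enum values are the same lowercase names.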
Step 4: Continuous Monitoring and Threat Detection
Implement real-time security monitoring:
# security_monitor.py
import asyncio
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List

import asyncpg
# The standalone aioredis package was merged into redis-py as redis.asyncio
from redis import asyncio as aioredis


@dataclass
class SecurityEvent:
    timestamp: datetime
    event_type: str
    severity: str
    user_id: str
    source_ip: str
    details: Dict[str, Any]
    risk_score: float


class SecurityMonitor:
    def __init__(self):
        self.alert_thresholds = {
            'failed_logins': 5,
            'unusual_access_patterns': 3,
            'data_exfiltration': 1,
            'privilege_escalation': 1
        }
        self.logger = logging.getLogger(__name__)
        self.redis = None
        self.db = None

    async def initialize(self):
        """Initialize database connections"""
        self.redis = aioredis.from_url("redis://redis:6379")
        self.db = await asyncpg.connect(
            host="postgres",
            database="security_db",
            user="security_user",
            # Read the password from the environment instead of hard-coding it;
            # SECURITY_DB_PASSWORD is an assumed variable name
            password=os.environ["SECURITY_DB_PASSWORD"]
        )
    async def process_security_event(self, event: SecurityEvent):
        """Process incoming security events"""
        # Store event
        await self.store_event(event)
        # Analyze for threats
        threat_level = await self.analyze_threat(event)
        # Trigger alerts if necessary
        if threat_level > 0.7:
            await self.trigger_alert(event, threat_level)
        # Update user risk profile
        await self.update_user_risk_profile(event.user_id, event)

    async def store_event(self, event: SecurityEvent):
        """Store security event in database"""
        query = """
            INSERT INTO security_events
            (timestamp, event_type, severity, user_id, source_ip, details, risk_score)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
        """
        await self.db.execute(
            query,
            event.timestamp,
            event.event_type,
            event.severity,
            event.user_id,
            event.source_ip,
            json.dumps(event.details),
            event.risk_score
        )
    async def analyze_threat(self, event: SecurityEvent) -> float:
        """Analyze event for potential threats"""
        threat_score = 0.0
        # Check for brute force attacks
        if event.event_type == 'failed_login':
            recent_failures = await self.count_recent_events(
                event.user_id, 'failed_login', minutes=15
            )
            if recent_failures >= self.alert_thresholds['failed_logins']:
                threat_score += 0.8
        # Check for unusual access patterns
        if event.event_type == 'resource_access':
            if await self.detect_unusual_access_pattern(event):
                threat_score += 0.6
        # Check for data exfiltration indicators
        if event.event_type == 'data_download':
            download_volume = event.details.get('volume_mb', 0)
            if download_volume > 100:  # More than 100MB
                threat_score += 0.9
        # Geographic anomalies
        if await self.detect_geographic_anomaly(event):
            threat_score += 0.5
        # Time-based anomalies
        if await self.detect_time_anomaly(event):
            threat_score += 0.3
        return min(threat_score, 1.0)

    async def count_recent_events(self, user_id: str, event_type: str, minutes: int) -> int:
        """Count recent events of specific type for user"""
        since = datetime.utcnow() - timedelta(minutes=minutes)
        query = """
            SELECT COUNT(*) FROM security_events
            WHERE user_id = $1 AND event_type = $2 AND timestamp >= $3
        """
        result = await self.db.fetchval(query, user_id, event_type, since)
        return result or 0
    async def detect_unusual_access_pattern(self, event: SecurityEvent) -> bool:
        """Detect unusual resource access patterns"""
        # Get user's typical access pattern. The current event is stored
        # before analysis, so exclude it or it would always match itself.
        query = """
            SELECT details FROM security_events
            WHERE user_id = $1 AND event_type = 'resource_access'
            AND timestamp >= $2 AND timestamp < $3
            ORDER BY timestamp DESC
            LIMIT 50
        """
        week_ago = datetime.utcnow() - timedelta(days=7)
        rows = await self.db.fetch(query, event.user_id, week_ago, event.timestamp)
        if not rows:
            return False
        # Analyze access patterns
        typical_resources = set()
        for row in rows:
            details = json.loads(row['details'])
            typical_resources.update(details.get('resources', []))
        current_resources = set(event.details.get('resources', []))
        unusual_resources = current_resources - typical_resources
        # Flag if accessing more than 3 unusual resources
        return len(unusual_resources) > 3

    async def detect_geographic_anomaly(self, event: SecurityEvent) -> bool:
        """Detect unusual geographic access"""
        # Get user's typical locations, excluding the current event
        query = """
            SELECT DISTINCT source_ip FROM security_events
            WHERE user_id = $1 AND timestamp >= $2 AND timestamp < $3
        """
        month_ago = datetime.utcnow() - timedelta(days=30)
        rows = await self.db.fetch(query, event.user_id, month_ago, event.timestamp)
        if not rows:
            # No history yet: nothing to compare against
            return False
        typical_ips = {row['source_ip'] for row in rows}
        # Simple check - in production, use geolocation services
        return event.source_ip not in typical_ips

    async def detect_time_anomaly(self, event: SecurityEvent) -> bool:
        """Detect unusual access times"""
        # Get user's typical access hours, excluding the current event
        query = """
            SELECT EXTRACT(HOUR FROM timestamp) as hour FROM security_events
            WHERE user_id = $1 AND timestamp >= $2 AND timestamp < $3
        """
        month_ago = datetime.utcnow() - timedelta(days=30)
        rows = await self.db.fetch(query, event.user_id, month_ago, event.timestamp)
        if not rows:
            # No history yet: nothing to compare against
            return False
        typical_hours = {int(row['hour']) for row in rows}
        current_hour = event.timestamp.hour
        return current_hour not in typical_hours
    async def trigger_alert(self, event: SecurityEvent, threat_level: float):
        """Trigger security alert"""
        alert = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_id': f"{event.user_id}_{event.timestamp.timestamp()}",
            'threat_level': threat_level,
            'event_type': event.event_type,
            'user_id': event.user_id,
            'source_ip': event.source_ip,
            'details': event.details,
            'recommended_actions': self.get_recommended_actions(threat_level)
        }
        # Send to security team
        await self.send_alert(alert)
        # Auto-remediation for high-threat events
        if threat_level > 0.9:
            await self.auto_remediate(event)

    def get_recommended_actions(self, threat_level: float) -> List[str]:
        """Get recommended actions based on threat level"""
        if threat_level > 0.9:
            return [
                "Immediately suspend user account",
                "Force password reset",
                "Review recent user activities",
                "Contact user for verification"
            ]
        elif threat_level > 0.7:
            return [
                "Require additional MFA verification",
                "Monitor user activities closely",
                "Review access permissions"
            ]
        else:
            return [
                "Monitor for additional suspicious activities",
                "Log event for investigation"
            ]

    async def send_alert(self, alert: Dict[str, Any]):
        """Send alert to security team"""
        # In production, integrate with your alerting system
        # (Slack, PagerDuty, email, etc.)
        self.logger.critical(f"SECURITY ALERT: {json.dumps(alert, indent=2)}")
        # Store alert in Redis for real-time dashboard
        await self.redis.lpush("security_alerts", json.dumps(alert))
        await self.redis.expire("security_alerts", 86400)  # Keep for 24 hours
    async def auto_remediate(self, event: SecurityEvent):
        """Automatic remediation for high-threat events"""
        # Suspend user session
        await self.redis.set(f"suspended_user:{event.user_id}", "true", ex=3600)
        # Block source IP temporarily
        await self.redis.set(f"blocked_ip:{event.source_ip}", "true", ex=1800)
        # Notify user
        await self.notify_user(event.user_id, "Suspicious activity detected. Account temporarily suspended.")

    async def update_user_risk_profile(self, user_id: str, event: SecurityEvent):
        """Update user's risk profile based on event"""
        risk_key = f"user_risk:{user_id}"
        # Get current risk score
        current_risk = await self.redis.hget(risk_key, "score")
        current_risk = float(current_risk) if current_risk else 0.0
        # Update risk score
        new_risk = min((current_risk + event.risk_score) / 2, 1.0)
        # Update risk profile
        await self.redis.hset(risk_key, mapping={
            "score": new_risk,
            "last_updated": datetime.utcnow().isoformat(),
            "last_event": event.event_type
        })
        await self.redis.expire(risk_key, 86400 * 7)  # Keep for 7 days

    async def notify_user(self, user_id: str, message: str):
        """Notify user of security event"""
        # Implementation depends on your notification system
        notification = {
            'user_id': user_id,
            'message': message,
            'timestamp': datetime.utcnow().isoformat(),
            'type': 'security_alert'
        }
        await self.redis.lpush(f"notifications:{user_id}", json.dumps(notification))
# Usage example
monitor = SecurityMonitor()


async def process_authentication_event(user_id: str, success: bool, source_ip: str, details: Dict[str, Any]):
    """Process authentication event"""
    event = SecurityEvent(
        timestamp=datetime.utcnow(),
        event_type='successful_login' if success else 'failed_login',
        severity='info' if success else 'warning',
        user_id=user_id,
        source_ip=source_ip,
        details=details,
        risk_score=0.1 if success else 0.3
    )
    await monitor.process_security_event(event)


# Initialize and run
async def main():
    await monitor.initialize()
    # Example: Process a failed login
    await process_authentication_event(
        user_id="user123",
        success=False,
        source_ip="192.168.1.100",
        # Do not record attempted passwords in event details, even masked
        details={"user_agent": "Mozilla/5.0..."}
    )


if __name__ == "__main__":
    asyncio.run(main())
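The scoring arithmetic in the monitor is easier to reason about in isolation. The sketch below copies the averaging rule from `update_user_risk_profile` and the alert thresholds from `process_security_event` and `trigger_alert` into two pure functions; the names `update_risk` and `response_tier` are introduced here only for illustration.

```python
def update_risk(current: float, event_risk: float) -> float:
    """Same rule as update_user_risk_profile: average with the new event, cap at 1.0."""
    return min((current + event_risk) / 2, 1.0)


def response_tier(threat_level: float) -> str:
    """Mirror the thresholds used by process_security_event and trigger_alert."""
    if threat_level > 0.9:
        return "auto_remediate"
    if threat_level > 0.7:
        return "alert"
    return "log"


# Four failed logins in a row (risk_score 0.3 each), starting from a clean profile.
score = 0.0
history = []
for _ in range(4):
    score = update_risk(score, 0.3)
    history.append(round(score, 5))
print(history)

# A single successful login (risk_score 0.1) then pulls the score halfway back.
print(round(update_risk(score, 0.1), 5))

# Threat scores are additive per indicator and capped at 1.0.
print(response_tier(min(0.8 + 0.5, 1.0)))  # brute force + new IP
print(response_tier(0.6))                  # unusual access pattern alone
```

Because each update averages the stored score with the newest event, the profile converges toward the risk of recent events and a single event moves it halfway there. That makes the profile responsive but also quick to forget; a weighted average with a smaller weight on the new event would decay more slowly. Note also that an unusual access pattern on its own (0.6) stays below the 0.7 alert threshold.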
Step 5: Policy as Code and Compliance
Implement policy as code using Open Policy Agent (OPA):
# opa-policies.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: opa-policies
  namespace: opa-system
data:
  zero-trust-policy.rego: |
    package zerotrust

    import future.keywords.if
    import future.keywords.in

    # Default deny
    default allow = false

    # Strip the "Bearer " prefix once so every rule decodes the same token
    bearer_token := token if {
      auth_header := input.headers.authorization
      startswith(auth_header, "Bearer ")
      token := substring(auth_header, 7, -1)
    }

    # Decoded claims; allow only relies on them together with valid_token
    claims := payload if {
      [_, payload, _] := io.jwt.decode(bearer_token)
    }

    # Allow if user has valid token and meets risk criteria
    allow if {
      valid_token
      low_risk_score
      authorized_resource
      time_based_access
    }

    # Token validation
    # Note: this verifies RS256, while the Step 1 service signs with HS256;
    # align the two (for example with io.jwt.verify_hs256) before use.
    valid_token if {
      io.jwt.verify_rs256(bearer_token, data.public_key)
      claims.exp > time.now_ns() / 1000000000
    }

    # Risk score evaluation
    low_risk_score if {
      claims.risk_score <= 0.5
    }

    # Resource authorization
    authorized_resource if {
      some resource in claims.allowed_resources
      resource.name == input.resource
    }

    # Time-based access control
    time_based_access if {
      # time.clock returns [hour, minute, second]
      [current_hour, _, _] := time.clock([time.now_ns(), "UTC"])
      current_hour >= 8
      current_hour <= 18
    }

    # Additional MFA requirement
    requires_mfa if {
      claims.risk_score > 0.3
    }

    # Device verification requirement
    requires_device_verification if {
      claims.risk_score > 0.5
    }

    # Data access policies
    data_access_allowed if {
      user_clearance := claims.clearance_level
      data_classification := input.data_classification
      clearance_levels := {
        "public": 0,
        "internal": 1,
        "confidential": 2,
        "restricted": 3
      }
      clearance_levels[user_clearance] >= clearance_levels[data_classification]
    }

    # Network policy
    network_access_allowed if {
      source_namespace := input.source.namespace
      target_namespace := input.target.namespace
      # Allow same namespace communication
      source_namespace == target_namespace
    }

    network_access_allowed if {
      source_service := input.source.service
      target_service := input.target.service
      # Define allowed service communications as [source, target] pairs;
      # arrays keep the direction, which unordered sets would lose
      allowed_communications := {
        ["web-service", "api-service"],
        ["api-service", "database-service"],
        ["monitoring-service", "api-service"]
      }
      [source_service, target_service] in allowed_communications
    }
  compliance-policy.rego: |
    package compliance

    import future.keywords.if
    import future.keywords.in

    # GDPR compliance checks
    gdpr_compliant if {
      data_retention_compliant
      consent_collected
      data_minimization_followed
    }

    data_retention_compliant if {
      retention_period := input.data.retention_days
      max_retention := data.gdpr_limits[input.data.type]
      retention_period <= max_retention
    }

    consent_collected if {
      input.user.consent_given == true
      input.user.consent_date
    }

    data_minimization_followed if {
      # JSON arrays are not sets; convert both before taking the difference
      requested_fields := {f | some f in input.data.fields}
      necessary_fields := {f | some f in data.necessary_fields[input.operation]}
      count(requested_fields - necessary_fields) == 0
    }

    # SOC 2 compliance
    soc2_compliant if {
      access_controls_implemented
      data_encrypted
      audit_logging_enabled
    }

    access_controls_implemented if {
      input.security.mfa_enabled == true
      input.security.rbac_enabled == true
    }

    data_encrypted if {
      input.data.encrypted_at_rest == true
      input.data.encrypted_in_transit == true
    }

    audit_logging_enabled if {
      input.logging.audit_enabled == true
      input.logging.retention_days >= 365
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opa-server
  namespace: opa-system
spec:
  replicas: 3
  selector:
    matchLabels:
      app: opa
  template:
    metadata:
      labels:
        app: opa
    spec:
      containers:
      - name: opa
        image: openpolicyagent/opa:latest-envoy
        args:
        - "run"
        - "--server"
        - "--config-file=/config/config.yaml"
        - "/policies"
        ports:
        - containerPort: 8181
        volumeMounts:
        - name: opa-policies
          mountPath: /policies
        - name: opa-config
          mountPath: /config
      volumes:
      - name: opa-policies
        configMap:
          name: opa-policies
      # The opa-config ConfigMap is not defined in this guide; create it with
      # a config.yaml for your OPA setup before applying this Deployment
      - name: opa-config
        configMap:
          name: opa-config
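Once OPA is running, a service asks it for a decision over OPA's REST Data API: a POST to `/v1/data/<package>/<rule>` with the request context under an `input` key. The sketch below builds such a query for the `zerotrust.allow` rule using only the standard library. The host name `opa-server.opa-system` is an assumption (the manifests above define a Deployment but no Service), and the function names are illustrative.

```python
import json
import urllib.request


def build_opa_query(opa_url: str, headers: dict, resource: str) -> urllib.request.Request:
    """Build a POST request asking OPA to evaluate zerotrust.allow."""
    # The policy reads input.headers.authorization and input.resource.
    body = json.dumps({"input": {"headers": headers, "resource": resource}}).encode()
    return urllib.request.Request(
        url=f"{opa_url.rstrip('/')}/v1/data/zerotrust/allow",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def is_allowed(opa_response: dict) -> bool:
    """Interpret OPA's reply; a missing or non-true result is treated as deny."""
    return opa_response.get("result") is True


# Hypothetical in-cluster address; port 8181 matches the Deployment above.
query = build_opa_query(
    "http://opa-server.opa-system:8181",
    headers={"authorization": "Bearer <token>"},
    resource="reports",
)
print(query.full_url)
print(is_allowed({"result": False}))  # False

# Sending the query needs a reachable OPA server, so it is left commented out:
# with urllib.request.urlopen(query, timeout=2) as resp:
#     decision = is_allowed(json.load(resp))
```

Treating a missing `result` as a denial keeps the caller fail-closed: OPA omits the key when a rule is undefined, and a network error should be handled the same way.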
Conclusion
Implementing zero-trust architecture requires a comprehensive approach. The steps in this guide cover five areas.
Key Achievements
- Identity-Centric Security: Every access request is authenticated and authorized
- Risk-Based Access Control: Dynamic permissions based on real-time risk assessment
- Data Protection: Classification-based encryption and access controls
- Continuous Monitoring: Real-time threat detection and automated response
- Policy as Code: Codified security policies for consistency and compliance
Next Steps
- Implement gradually: Start with high-value assets and expand coverage
- Train your team: Ensure security awareness across all departments
- Regular audits: Continuously assess and improve your security posture
- Stay updated: Keep up with evolving threats and security best practices
Zero-trust is not a product you buy, but a security strategy you implement. Start with the fundamentals outlined in this guide and build upon them based on your specific requirements and risk profile.