Files
pixel/backend/tests/test_security_properties.py
张鹏 f9f4560459 Initial commit: Pixel AI comic/video creation platform
- FastAPI backend with SQLModel, Alembic migrations, AgentScope agents
- Next.js 15 frontend with React 19, Tailwind, Zustand, React Flow
- Multi-provider AI system (DashScope, Kling, MiniMax, Volcengine, OpenAI, etc.)
- All HTTP clients migrated from sync requests to async httpx
- Admin-managed API keys via environment variables
- SSRF vulnerability fixed in ensure_url()
2026-04-29 01:20:12 +08:00

357 lines
12 KiB
Python

"""
Property-Based Tests for Security Features
Tests:
- Property 25: Rate Limiting Execution
- Property 26: Input Validation and Sanitization
Uses Hypothesis for property-based testing to verify security properties
across a wide range of inputs.
"""
import pytest
import time
from hypothesis import given, strategies as st, settings, assume, HealthCheck
from fastapi.testclient import TestClient
from src.main import app
from src.utils.validators import sanitize_string, sanitize_dict
from src.utils.errors import InvalidParameterException
# ============================================================================
# Property 25: Rate Limiting Execution
# ============================================================================
class TestRateLimitingProperties:
    """Property 25: Rate Limiting Execution.

    Validates: Requirements 20.1, 20.2

    For any request that exceeds the configured rate limit, the system should:
    1. Reject the request with 429 status code
    2. Include Retry-After header
    3. Include rate limit headers (X-RateLimit-*)
    4. Track requests per user and per IP

    The tests below exercise the header portion of that contract by
    issuing GET requests to ``/health`` through a ``TestClient``.
    """

    def test_rate_limit_headers_present(self):
        """
        Property: All responses should include rate limit headers.

        Each of the three ``X-RateLimit-*`` headers must be present and
        must parse as a non-negative integer.

        **Validates: Requirements 20.2**
        """
        header_names = (
            "X-RateLimit-Limit",
            "X-RateLimit-Remaining",
            "X-RateLimit-Reset",
        )
        headers = TestClient(app).get("/health").headers

        # Presence first, so a missing header fails before any parsing.
        for name in header_names:
            assert name in headers

        # Note: If Redis is not connected, limit may be 0 (rate limiting
        # disabled), hence ">= 0" rather than "> 0".
        parsed = {name: int(headers[name]) for name in header_names}
        for name in header_names:
            assert parsed[name] >= 0

    @given(
        num_requests=st.integers(min_value=1, max_value=5)
    )
    @settings(max_examples=10, deadline=2000)
    def test_rate_limit_headers_decrement(self, num_requests):
        """
        Property: Rate limit remaining should decrement with each request.

        Sends ``num_requests`` consecutive requests through one client and
        checks that the reported remaining quota never goes up.

        **Validates: Requirements 20.1, 20.2**
        """
        client = TestClient(app)
        last_remaining = None
        for _ in range(num_requests):
            headers = client.get("/health").headers
            current = int(headers["X-RateLimit-Remaining"])
            # Non-strict comparison: the value may stay the same between
            # requests (e.g. if the limit is very high).
            assert last_remaining is None or current <= last_remaining
            last_remaining = current
# ============================================================================
# Property 26: Input Validation and Sanitization
# ============================================================================
class TestInputValidationProperties:
    """Property 26: Input Validation and Sanitization.

    Validates: Requirements 20.3

    For any user input, the system should:
    1. Detect and reject SQL injection attempts
    2. Detect and reject XSS attempts
    3. Sanitize safe inputs appropriately
    4. Preserve safe content while escaping dangerous content

    The "safe input" tests share one filter (``_assume_benign``) so that
    every generated example is screened against the same set of tokens.
    """

    # Keywords (matched case-insensitively) that the rejection tests in this
    # class expect the sanitizer to refuse. Generated "safe" text must not
    # contain any of them, otherwise a safe-input test could fail spuriously.
    # "EXEC" also covers "EXECUTE".
    _REJECTED_KEYWORDS = (
        "UNION",
        "SELECT",
        "DROP",
        "DELETE",
        "INSERT",
        "UPDATE",
        "EXEC",
        "SCRIPT",
    )
    # Literal fragments (SQL comment marker, HTML angle brackets) screened
    # case-sensitively.
    _REJECTED_FRAGMENTS = ("--", "<", ">")

    @classmethod
    def _assume_benign(cls, text):
        """Discard the current Hypothesis example if *text* looks malicious.

        Args:
            text: Generated string that is about to be used as safe input.

        Calls ``assume`` so that Hypothesis skips (rather than fails) any
        example containing one of the rejected keywords or fragments.
        """
        upper_text = text.upper()
        assume(not any(keyword in upper_text for keyword in cls._REJECTED_KEYWORDS))
        assume(not any(fragment in text for fragment in cls._REJECTED_FRAGMENTS))

    # SQL Injection test cases
    @given(
        sql_keyword=st.sampled_from([
            "UNION SELECT", "DROP TABLE", "DELETE FROM", "INSERT INTO",
            "UPDATE SET", "EXEC", "EXECUTE", "'; DROP", "admin'--",
            "1' OR '1'='1", "1 UNION SELECT"
        ])
    )
    @settings(max_examples=30, deadline=2000)
    def test_sql_injection_detection(self, sql_keyword):
        """
        Property: Any input containing SQL injection patterns should be rejected.

        **Validates: Requirements 20.3**
        """
        # Embed the pattern in otherwise harmless text.
        malicious_input = f"test {sql_keyword} malicious"

        # Only the exception type is checked; the message may vary.
        with pytest.raises(InvalidParameterException):
            sanitize_string(malicious_input, "test_field")

    # XSS test cases
    @given(
        xss_pattern=st.sampled_from([
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<iframe src='http://evil.com'></iframe>",
            "<body onload=alert('XSS')>",
            "<svg onload=alert('XSS')>",
            "<input onfocus=alert('XSS') autofocus>",
            "<object data='javascript:alert(XSS)'>",
            "<embed src='javascript:alert(XSS)'>",
        ])
    )
    @settings(max_examples=30, deadline=2000)
    def test_xss_detection(self, xss_pattern):
        """
        Property: Any input containing XSS patterns should be rejected.

        **Validates: Requirements 20.3**
        """
        with pytest.raises(InvalidParameterException):
            sanitize_string(xss_pattern, "test_field")

    # Safe input test cases
    @given(
        safe_text=st.text(
            min_size=1,
            max_size=200,
            alphabet=st.characters(
                whitelist_categories=('Lu', 'Ll', 'Nd', 'Zs'),
                whitelist_characters='.,!?-_@#()[]'
            )
        )
    )
    @settings(max_examples=50, deadline=2000)
    def test_safe_input_passes(self, safe_text):
        """
        Property: Safe input without malicious patterns should pass validation.

        **Validates: Requirements 20.3**
        """
        # Skip examples that accidentally spell an SQL/XSS pattern.
        self._assume_benign(safe_text)

        # Should not raise exception
        result = sanitize_string(safe_text, "test_field", allow_html=False)

        assert isinstance(result, str)
        assert len(result) > 0

    @given(
        text=st.text(min_size=1, max_size=50, alphabet="<>abc123 ")
    )
    @settings(max_examples=30, deadline=2000)
    def test_html_escaping_when_not_allowed(self, text):
        """
        Property: When HTML is not allowed, HTML characters should be escaped
        or the input should be rejected if it contains malicious patterns.

        **Validates: Requirements 20.3**
        """
        # Filter out XSS patterns that would be rejected. With the current
        # alphabet these tokens cannot be generated; the filter is kept as a
        # guard in case the alphabet is widened.
        lowered = text.lower()
        for token in ("script", "javascript:", "onerror", "onload", "iframe"):
            assume(token not in lowered)

        try:
            result = sanitize_string(text, "test_field", allow_html=False)
        except InvalidParameterException:
            # Some patterns might still be caught as malicious, which is
            # acceptable: outright rejection satisfies the property.
            return

        # If the input was accepted, raw angle brackets must be escaped
        # or removed from the result.
        if '<' in text:
            assert '&lt;' in result or '<' not in result
        if '>' in text:
            assert '&gt;' in result or '>' not in result

    @given(
        data=st.fixed_dictionaries({
            'prompt': st.text(min_size=1, max_size=100, alphabet=st.characters(
                whitelist_categories=('Lu', 'Ll', 'Nd', 'Zs'),
                whitelist_characters='.,!?-_'
            )),
            'model': st.sampled_from(['flux-dev', 'flux-pro', 'sd-3']),
            'n': st.integers(min_value=1, max_value=4)
        })
    )
    @settings(max_examples=30, deadline=2000)
    def test_dict_sanitization_safe_data(self, data):
        """
        Property: Dictionary sanitization should preserve safe data structure.

        **Validates: Requirements 20.3**
        """
        # Skip examples whose prompt accidentally spells a malicious pattern.
        self._assume_benign(data['prompt'])

        # Should not raise exception
        result = sanitize_dict(data, allow_html=False)

        # Verify structure is preserved
        assert isinstance(result, dict)
        assert 'prompt' in result
        assert 'model' in result
        assert 'n' in result
        assert result['model'] == data['model']
        assert result['n'] == data['n']

    @given(
        malicious_field=st.sampled_from([
            "<script>alert('XSS')</script>",
            "'; DROP TABLE users; --",
            "1' OR '1'='1",
            "<img src=x onerror=alert(1)>"
        ])
    )
    @settings(max_examples=20, deadline=2000)
    def test_dict_sanitization_malicious_data(self, malicious_field):
        """
        Property: Dictionary sanitization should reject dictionaries
        containing malicious data in any field.

        **Validates: Requirements 20.3**
        """
        data = {
            'prompt': malicious_field,
            'model': 'flux-dev'
        }

        # Should raise InvalidParameterException
        with pytest.raises(InvalidParameterException):
            sanitize_dict(data, allow_html=False)

    @given(
        nested_data=st.fixed_dictionaries({
            'request': st.fixed_dictionaries({
                'prompt': st.text(min_size=1, max_size=50, alphabet=st.characters(
                    whitelist_categories=('Lu', 'Ll', 'Nd', 'Zs')
                )),
                'params': st.fixed_dictionaries({
                    'n': st.integers(min_value=1, max_value=4)
                })
            })
        })
    )
    @settings(max_examples=20, deadline=2000)
    def test_nested_dict_sanitization(self, nested_data):
        """
        Property: Nested dictionary sanitization should work recursively.

        **Validates: Requirements 20.3**
        """
        # Skip examples whose prompt accidentally spells a malicious pattern.
        self._assume_benign(nested_data['request']['prompt'])

        # Should not raise exception
        result = sanitize_dict(nested_data, allow_html=False)

        # Verify nested structure is preserved
        assert isinstance(result, dict)
        assert 'request' in result
        assert 'prompt' in result['request']
        assert 'params' in result['request']
        assert 'n' in result['request']['params']

    @given(
        safe_list=st.lists(
            st.text(min_size=1, max_size=20, alphabet=st.characters(
                whitelist_categories=('Lu', 'Ll', 'Nd')
            )),
            min_size=1,
            max_size=5
        )
    )
    @settings(max_examples=20, deadline=2000)
    def test_list_sanitization_in_dict(self, safe_list):
        """
        Property: Lists within dictionaries should be sanitized recursively.

        **Validates: Requirements 20.3**
        """
        # Skip examples where any item accidentally spells a malicious pattern.
        for item in safe_list:
            self._assume_benign(item)

        data = {
            'prompts': safe_list,
            'model': 'flux-dev'
        }

        # Should not raise exception
        result = sanitize_dict(data, allow_html=False)

        # Verify list is preserved
        assert isinstance(result, dict)
        assert 'prompts' in result
        assert isinstance(result['prompts'], list)
        assert len(result['prompts']) == len(safe_list)
if __name__ == "__main__":
    # Allow running this file directly. pytest.main() returns the session's
    # exit code; propagate it so the process exits non-zero when tests fail.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))