Flask code auditSQL injectionCommand/Code ExecutionDeserializationFile operationsXXESSRFXSSOthersAuditing PracticePostscriptreference
Flask code audit
SQL injection
1. The correct usage is straightforward: use "comma", not "percent sign"

stmt = "SELECT * FROM table WHERE id=?"
connection.execute(stmt, (value,))
# Or
cursor.execute("SELECT * FROM users WHERE name = ?", (username,))
2. Check if all SQL statements use+
,%s
ORf-string
Directly concatenate user input
"SELECT * FROM table WHERE id=" + value
"SELECT * FROM table WHERE id=%s" % value
"SELECT * FROM table WHERE id={0}".format(value)
3. SQLAlchemy'stext()
Whether parameterization is performed
from sqlalchemy import text
# Incorrect usage
stmt = text(f"SELECT * FROM users WHERE name = '{username}'")
# Correct usage
stmt = text("SELECT * FROM users WHERE name = :username").bindparams(username=username)
# Or
query = "SELECT * FROM articles WHERE title LIKE :keyword"
result = db.session.execute(query, {"keyword": f"%{keyword}%"})
4. Safe use of ORM: Prefer the ORM method
# SQLAlchemy ORM
User.query.filter_by(username=username).first()
Command/Code Execution
1. Dangerous functions popen, system, commands, subprocess, exec, eval
import subprocess
@app.route('/ping')
def ping():
ip = request.args.get('ip')
result = subprocess.run(["ping", "-c", "1", ip], capture_output=True, text=True)
return f"<pre>{result.stdout}</pre>"
#subprocess.run() binding parameters will not execute malicious commands.
2. SSTI
render_template_string
3. Risks of third-party libraries
import yaml
# Vulnerability example: Using default Loader
data = yaml.load(user_input, Loader=yaml.Loader) # Can trigger arbitrary code execution
4. Deserialization Vulnerabilities: pickle, marshal, PyYAML
import pickle
# Vulnerability example: Deserialization of user controllable data
data = request.get_data()
obj = pickle.loads(data) # Attackers can construct malicious serialized objects (such as反弹Shell)
Deserialization
The core of deserialization vulnerabilities is that when the program restores untrusted serialized data to an object, the legality of the data is not verified, leading to attackers executing arbitrary code through maliciously constructed serialized data. Common scenarios:
pickle
Unsafe use of the module:pickle.loads()
Directly deserialize user input.PyYAML
Unsafe Loading:yaml.load()
Default support for executing constructors(such as!!python/object
)。Custom deserialization logic:self-implemented by developers
__reduce__
methods are exploited
1. Unsafe modules:pickle
,marshal
,PyYAML
etc.
2. Defense measures: prefer to use secure formats such as JSON(json.loads()
)insteadpickle
.
3. Deserialization operations of third-party libraries are dangerous libraries:PyYAML
(Default Loader is not secure)、dill
,shelve
etc.
4. Safe Usagepickle
Only deserialize trusted data to ensure the source of serialized data is trustworthy. Signature verification signs the serialized data to prevent tampering.
import hmac, pickle key = b'secret_key' data = request.get_data() # Verify HMAC signature if not hmac.compare_digest(hmac.new(key, data).digest(), request.headers.get('Signature')): abort(403) obj = pickle.loads(data)
5. Secure usagePyYAML
Use the secure Loader forcibly: such asSafeLoader
ORFullLoader
import yaml data = yaml.load(user_input, Loader=yaml.SafeLoader) # Disable constructor
File operations
1. Key functions
file(), file.save(), open(), codecs.open()
2. Secure file upload (whitelist file type restriction, rename uploaded file, magic verify file content)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'} def allowed_file(filename): return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS if file and allowed_file(file.filename): file.save("safe_path") import uuid secure_name = str(uuid.uuid4()) + ".png" # Generate a random filename or through secure_filename() file.save(f"/uploads/{secure_name}") import magic mime = magic.from_buffer(file.read(1024), mime=True) if mime not in ['image/png', 'image/jpeg', 'image/gif']: abort(400, "Invalid file type")
3. Prevent path traversal
from werkzeug.utils import secure_filename import os filename = secure_filename(request.form['filename']) # Filter special characters base_dir = os.path.abspath("/var/data") target_path = os.path.join(base_dir, filename) # Ensure the target path is under base_dir if not os.path.commonprefix([base_dir, target_path]) == base_dir: abort(403, "Invalid path")
4. Secure file download: map filenames to safe IDs without exposing the file path directly
# Database stores the mapping of file ID to the actual path @app.route('/download/<file_id>') def download(file_id): file_path = db.get_file_path(file_id) # Query the safe path from the database return send_file(file_path)
XXE
1. Directly parse user XML and use dangerous functions/libraries:lxml
,xml.etree.ElementTree
,defusedxml
xml_data='''<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///c:/cc.txt">]> <data>&xxe;</data>''' from lxml import etree # Vulnerability example: directly parse user input root = etree.fromstring(xml_data) # Allow external entity resolution print(root.text)
2. Disable external entity resolution, input filtering, and use JSON instead of XML
from defusedxml.ElementTree import parse tree = parse(xml_file) # Disable external entities by default from lxml import etree parser = etree.XMLParser(resolve_entities=False) # Disable entity resolution root = etree.fromstring(xml_data, parser=parser) if re.search(r"<!ENTITY|SYSTEM|PUBLIC", xml_data, re.IGNORECASE): abort(400, "Invalid XML")
SSRF
1. Direct request, indirect URL concatenation of the user-provided URL (urllib, requests, etc. libraries)
# Vulnerability example: Directly request the user-provided URL url = request.form['url'] response = requests.get(url) # Input "file:///etc/passwd" may read the file (depending on library support) # Vulnerability example: Concatenating user input to the internal API user_id = request.args.get('id') internal_url = f"http://internal-api:8080/user/{user_id}" requests.get(internal_url) # Input "id@evil.com" may bypass verification # requests.get(), urllib.request.urlopen()
2. Verify domain/IP, restrict protocol type, and prevent DNS rebinding attacks
ALLOWED_DOMAINS = {'example.com', 'cdn.example.net'} from urllib.parse import urlparse def is_allowed_url(url): parsed = urlparse(url) if parsed.hostname in ALLOWED_DOMAINS: return True return False if not is_allowed_url(url): abort(400, "Invalid URL") parsed = urlparse(url) if parsed.scheme not in ['http', 'https']: abort(400, "Unsupported protocol") import socket from urllib.parse import urlparse parsed = urlparse(url) hostname = parsed.hostname # Parse DNS and verify if the IP is valid resolved_ip = socket.gethostbyname(hostname) if resolved_ip in ['127.0.0.1', '169.254.169.254']: abort(403, "Forbidden IP")
XSS
1. Directly render unescaped user input (|safe
filter orMarkup
class orrender_template_string
or directly return input)
from flask import Markup user_input = request.args.get('q') return render_template('search.html', result=Markup(user_input)) <script> var userData = "{{ search_query|safe }}"; // Input "; alert(1); // </script>
2. Payload
<svg><script>alert(1)</script></svg> <!-- HTML entity encoding bypass --> <script>alert(1)</script> <img src=https://www.freebuf.com/articles/es/x onerror=alert(1) > <img/src='1'/onerror=alert(0)>
3. Useescape()
Filter user input, use render_template to render, and set CSP (Content Security Policy)
from markupsafe import escape @app.route('/comment', methods=['POST']) def comment(): username = escape(request.args.get('username')) return render_template('profile.html', username=username)
4. File Upload
the attacker uploads maliciousxss.svg
<svg xmlns="http://www.w3.org/2000/svg" version="1.1">
<circle cx="100" cy="50" r="40" stroke="black" stroke-width="2" fill="red" />
<script>alert(1)</script>
</svg>
Solution: Prohibit uploading SVG, HTML files or use
Content-Disposition: attachment
, to prevent direct parsing in the browser
return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=True)
Others
CSRF
from flask_wtf.csrf import CSRFProtect csrf = CSRFProtect(app)
Permission verification
from flask_login import current_user user = User.query.get(user_id) if user.id != current_user.id: abort(403)
Logical vulnerability
# Vulnerability example: Unlocked balance deduction leading to concurrency user.balance -= amount db.session.commit() # Concurrent requests may cause the balance to be negative
Component vulnerabilities
pip install safety safety scan safety system-scan safety scan --apply-fixes
cors
# Flask-CORS Example from flask_cors import CORS CORS(app, origins=["https://www.attacker.com"], supports_credentials=True)
Origin: http://www.attacker.com # Return as follows Access-Control-Allow-Origin: https://www.attacker.com Access-Control-Allow-Credentials: true # Or as follows Access-Control-Allow-Origin: null Access-Control-Allow-Credentials: true
Configuration | Risk level | Description |
---|---|---|
Access-Control-Allow-Origin: * +Credentials: true | Secure (but incorrect) | The browser forcibly blocks the request |
Access-Control-Allow-Origin: Any value +Credentials: true | High risk | Data can be stolen by attackers |
Access-Control-Allow-Origin: null +Credentials: true | High risk | May bypass the same-origin policy in special scenarios |
Fix core: Do not use client-provided data without verificationOrigin
Directly return *, and wildcard characters are prohibited when carrying credentials*
andnull
.
Key environment variable
import os app.secret_key = os.getenv("SECRET_KEY", "fallback_secret")
The principle of least privilege
Ordinary users are only granted
SELECT, INSERT, UPDATE
Privileges, only administrators canDROP
ORALTER
Database
CREATE USER 'appuser'@'localhost' IDENTIFIED BY 'strongpassword'; GRANT SELECT, INSERT, UPDATE ON mydb.* TO 'appuser'@'localhost'; CREATE USER 'admin'@'localhost' IDENTIFIED BY 'adminpassword'; GRANT ALL PRIVILEGES ON mydb.* TO 'admin'@'localhost';
Cache security
from flask import Flask, request, jsonify from flask_caching import Cache import redis app = Flask(__name__) # Configure cache (assuming Redis is in the intranet, password and TLS are enabled) app.config['CACHE_TYPE'] = 'RedisCache' app.config['CACHE_REDIS_HOST'] = '10.0.0.5' # Intranet IP app.config['CACHE_REDIS_PORT'] = 6379 app.config['CACHE_REDIS_PASSWORD'] = 'strong_redis_password' app.config['CACHE_KEY_PREFIX'] = 'myapp:' # Use namespace to isolate data app.config['CACHE_DEFAULT_TIMEOUT'] = 300 cache = Cache(app) # Example interface: store and retrieve user data (assuming data is sensitive, encrypt and store first) import hashlib import base64 def encrypt_data(data, key='secret_key'): # This is just an example of using a simple hash with salt (please use a more secure encryption method in actual scenarios) return base64.b64encode(hashlib.sha256((data + key).encode()).digest()).decode() def decrypt_data(data, key='secret_key'): # The encrypted data cannot be decrypted directly, this is just an example return data @app.route('/store', methods=['POST']) def store(): username = request.form.get('username') secret_info = request.form.get('secret_info') if not username or not secret_info: return jsonify({'error': 'Missing parameters'}), 400 # Encrypt sensitive data encrypted = encrypt_data(secret_info) cache_key = f"user:{username}:info" cache.set(cache_key, encrypted) return jsonify({'message': 'Stored securely'}), 200 @app.route('/retrieve', methods=['GET']) def retrieve(): username = request.args.get('username') if not username: return jsonify({'error': 'Missing username'}), 400 cache_key = f"user:{username}:info" encrypted = cache.get(cache_key) if not encrypted: return jsonify({'error': 'No data found'}), 404 # Here is the decryption call (if decryption is possible) info = decrypt_data(encrypted) return jsonify({'username': username, 'secret_info': info}), 200 if __name__ == '__main__': app.run(debug=False)
Auditing Practice
Targeting a certain Flask CMS, search for|safe
The keyword is found to exist in article.html
{% if render_recommendations is defined %} {{ render_recommendations()|safe }} {% endif %}
in__init__.py
declare render_recommendations context in
def init_app(self, app): """Initialize plugin""" super().init_app(app) # Register template function def render_recommendations(): """Render recommendation template""" template_path = os.path.join(os.path.dirname(__file__), 'templates', 'recommendations.html') if os.path.exists(template_path): with open(template_path, 'r', encoding='utf-8') as f: template = f.read() return Markup(render_template_string(template)) return '' # Directly add to Jinja2 environment app.jinja_env.globals['render_recommendations'] = render_recommendations
Here, Markup and |safe add the content of recommendations.html to article.html as follows
<div class="bg-white dark:bg-gray-800 rounded-lg shadow-sm p-8 mt-12 mb-8"> <h3 class="text-xl font-bold mb-8 text-gray-900 dark:text-white flex items-center"> <svg class="w-5 h-5 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24"> <path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 10V3L4 14h7v7l9-11h-7z"></path> </svg> Related Recommendations </h3> <div id="recommendations-container" class="mb-2"> <!-- Recommended content will be loaded dynamically via JS --> </div> </div> <script src="https://www.freebuf.com/articles/es/{{ url_for('article_recommender_static', filename='js/recommendations.js') }}"></script>
The following JS file is as follows:
Functional module | Function |
---|---|
EMPTY_STATE_HTML | Display empty state when there are no recommended articles |
ERROR_STATE_HTML | Display error state when loading fails |
renderArticleCard(article) | Generate article card HTML |
renderTags(tags) | Render tags (up to 2) |
loadRecommendations(articleId) | Request and render recommended articles |
DOMContentLoaded Event | Automatically fetch recommended articles after the page is loaded |
function renderArticleCard(article) {
return `
<a href="https://www.freebuf.com/article/${article.id}"
class="group block bg-gray-50 dark:bg-gray-700 rounded-lg p-6 transition-all duration-200 hover:shadow-md hover:bg-gray-100 dark:hover:bg-gray-600"
<div class="space-y-4">
<h4 class="font-bold text-gray-900 dark:text-gray-100 group-hover:text-blue-600 dark:group-hover:text-blue-400 transition-colors duration-200 line-clamp-2">
${article.title}
</h4>
<p class="text-sm text-gray-600 dark:text-gray-300 line-clamp-2">
${article.summary}
</p>
<div class="flex items-center justify-between">
<span class="text-sm text-gray-500 dark:text-gray-400">
${article.category}
</span>
<div class="flex flex-wrap gap-2">>
${renderTags(article.tags)}
</div>
</div>
</div>
</a>
`;
}
title, category, tags all have stored XSS, submit malicious payload<img/src='1'/onerror=alert(0)>
After that, render js code to complete the pop-up in the related recommendation feature of other articles
Postscript
Referer / Origin
In HTTP requests,Referer
andOrigin
They are two header fields related to the source of the request, but they have significant differences in their usage, format, and security features. The following are their core differences:
1. Definition and format
Field | Definition | Format example | Includes path information |
---|---|---|---|
Referer | Indicates the current request's Complete URL of the source page | https://example.com/page.html | ✅ Includes path (such as/page.html ) |
Origin | Indicates the current request's Protocol + Domain + Port(Does not include path) | https://example.com:8080 | ❌ Does not include path |
2. Main uses
Field | Application scenarios | Typical use cases |
---|---|---|
Referer | - Traffic source statistics - Prevent image hotlinking (Hotlinking) - Analyze user behavior | Users frompageA.html Click the link to jump topageB.html ,Referer ValuepageA.html |
Origin | - CORS (Cross-Origin Resource Sharing) security mechanism - Limit the source of cross-domain requests | Automatically added when the browser initiates a cross-domain AJAX requestOrigin Headers provided for server verification |
3. Sending conditions
Field | Trigger the type of request sent | Browser behavior |
---|---|---|
Referer | - Page jump (such asClick) - Resource loading (such as , ``) - Form submission (GET/POST) | Default sending, but may be blocked by browser policies (such asReferrer-Policy )or user settings block |
Origin | Cross-domain AJAX (fetch /XMLHttpRequest )-POST requests (part of the browsers) - WebSocket connection | Only in cross-domain requests or specific methods (such asPOST )when sent |
Transaction execution failed
If the transaction execution fails, the database connection may become stuck, affecting other queries.
try: db.session.add(new_user) db.session.commit() except Exception as e: db.session.rollback() # Transaction rollback, to prevent database state anomalies print(f"Error: {e}")
reference
https://mp.weixin.qq.com/s/y1ta34MzowUnOvFnShk2MQ https://www.freebuf.com/news/168362.html https://blog.hackall.cn/cvesubmit/614.html https://www.freebuf.com/articles/web/404899.html https://blog.neargle.com/2016/07/25/log-of-simple-code-review-about-python-base-webapp/ https://www.cnblogs.com/xiaozi/p/7268506.html

评论已关闭