Flask Python code audit ideas and practical records

0 22
Flask code auditSQL injectionCommand/Code ExecutionDeserializationFile operation...

Flask code auditSQL injectionCommand/Code ExecutionDeserializationFile operationsXXESSRFXSSOthersAuditing PracticePostscriptreference

Flask code audit

SQL injection

1. The correct usage is straightforward: use "comma", not "percent sign"

Flask Python code audit ideas and practical records
stmt = "SELECT * FROM table WHERE id=?"
connection.execute(stmt, (value,))
# Or
cursor.execute("SELECT * FROM users WHERE name = ?", (username,))

2. Check if all SQL statements use+,%sORf-stringDirectly concatenate user input

"SELECT * FROM table WHERE id=" + value
"SELECT * FROM table WHERE id=%s" % value
"SELECT * FROM table WHERE id={0}".format(value)

3. SQLAlchemy'stext()Whether parameterization is performed

from sqlalchemy import text
# Incorrect usage
stmt = text(f"SELECT * FROM users WHERE name = '{username}'")
# Correct usage
stmt = text("SELECT * FROM users WHERE name = :username").bindparams(username=username)
# Or
query = "SELECT * FROM articles WHERE title LIKE :keyword"
result = db.session.execute(query, {"keyword": f"%{keyword}%"})

4. Safe use of ORM: Prefer the ORM method

# SQLAlchemy ORM
User.query.filter_by(username=username).first()

Command/Code Execution

1. Dangerous functions popen, system, commands, subprocess, exec, eval

import subprocess

@app.route('/ping')
def ping():
ip = request.args.get('ip')
result = subprocess.run(["ping", "-c", "1", ip], capture_output=True, text=True)
return f"<pre>{result.stdout}</pre>"
#subprocess.run() binding parameters will not execute malicious commands.

2. SSTI

render_template_string

3. Risks of third-party libraries

import yaml
# Vulnerability example: Using default Loader
data = yaml.load(user_input, Loader=yaml.Loader) # Can trigger arbitrary code execution

4. Deserialization Vulnerabilities: pickle, marshal, PyYAML

import pickle
# Vulnerability example: Deserialization of user controllable data
data = request.get_data()
obj = pickle.loads(data) # Attackers can construct malicious serialized objects (such as反弹Shell)

Deserialization

The core of deserialization vulnerabilities is that when the program restores untrusted serialized data to an object, the legality of the data is not verified, leading to attackers executing arbitrary code through maliciously constructed serialized data. Common scenarios:

  1. pickleUnsafe use of the modulepickle.loads()Directly deserialize user input.

  2. PyYAMLUnsafe Loadingyaml.load()Default support for executing constructors(such as!!python/object)。

  3. Custom deserialization logic:self-implemented by developers__reduce__methods are exploited

1. Unsafe modules:pickle,marshal,PyYAMLetc.

2. Defense measures: prefer to use secure formats such as JSON(json.loads())insteadpickle.

3. Deserialization operations of third-party libraries are dangerous libraries:PyYAML(Default Loader is not secure)、dill,shelveetc.

4. Safe UsagepickleOnly deserialize trusted data to ensure the source of serialized data is trustworthy. Signature verification signs the serialized data to prevent tampering.

import hmac, pickle
key = b'secret_key'
data = request.get_data()
# Verify HMAC signature
if not hmac.compare_digest(hmac.new(key, data).digest(), request.headers.get('Signature')):
    abort(403)
obj = pickle.loads(data)

5. Secure usagePyYAMLUse the secure Loader forcibly: such asSafeLoaderORFullLoader

import yaml
data = yaml.load(user_input, Loader=yaml.SafeLoader)  # Disable constructor

File operations

1. Key functions

file(), file.save(), open(), codecs.open()

2. Secure file upload (whitelist file type restriction, rename uploaded file, magic verify file content)

ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}
def allowed_file(filename):
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

if file and allowed_file(file.filename):
    file.save("safe_path")
     
import uuid
secure_name = str(uuid.uuid4()) + ".png"  # Generate a random filename or through secure_filename()
file.save(f"/uploads/{secure_name}")

import magic
mime = magic.from_buffer(file.read(1024), mime=True)
if mime not in ['image/png', 'image/jpeg', 'image/gif']:
    abort(400, "Invalid file type")

3. Prevent path traversal

from werkzeug.utils import secure_filename
import os

filename = secure_filename(request.form['filename'])  # Filter special characters
base_dir = os.path.abspath("/var/data")
target_path = os.path.join(base_dir, filename)

# Ensure the target path is under base_dir
if not os.path.commonprefix([base_dir, target_path]) == base_dir:
    abort(403, "Invalid path")

4. Secure file download: map filenames to safe IDs without exposing the file path directly

# Database stores the mapping of file ID to the actual path
@app.route('/download/<file_id>')
def download(file_id):
    file_path = db.get_file_path(file_id)  # Query the safe path from the database
    return send_file(file_path)

XXE

1. Directly parse user XML and use dangerous functions/libraries:lxml,xml.etree.ElementTree,defusedxml

xml_data='''<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///c:/cc.txt">]>
<data>&xxe;</data>'''
from lxml import etree

# Vulnerability example: directly parse user input
root = etree.fromstring(xml_data)  # Allow external entity resolution
print(root.text)

2. Disable external entity resolution, input filtering, and use JSON instead of XML

from defusedxml.ElementTree import parse
tree = parse(xml_file)  # Disable external entities by default

from lxml import etree
parser = etree.XMLParser(resolve_entities=False)  # Disable entity resolution
root = etree.fromstring(xml_data, parser=parser)

if re.search(r"<!ENTITY|SYSTEM|PUBLIC", xml_data, re.IGNORECASE):
    abort(400, "Invalid XML")

SSRF

1. Direct request, indirect URL concatenation of the user-provided URL (urllib, requests, etc. libraries)

# Vulnerability example: Directly request the user-provided URL
url = request.form['url']
response = requests.get(url)  # Input "file:///etc/passwd" may read the file (depending on library support)

# Vulnerability example: Concatenating user input to the internal API
user_id = request.args.get('id')
internal_url = f"http://internal-api:8080/user/{user_id}"
requests.get(internal_url)  # Input "id@evil.com" may bypass verification
# requests.get(), urllib.request.urlopen()

2. Verify domain/IP, restrict protocol type, and prevent DNS rebinding attacks

ALLOWED_DOMAINS = {'example.com', 'cdn.example.net'}
from urllib.parse import urlparse
def is_allowed_url(url):
    parsed = urlparse(url)
    if parsed.hostname in ALLOWED_DOMAINS:
        return True
    return False
if not is_allowed_url(url):
    abort(400, "Invalid URL")

parsed = urlparse(url)
if parsed.scheme not in ['http', 'https']:
    abort(400, "Unsupported protocol")

import socket
from urllib.parse import urlparse
parsed = urlparse(url)
hostname = parsed.hostname
# Parse DNS and verify if the IP is valid
resolved_ip = socket.gethostbyname(hostname)
if resolved_ip in ['127.0.0.1', '169.254.169.254']:
    abort(403, "Forbidden IP")

XSS

1. Directly render unescaped user input (|safefilter orMarkupclass orrender_template_stringor directly return input)

from flask import Markup
user_input = request.args.get('q')
return render_template('search.html', result=Markup(user_input))

<script>
  var userData = "{{ search_query|safe }}";  // Input "; alert(1); //
</script>

2. Payload

<svg><script>alert(1)</script></svg>  <!-- HTML entity encoding bypass -->
<script>alert(1)</script>
<img src=https://www.freebuf.com/articles/es/x onerror=alert(1) >
<img/src='1'/onerror=alert(0)>

3. Useescape()Filter user input, use render_template to render, and set CSP (Content Security Policy)

from markupsafe import escape

@app.route('/comment', methods=['POST'])
def comment():
    username = escape(request.args.get('username'))
    return render_template('profile.html', username=username)

4. File Upload

the attacker uploads maliciousxss.svg

<svg xmlns="http://www.w3.org/2000/svg" version="1.1">
   <circle cx="100" cy="50" r="40" stroke="black" stroke-width="2" fill="red" />
   <script>alert(1)</script>
</svg>

Solution: Prohibit uploading SVG, HTML files or useContent-Disposition: attachment, to prevent direct parsing in the browser

return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=True)

Others

CSRF

from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)

Permission verification

from flask_login import current_user
user = User.query.get(user_id)
if user.id != current_user.id:
    abort(403)

Logical vulnerability

# Vulnerability example: Unlocked balance deduction leading to concurrency
user.balance -= amount
db.session.commit()  # Concurrent requests may cause the balance to be negative

Component vulnerabilities

pip install safety
safety scan
safety system-scan
safety scan --apply-fixes

cors

# Flask-CORS Example
from flask_cors import CORS
CORS(app, origins=["https://www.attacker.com"], supports_credentials=True)
Origin: http://www.attacker.com
# Return as follows
Access-Control-Allow-Origin: https://www.attacker.com
Access-Control-Allow-Credentials: true
# Or as follows
Access-Control-Allow-Origin: null
Access-Control-Allow-Credentials: true
ConfigurationRisk levelDescription
Access-Control-Allow-Origin: *+Credentials: trueSecure (but incorrect)The browser forcibly blocks the request
Access-Control-Allow-Origin: Any value+Credentials: trueHigh riskData can be stolen by attackers
Access-Control-Allow-Origin: null+Credentials: trueHigh riskMay bypass the same-origin policy in special scenarios

Fix core: Do not use client-provided data without verificationOriginDirectly return *, and wildcard characters are prohibited when carrying credentials*andnull.

Key environment variable

import os
app.secret_key = os.getenv("SECRET_KEY", "fallback_secret")

The principle of least privilege

Ordinary users are only grantedSELECT, INSERT, UPDATEPrivileges, only administrators canDROPORALTERDatabase

CREATE USER 'appuser'@'localhost' IDENTIFIED BY 'strongpassword';
GRANT SELECT, INSERT, UPDATE ON mydb.* TO 'appuser'@'localhost';
CREATE USER 'admin'@'localhost' IDENTIFIED BY 'adminpassword';
GRANT ALL PRIVILEGES ON mydb.* TO 'admin'@'localhost';

Cache security

from flask import Flask, request, jsonify
from flask_caching import Cache
import redis

app = Flask(__name__)

# Configure cache (assuming Redis is in the intranet, password and TLS are enabled)
app.config['CACHE_TYPE'] = 'RedisCache'
app.config['CACHE_REDIS_HOST'] = '10.0.0.5'  # Intranet IP
app.config['CACHE_REDIS_PORT'] = 6379
app.config['CACHE_REDIS_PASSWORD'] = 'strong_redis_password'
app.config['CACHE_KEY_PREFIX'] = 'myapp:'  # Use namespace to isolate data
app.config['CACHE_DEFAULT_TIMEOUT'] = 300

cache = Cache(app)

# Example interface: store and retrieve user data (assuming data is sensitive, encrypt and store first)
import hashlib
import base64

def encrypt_data(data, key='secret_key'):
    # This is just an example of using a simple hash with salt (please use a more secure encryption method in actual scenarios)
    return base64.b64encode(hashlib.sha256((data + key).encode()).digest()).decode()

def decrypt_data(data, key='secret_key'):
    # The encrypted data cannot be decrypted directly, this is just an example
    return data

@app.route('/store', methods=['POST'])
def store():
    username = request.form.get('username')
    secret_info = request.form.get('secret_info')
    if not username or not secret_info:
        return jsonify({'error': 'Missing parameters'}), 400
    # Encrypt sensitive data
    encrypted = encrypt_data(secret_info)
    cache_key = f"user:{username}:info"
    cache.set(cache_key, encrypted)
    return jsonify({'message': 'Stored securely'}), 200

@app.route('/retrieve', methods=['GET'])
def retrieve():
    username = request.args.get('username')
    if not username:
        return jsonify({'error': 'Missing username'}), 400
    cache_key = f"user:{username}:info"
    encrypted = cache.get(cache_key)
    if not encrypted:
        return jsonify({'error': 'No data found'}), 404
    # Here is the decryption call (if decryption is possible)
    info = decrypt_data(encrypted)
    return jsonify({'username': username, 'secret_info': info}), 200

if __name__ == '__main__':
    app.run(debug=False)

Auditing Practice

Targeting a certain Flask CMS, search for|safeThe keyword is found to exist in article.html

{% if render_recommendations is defined %}
    {{ render_recommendations()|safe }}
    {% endif %}

in__init__.pydeclare render_recommendations context in

def init_app(self, app):
        """Initialize plugin"""
        super().init_app(app)
        
        # Register template function
        def render_recommendations():
            """Render recommendation template"""
            template_path = os.path.join(os.path.dirname(__file__), 'templates', 'recommendations.html')
            if os.path.exists(template_path):
                with open(template_path, 'r', encoding='utf-8') as f:
                    template = f.read()
                return Markup(render_template_string(template))
            return ''
        
        # Directly add to Jinja2 environment
        app.jinja_env.globals['render_recommendations'] = render_recommendations

Here, Markup and |safe add the content of recommendations.html to article.html as follows

<div class="bg-white dark:bg-gray-800 rounded-lg shadow-sm p-8 mt-12 mb-8">
    <h3 class="text-xl font-bold mb-8 text-gray-900 dark:text-white flex items-center">
        <svg class="w-5 h-5 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
            <path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 10V3L4 14h7v7l9-11h-7z"></path>
        </svg>
        Related Recommendations
    </h3>
    <div id="recommendations-container" class="mb-2">
        <!-- Recommended content will be loaded dynamically via JS -->
    </div>
</div>

<script src="https://www.freebuf.com/articles/es/{{ url_for('article_recommender_static', filename='js/recommendations.js') }}"></script>

The following JS file is as follows:

Functional moduleFunction
EMPTY_STATE_HTMLDisplay empty state when there are no recommended articles
ERROR_STATE_HTMLDisplay error state when loading fails
renderArticleCard(article)Generate article card HTML
renderTags(tags)Render tags (up to 2)
loadRecommendations(articleId)Request and render recommended articles
DOMContentLoadedEventAutomatically fetch recommended articles after the page is loaded
function renderArticleCard(article) {
    return `
        <a href="https://www.freebuf.com/article/${article.id}" 
           class="group block bg-gray-50 dark:bg-gray-700 rounded-lg p-6 transition-all duration-200 hover:shadow-md hover:bg-gray-100 dark:hover:bg-gray-600"
            <div class="space-y-4">
                <h4 class="font-bold text-gray-900 dark:text-gray-100 group-hover:text-blue-600 dark:group-hover:text-blue-400 transition-colors duration-200 line-clamp-2">
                    ${article.title}
                </h4>
                <p class="text-sm text-gray-600 dark:text-gray-300 line-clamp-2">
                    ${article.summary}
                </p>
                <div class="flex items-center justify-between">
                    <span class="text-sm text-gray-500 dark:text-gray-400">
                        ${article.category}
                    </span>
                    <div class="flex flex-wrap gap-2">>
                        ${renderTags(article.tags)}
                    </div>
                </div>
            </div>
        </a>
    `;
}

title, category, tags all have stored XSS, submit malicious payload<img/src='1'/onerror=alert(0)>After that, render js code to complete the pop-up in the related recommendation feature of other articles

1738548944_67a026d026dadb3153c25.png!small?1738548944504

Postscript

Referer / Origin

In HTTP requests,RefererandOriginThey are two header fields related to the source of the request, but they have significant differences in their usage, format, and security features. The following are their core differences:

1. Definition and format

FieldDefinitionFormat exampleIncludes path information
RefererIndicates the current request's Complete URL of the source pagehttps://example.com/page.html✅ Includes path (such as/page.html)
OriginIndicates the current request's Protocol + Domain + Port(Does not include path)https://example.com:8080❌ Does not include path

2. Main uses

FieldApplication scenariosTypical use cases
Referer- Traffic source statistics - Prevent image hotlinking (Hotlinking) - Analyze user behaviorUsers frompageA.htmlClick the link to jump topageB.html,RefererValuepageA.html
Origin- CORS (Cross-Origin Resource Sharing) security mechanism - Limit the source of cross-domain requestsAutomatically added when the browser initiates a cross-domain AJAX requestOriginHeaders provided for server verification

3. Sending conditions

FieldTrigger the type of request sentBrowser behavior
Referer- Page jump (such asClick) - Resource loading (such as, ``) - Form submission (GET/POST)Default sending, but may be blocked by browser policies (such asReferrer-Policy)or user settings block
OriginCross-domain AJAX (fetch/XMLHttpRequest)-POSTrequests (part of the browsers) - WebSocket connectionOnly in cross-domain requests or specific methods (such asPOST)when sent

Transaction execution failed

If the transaction execution fails, the database connection may become stuck, affecting other queries.

try:
    db.session.add(new_user)
    db.session.commit()
except Exception as e:
    db.session.rollback()  # Transaction rollback, to prevent database state anomalies
    print(f"Error: {e}")

reference

https://mp.weixin.qq.com/s/y1ta34MzowUnOvFnShk2MQ
https://www.freebuf.com/news/168362.html
https://blog.hackall.cn/cvesubmit/614.html
https://www.freebuf.com/articles/web/404899.html
https://blog.neargle.com/2016/07/25/log-of-simple-code-review-about-python-base-webapp/
https://www.cnblogs.com/xiaozi/p/7268506.html
你可能想看:
最后修改时间:
admin
上一篇 2025年03月29日 02:09
下一篇 2025年03月29日 02:32

评论已关闭