feat: Implement CSRF protection and basic form validation, streamline form data, and add comprehensive unit and E2E tests.
This commit is contained in:
parent
4c435f8e17
commit
197ae8d75b
4 changed files with 120 additions and 19 deletions
58
app.py
58
app.py
|
|
@ -1,10 +1,26 @@
|
||||||
import os
|
import os
|
||||||
import yaml
|
import yaml
|
||||||
import gspread
|
import gspread
|
||||||
from flask import Flask, render_template, request, redirect, url_for, abort
|
import secrets
|
||||||
|
from flask import Flask, render_template, request, redirect, url_for, abort, session
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
app.secret_key = os.environ.get('SECRET_KEY', secrets.token_hex(32))
|
||||||
|
|
||||||
|
def generate_csrf_token():
|
||||||
|
if '_csrf_token' not in session:
|
||||||
|
session['_csrf_token'] = secrets.token_hex(16)
|
||||||
|
return session['_csrf_token']
|
||||||
|
|
||||||
|
app.jinja_env.globals['csrf_token'] = generate_csrf_token
|
||||||
|
|
||||||
|
@app.before_request
|
||||||
|
def csrf_protect():
|
||||||
|
if request.method == "POST":
|
||||||
|
token = session.get('_csrf_token', None)
|
||||||
|
if not token or token != request.form.get('_csrf_token'):
|
||||||
|
abort(403)
|
||||||
|
|
||||||
# Load Configuration
|
# Load Configuration
|
||||||
def load_config():
|
def load_config():
|
||||||
|
|
@ -47,20 +63,20 @@ def get_public_participants(sheet_id, tab_name=None):
|
||||||
public_list = []
|
public_list = []
|
||||||
for row in data_rows:
|
for row in data_rows:
|
||||||
# Ensure row is long enough to avoid errors
|
# Ensure row is long enough to avoid errors
|
||||||
if len(row) < 17:
|
if len(row) < 14:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Extract ONLY non-sensitive columns based on our save order
|
# Extract ONLY non-sensitive columns based on our save order
|
||||||
# 1: Klasse, 2: Zeilnummer, 3: Bootnaam, 4: Boottype
|
# 1: Klasse, 2: Zeilnummer, 3: Bootnaam, 4: Boottype
|
||||||
# 7: Naam, 10: Plaats, 16: Vereniging
|
# 7: Naam, 8: Plaats, 13: Vereniging
|
||||||
entry = {
|
entry = {
|
||||||
'klasse': row[1],
|
'klasse': row[1],
|
||||||
'zeilnummer': row[2],
|
'zeilnummer': row[2],
|
||||||
'bootnaam': row[3],
|
'bootnaam': row[3],
|
||||||
'boottype': row[4],
|
'boottype': row[4],
|
||||||
'naam': row[7],
|
'naam': row[7],
|
||||||
'plaats': row[10],
|
'plaats': row[8],
|
||||||
'vereniging': row[16]
|
'vereniging': row[13]
|
||||||
}
|
}
|
||||||
public_list.append(entry)
|
public_list.append(entry)
|
||||||
|
|
||||||
|
|
@ -101,8 +117,16 @@ def event_form(event_slug):
|
||||||
event_data['sheet_id'] = sheet_id
|
event_data['sheet_id'] = sheet_id
|
||||||
|
|
||||||
if request.method == 'POST':
|
if request.method == 'POST':
|
||||||
|
# Basic Validation
|
||||||
|
required_fields = ['klasse', 'zeilnummer', 'bootnaam', 'naam', 'telefoonmobiel', 'email']
|
||||||
|
for field in required_fields:
|
||||||
|
val = request.form.get(field)
|
||||||
|
if not val or not val.strip():
|
||||||
|
participants = get_public_participants(sheet_id, tab_name)
|
||||||
|
return render_template('form.html', event=event_data, slug=event_slug, participants=participants, error="Oeps! Bepaalde verplichte velden ontbreken.")
|
||||||
|
|
||||||
form_data = [
|
form_data = [
|
||||||
datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 0: Timestamp
|
datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 0: Timestamp
|
||||||
request.form.get('klasse'), # 1: Klasse
|
request.form.get('klasse'), # 1: Klasse
|
||||||
request.form.get('zeilnummer'), # 2: Zeilnummer
|
request.form.get('zeilnummer'), # 2: Zeilnummer
|
||||||
request.form.get('bootnaam'), # 3: Bootnaam
|
request.form.get('bootnaam'), # 3: Bootnaam
|
||||||
|
|
@ -110,23 +134,19 @@ def event_form(event_slug):
|
||||||
", ".join([k for k in ['genua', 'rolfok', 'spinaker', 'halfwinder', 'genaker', 'alleendacron'] if k in request.form]), # 5: Zeilvoering
|
", ".join([k for k in ['genua', 'rolfok', 'spinaker', 'halfwinder', 'genaker', 'alleendacron'] if k in request.form]), # 5: Zeilvoering
|
||||||
request.form.get('schroef'), # 6: Schroef
|
request.form.get('schroef'), # 6: Schroef
|
||||||
request.form.get('naam'), # 7: Naam (Keep public)
|
request.form.get('naam'), # 7: Naam (Keep public)
|
||||||
"", # 8: Straat (REMOVED)
|
request.form.get('plaats'), # 8: Plaats
|
||||||
"", # 9: Postcode (REMOVED)
|
request.form.get('land', 'Nederland'), # 9: Land
|
||||||
request.form.get('plaats'), # 10: Plaats
|
request.form.get('telefoonmobiel'), # 10: Mobiel (PRIVATE)
|
||||||
request.form.get('land', 'Nederland'), # 11: Land
|
request.form.get('email'), # 11: Email (PRIVATE)
|
||||||
request.form.get('telefoonmobiel'), # 12: Mobiel (PRIVATE)
|
request.form.get('startlicentienummer'), # 12: Licentie
|
||||||
"", # 13: Vast (REMOVED)
|
request.form.get('vereniging'), # 13: Vereniging
|
||||||
request.form.get('email'), # 14: Email (PRIVATE)
|
request.form.get('opmerkingen') # 14: Opmerkingen
|
||||||
request.form.get('startlicentienummer'), # 15: Licentie
|
|
||||||
request.form.get('vereniging'), # 16: Vereniging
|
|
||||||
"", # 17: Buffet (REMOVED)
|
|
||||||
"", # 18: Ontbijt (REMOVED)
|
|
||||||
request.form.get('opmerkingen') # 19: Opmerkingen
|
|
||||||
]
|
]
|
||||||
|
|
||||||
sheet = get_google_sheet(sheet_id, tab_name)
|
sheet = get_google_sheet(sheet_id, tab_name)
|
||||||
if sheet:
|
if sheet:
|
||||||
sheet.append_row(form_data)
|
if not os.environ.get('TESTING_NO_APPEND'):
|
||||||
|
sheet.append_row(form_data)
|
||||||
return redirect(url_for('success', event_slug=event_slug))
|
return redirect(url_for('success', event_slug=event_slug))
|
||||||
else:
|
else:
|
||||||
return f"Error: Could not connect to Google Sheet Tab '{tab_name}'. Check server logs."
|
return f"Error: Could not connect to Google Sheet Tab '{tab_name}'. Check server logs."
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,14 @@
|
||||||
<h1 class="text-center mb-2">{{ event.title }}</h1>
|
<h1 class="text-center mb-2">{{ event.title }}</h1>
|
||||||
<p class="text-center text-muted mb-4">{{ event.description }}</p>
|
<p class="text-center text-muted mb-4">{{ event.description }}</p>
|
||||||
|
|
||||||
|
{% if error %}
|
||||||
|
<div class="alert alert-danger" role="alert">
|
||||||
|
{{ error }}
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
<form method="POST">
|
<form method="POST">
|
||||||
|
<input type="hidden" name="_csrf_token" value="{{ csrf_token() }}">
|
||||||
<!-- Boot Info -->
|
<!-- Boot Info -->
|
||||||
<h4 class="section-title">De Boot</h4>
|
<h4 class="section-title">De Boot</h4>
|
||||||
<div class="row g-3 mb-3">
|
<div class="row g-3 mb-3">
|
||||||
|
|
|
||||||
45
tests/test_app.py
Normal file
45
tests/test_app.py
Normal file
|
|
@ -0,0 +1,45 @@
|
||||||
|
import pytest
|
||||||
|
from app import app
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client():
|
||||||
|
app.config['TESTING'] = True
|
||||||
|
with app.test_client() as client:
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
sess['_csrf_token'] = 'test-token'
|
||||||
|
yield client
|
||||||
|
|
||||||
|
def test_home_page(client):
|
||||||
|
response = client.get('/')
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert b"Zomeravond" in response.data or b"Papklokken" in response.data
|
||||||
|
|
||||||
|
@patch('app.get_google_sheet')
|
||||||
|
def test_post_missing_fields(mock_get_sheet, client):
|
||||||
|
response = client.post('/zomeravond', data={'_csrf_token': 'test-token'})
|
||||||
|
assert b"Oeps! Bepaalde verplichte velden ontbreken" in response.data
|
||||||
|
mock_get_sheet.return_value.append_row.assert_not_called()
|
||||||
|
|
||||||
|
@patch('app.get_google_sheet')
|
||||||
|
@patch('app.get_public_participants')
|
||||||
|
def test_post_success(mock_participants, mock_get_sheet, client):
|
||||||
|
mock_participants.return_value = []
|
||||||
|
|
||||||
|
data = {
|
||||||
|
'_csrf_token': 'test-token',
|
||||||
|
'klasse': 'Kajuitklasse',
|
||||||
|
'zeilnummer': '123',
|
||||||
|
'bootnaam': 'TestBoat',
|
||||||
|
'naam': 'Test Name',
|
||||||
|
'telefoonmobiel': '0612345678',
|
||||||
|
'email': 'test@example.com'
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.post('/zomeravond', data=data)
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert '/zomeravond/success' in response.headers.get('Location', '')
|
||||||
|
|
||||||
|
# In full testing, append_row is called unless TESTING_NO_APPEND is set.
|
||||||
|
# But since it's mocked, we can check it.
|
||||||
|
assert mock_get_sheet.return_value.append_row.called
|
||||||
29
tests/test_e2e.py
Normal file
29
tests/test_e2e.py
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import pytest
|
||||||
|
from playwright.sync_api import Page, expect
|
||||||
|
|
||||||
|
def test_homepage_has_title(page: Page):
|
||||||
|
page.goto("http://localhost:5000/")
|
||||||
|
expect(page).to_have_title(re.compile("Zeilwedstrijden"))
|
||||||
|
|
||||||
|
def test_submission_flow(page: Page):
|
||||||
|
page.goto("http://localhost:5000/zomeravond")
|
||||||
|
|
||||||
|
# Fill required fields
|
||||||
|
page.select_option("select[name='klasse']", label="Kajuitklasse")
|
||||||
|
page.fill("input[name='zeilnummer']", "42")
|
||||||
|
page.fill("input[name='bootnaam']", "Vliegende Hollander")
|
||||||
|
page.fill("input[name='naam']", "Hendrik Test")
|
||||||
|
page.fill("input[name='telefoonmobiel']", "0612345678")
|
||||||
|
page.fill("input[name='email']", "hendrik@example.com")
|
||||||
|
|
||||||
|
# Accept terms
|
||||||
|
page.check("input#terms")
|
||||||
|
|
||||||
|
# Submit
|
||||||
|
page.click("button[type='submit']")
|
||||||
|
|
||||||
|
# Expect success redirect
|
||||||
|
expect(page).to_have_url(re.compile(r".*/zomeravond/success"))
|
||||||
|
expect(page.locator("h1")).to_have_text("Bedankt!")
|
||||||
Loading…
Add table
Add a link
Reference in a new issue