There are problems on problems on problems here, unfortunately. I had the same confusion as you, and it looks like it is disputed by others as well (CPython #101932).
The short answer is: the `email` module isn’t a complete solution for parsing POST requests, and doing it yourself is harder than you expect; save yourself the headache and just grab a small 3rd-party library or a large 3rd-party framework.
As for the long answer…
- `email.parser.BytesParser().parse(environ['wsgi.input'])` doesn’t work because the `email` module expects properly formed e-mails that include a header block, but WSGI passes only the body.
- The least wasteful way I could come up with to do this was to create a `BytesFeedParser`, stick a header into it, terminate the header block, then stream the request body into it.
- We are formally given permission not to worry about `chunked` transfer encoding, thankfully.
- To go into detail: the major WSGI server implementations all take one of these 3 strategies: (a) reject such requests, per the spec; (b) cache the whole thing and give the WSGI app a synthetic `CONTENT_LENGTH` value when the time comes, also allowed per the spec; or (c) implement actual empty reads instead of just hanging when you try to consume past the end of the input, this last and nonstandard behavior being indicated by a `wsgi.input_terminated` flag. Each of these strategies is very easy for us to operate under.
- The outer package is a “multipart” message, with each message inside of it having the field value as the sub-message body, and the relevant details scattered variously around the `Content-Type` and `Content-Disposition` headers.
Combining these, and handling the 3 core use-cases of (i) GETed forms, (ii) POSTed forms, and (iii) multipart forms (which are required to send files), we get something like this:
import wsgiref.simple_server
import email.message, email.parser, email.policy
import urllib.parse
import re
from collections import namedtuple
try:
    # Preferred source of the page size when the `resource` module can be imported.
    from resource import getpagesize
except ImportError:
    # No `resource` module here: expose the same name backed by mmap's constant.
    import mmap

    def getpagesize():
        """Return the memory page size in bytes, taken from ``mmap.PAGESIZE``."""
        page_size = mmap.PAGESIZE
        return page_size
# One parsed form field: (name, value, filename, MIMEtype).
#
# * name     -- decoded into a str, purely for convenience.
# * value    -- left exactly as it came in from the wire.
# * filename -- None for non-file inputs, so no separate "isFile" flag is needed.
#               BEWARE: filename == '' when no file was chosen; this is what
#               the browser sends, weirdly.
# * MIMEtype -- 99% of the time merely GUESSED by the browser from a static
#               lookup of the file extension in its local database, so it is
#               not usually useful.  It is not ALWAYS useless, though: certain
#               API clients set it deliberately in certain circumstances, so
#               the information is kept, when it is actually sent, in case the
#               application building on this needs it.
FieldEntry = namedtuple('FieldEntry', 'name value filename MIMEtype')

# Spelled-out element types of a FieldEntry, for use inside string annotations.
FieldEntry_T = 'tuple[str, Optional[bytes], Optional[str], Optional[str]]'
def wsgi_parseForm(environ, /) -> 'Iterable[FieldEntry_T]':
if environ['REQUEST_METHOD'] == 'GET':
for k, v in _parse_qs(environ['QUERY_STRING']):
yield FieldEntry(k, v, None, None)
return
m = email.message.Message()
m.add_header('Content-Type', environ['CONTENT_TYPE'])
match m.get_content_type():
case 'application/x-www-form-urlencoded':
for k, v in _parse_qs(bytes().join(_wsgi_body(environ)).decode('ascii')):
yield FieldEntry(k, v, None, None)
return
case 'multipart/form-data':
p = email.parser.BytesFeedParser(policy=email.policy.HTTP)
p.feed(('Content-Type: %s\r\n' % environ['CONTENT_TYPE']).encode('utf-8'))
# ^Don't try to abbreviate this line; it also injects the boundary parameter, which is needed to parse out the sub-messages!
p.feed('\r\n'.encode('utf-8'))
for chunk in _wsgi_body(environ):
# TODO stream each element to the caller as they arrive
# rather than loading them all into RAM at the same time
p.feed(chunk)
m = p.close(); del p
assert m.is_multipart()
for part in m.iter_parts():
part.set_default_type(None)
yield FieldEntry(
part.get_param('name', header='content-disposition'),
part.get_payload(decode=True),
part.get_filename(None),
part.get_content_type()
)
case t:
raise ValueError('unexpected Content-Type: %s' % t)
def _wsgi_body(environ, /):
# Workaround helper function for https://github.com/python/cpython/issues/66077
wsgi_input = environ['wsgi.input']
try:
_read = wsgi_input.read1
except AttributeError:
def _read(n=getpagesize(), /):
return wsgi_input.read(n)
if environ.get('wsgi.input_terminated', False):
# https://github.com/GrahamDumpleton/mod_wsgi/blob/4.9.4/docs/configuration-directives/WSGIChunkedRequest.rst
try:
yield from iter(wsgi_input)
return
except (TypeError, NotImplementedError):
while chunk := _read():
yield chunk
return
if 'HTTP_TRANSFER_ENCODING' in environ:
# https://mail.python.org/pipermail/web-sig/2007-March/002630.html
# https://wsgi.readthedocs.io/en/latest/proposals-2.0.html#unknown-length-wsgi-input
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding#chunked_encoding
raise NotImplementedError("Transfer-Encoding: %s" % environ['HTTP_TRANSFER_ENCODING'])
toread = int(environ.get('CONTENT_LENGTH', 0) or 0) # Weirdly, this is set to the empty string for requests with no body, hence the or-clause
readsofar = 0
while (readsofar < toread):
chunk = _read()
readsofar += len(chunk)
yield chunk
_QSSPLIT = re.compile(r'(?:^|&)([^&]*)')
_QSPARAM = re.compile(r'^(.*?)(?:=(.*))?$', re.DOTALL)
def _parse_qs(qs: str) -> 'Iterable[tuple[str, Optional[bytes]]]':
for p in (m.group(1) for m in _QSSPLIT.finditer(qs)):
k, v = _QSPARAM.match(p).groups()
k = urllib.parse.unquote_plus(k)
v = urllib.parse.unquote_to_bytes(v.replace('+', ' ')) if v is not None else v
yield k, v
def main_wsgi(environ, start_response):
    """Demo WSGI application exercising ``wsgi_parseForm``.

    A GET request without a query string is answered with an HTML
    registration form that posts back as ``multipart/form-data``.  Any other
    request is answered with a plain-text, pretty-printed list of the fields
    that ``wsgi_parseForm`` extracted from it.

    Args:
        environ: the WSGI environment dictionary of the request.
        start_response: the WSGI ``start_response`` callable.

    Yields:
        The response body as ``bytes`` chunks.
    """
    from pprint import pformat

    if environ['REQUEST_METHOD'] == 'GET' and not environ.get('QUERY_STRING'):
        start_response('200 OK', [('Content-Type', 'text/html')])
        yield '<form method="POST" enctype="multipart/form-data">'.encode()
        yield '<label for="username">username:</label><input name="username" value="user123" /><br />'.encode()
        yield '<label for="password">password:</label><input name="password" value="lol456" /><br />'.encode()
        yield '<label for="avatar">avatar:</label><input name="avatar" type="file"><br /><input type="submit" value="register" />'.encode()
        # The end tag of <form> is mandatory in HTML; close what was opened.
        yield '</form>'.encode()
    else:
        start_response('200 OK', [('Content-Type', 'text/plain')])
        yield pformat(list(wsgi_parseForm(environ))).encode()
if __name__ == '__main__':
    # Serve the demo application on port 8000 until the process is stopped.
    # The with-block closes the server socket once serve_forever() exits.
    with wsgiref.simple_server.make_server('', 8000, main_wsgi) as httpd:
        httpd.serve_forever()