I’m creating a webhook using the Python Flask framework. The webhook is designed to enable communication between a client (webhook generator) and a server (webhook listener). The server has a file which contains stock keeping units and stock statuses (“In Stock” or “Out of Stock”) of products. The objective is to allow the client to retrieve the stock status of specific products by sending HTTP POST requests to the server, which will display the stock status of the products sent by the clients in the request.Files for the client side have the word “producer” in their names, and files for the server side have the word “consumer” in their names.
The Python file tasks_producer.py
dictates how the HTTP POST requests (tasks) are generated.
# tasks_producer.py
import random
from faker import Faker
from faker.providers import BaseProvider
import config
import time
import requests
import json
import uuid
# Create and initialise a Faker generator and return data in US English
# Seeding to have the same results every time we execute the script
fakeTasks = Faker('en_US')
# Seed the Faker instance to have the same results every time we run the program
fakeTasks.seed_instance(0)
# Define a TaskProvider
class TaskProvider(BaseProvider):
def SKUs(self):
apparel = ["SWEATSHIRT", "TSHIRT", "HOODIE", "PFMTSHIRT"]
manufacturer = ["CN_KD", "CN_US", "CN_WM", "LS_US","RC_US"]
model = ["HWT","CLS","BAS","PRM"]
size = ["S", "M", "L", "XL", "2XL","3XL"]
colour = fakeTasks.color_name()
SKU = [apparel[random.randint(0, len(apparel)-1)], manufacturer[random.randint(0, len(manufacturer)-1)],
model[random.randint(0, len(model)-1)], size[random.randint(0, len(size)-1)], colour.upper()]
return SKU
# Assign the TaskProvider to the Faker instance
fakeTasks.add_provider(TaskProvider)
# Generate a fake task
def produce_task(batchid, taskid):
# SKU list composition
SKUList = []
for _ in range(random.randint(config.MIN_NBR_SKUS, config.MAX_NBR_SKUS)):
SKUList.append("_".join(fakeTasks.SKUs()))
# Message composition
message = {"version": "1.0", "batchid": batchid, "requestid": taskid, "SKU": SKUList}
return message
# Send a webhook request to a specified URL
def send_webhook(msg):
try:
# Post a webhook message
# default is a function applied to objects that are not serializable = it converts them to str
resp = requests.post(config.WEBHOOK_RECEIVER_URL, data=json.dumps(
msg, sort_keys=True, default=str), headers = {"Content-Type": "application/json"}, timeout=1.0)
# Raises an HTTPError if an error has occurred during the process (used for debugging).
resp.raise_for_status()
except requests.exceptions.HTTPError as err:
print("An HTTP Error occurred", repr(err))
except requests.exceptions.ConnectionError as err:
print("An Error Connecting to the API occurred", repr(err))
except requests.exceptions.Timeout as err:
print("A Timeout Error occurred", repr(err))
except requests.exceptions.RequestException as err:
print("An Unknown Error occurred", repr(err))
finally:
return resp.status_code
'''
Webhooks will try to send the request data 3 times, with 5 seconds delay between each try,
if an HTTP status 2xx is not returned.
'''
# Execute Fake Tasks
def produce_bunch_tasks():
n = random.randint(config.MIN_NBR_TASKS, config.MAX_NBR_TASKS)
batchid = str(uuid.uuid4())
for i in range(1, n + 1):
msg = produce_task(batchid, i)
resp = send_webhook(msg)
for _ in range(2):
if (resp >= 300):
time.sleep(5)
resp = send_webhook(msg)
else:
break
print(i, "out of ", n, " -- Status", resp, " -- Message = ", msg)
time.sleep(config.WAIT_TIME)
# Return a generator that enables the values of resp, n (passed as total) and msg to be retrieved one by one and passed to the "producer.html" file
yield resp, n, msg
# Stop the produce_bunch_tasks() function from being automatically executed when the tasks_producer.py file is imported as a module to another file.
# The produce_bunch_tasks() function will still be executed when running the file on its own.
# resp, total, and msg will be passed as variables to the template "producer.html" file:
if __name__ == "__main__":
for resp, total, msg in produce_bunch_tasks():
pass
The Python file app_producer.py
renders the client side Jinja template producer.html
.
# app_producer.py
from flask import Response, render_template
from init_producer import app
import tasks_producer
# Render a template with a given context as a stream and return a TemplateStream
def render_template_stream(template_name, **context):
app.update_template_context(context) # Update the template context with some commonly used variables.
t = app.jinja_env.get_template(template_name) # Load a template by name with loader and return a Template.
rv = t.stream(context) # Return a TemplateStream that returns one function after another as strings
rv.enable_buffering(5) # Buffer 5 items before yielding them
return rv # Return a TemplateStream
@app.route("/", methods=['GET'])
def index():
return render_template('producer.html')
@app.route('/producetasks', methods=['POST'])
def producetasks():
print("Producing tasks")
return Response(render_template_stream('producer.html', data = tasks_producer.produce_bunch_tasks()))
if __name__ == "__main__":
app.run(host="localhost",port=5000, debug=True)
The Jinja template file producer.html
is the web page for the client side:
<!-- producer.html -->
<!doctype html>
<html>
<head>
<title>Tasks Producer</title>
<link href="../styles/style.css" rel="stylesheet">
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<style>
#produceTasks{
display: block;
height: 20%;
width: 100%
}
#Messages{
height: 400px;
width: 100%;
border: 2px solid gray;
overflow-y: scroll
}
</style>
</head>
<body class="container">
<div>
<form method="post" id="produceTasksForm" action = "/producetasks">
<button type="submit" id="produceTasks" form="produceTasksForm">Produce Tasks</button>
</form>
</div>
<div>
<div id="Messages"></div>
{% for rsp,total, msg in data %}
<script>
var rsp = "{{ rsp }}";
var total = "{{ total }}";
var SKUstr = "{{msg['SKU']}}";
var lineidx = "{{ loop.index }}";
var SKUarray = SKUstr.split(',');
//If the webhook request succeeds color it in blue else in red.
if (rsp == '200') {
rsp = rsp.fontcolor("blue");
}
else {
rsp = rsp.fontcolor("red");
}
//Add the details of the generated task to the Messages section.
document.getElementById('Messages').innerHTML += ("<br>" + lineidx + " out of " + total + " -- "+ rsp + " -- " +
"{'version': '1.0', 'batchid': '{{msg['batchid']}}', 'requestid': '{{msg['requestid']}}', 'SKU': " + "<br>");
SKUarray.forEach((SKU)=>{
document.getElementById("Messages").innerHTML += SKU + "," + "<br>"
});
document.getElementById("Messages").innerHTML += "}"
</script>
{% endfor %}
</div>
</body>
</html>
On the server side, the tasks_consumer.py
file dictates how the server retrieves the stock status of the products sent through the webhook requests and return them back to the client.
# tasks_consumer.py
import csv
from flask import request
from init_consumer import app, socketio
import json
# Receive the webhook requests and emit a SocketIO event back to the client
def send_message(data):
status_code = 0
if request.method == 'POST':
roomid = app.config['uid']
msg = json.dumps(data)
event = "Send_stock_status"
socketio.emit(event, msg, namespace = '/collectHooks', room = roomid)
status_code = 200
else:
status_code = 405 # Method not allowed
return status_code
# Retrieve the stock status of the products sent through the webhook requests and return them back to the client.
def sendStockStatus():
SKUlist = []
stockSheet = {}
with open("NZ_NVJ_Apparel_SKUs_sheet.csv", newline='') as csvFile:
stockReader = csv.reader(csvFile, delimiter=',', quotechar='"')
for row in stockReader:
SKUlist.append(row[0])
if request.method == 'POST':
stockInfo = request.json
stockStr = str(stockInfo)
stockStr = stockStr.replace("\'", "\"")
stockDict = json.loads(stockStr)
for stock in stockDict["SKU"]:
if stock in SKUlist:
stockSheet.update({str(stock):"In Stock"})
else:
stockSheet.update({str(stock):"Out of Stock"})
send_message(stockSheet)
# print(stockSheet)
return stockSheet
The Python file app_consumer.py
renders the server side Jinja template consumer.html
.
# app_consumer.py
from flask import Response, render_template, request
from flask_socketio import join_room
from init_consumer import app, socketio
import tasks_consumer
import uuid
# Render a template with a given context as a stream and return a TemplateStream
def render_template_stream(template_name, **context):
app.update_template_context(context)
t = app.jinja_env.get_template(template_name)
rv = t.stream(context)
rv.enable_buffering(5)
return rv
# Render the assigned template file
@app.route("/", methods=['GET'])
def index():
return render_template('consumer.html', stockStatus = {})
# Registers a function to be run before the first request to this instance of the application
# Create a unique session ID and store it within the application configuration file
@app.before_request
def initialize_params():
if not hasattr(app.config,'uid'):
sid = str(uuid.uuid4())
app.config['uid'] = sid
print("initialize_params - Session ID stored =", sid)
@app.route('/consumetasks', methods=['POST'])
def get_stock_status():
print("Retrieving stock status")
return Response(render_template_stream('consumer.html', stockStatus = tasks_consumer.sendStockStatus()))
# Execute on connecting
@socketio.on('connect', namespace='/collectHooks')
def socket_connect():
# Display message upon connecting to the namespace
print('Client Connected To NameSpace /collectHooks - ', request.sid)
# Execute on disconnecting
@socketio.on('disconnect', namespace='/collectHooks')
def socket_connect():
# Display message upon disconnecting from the namespace
print('Client disconnected From NameSpace /collectHooks - ', request.sid)
# Execute upon joining a specific room
@socketio.on('join_room', namespace='/collectHooks')
def on_room():
if app.config['uid']:
room = str(app.config['uid'])
# Display message upon joining a room specific to the session previously stored.
print(f"Socket joining room {room}")
join_room(room)
# Execute upon encountering any error related to the websocket
@socketio.on_error_default
def error_handler(e):
# Display message on error.
print(f"socket error: {e}, {str(request.event)}")
# Run using port 5001
if __name__ == "__main__":
socketio.run(app,host='localhost', port=5001,debug=True)
The Jinja template file consumer.html
is the web page for the server side.
<!-- consumer.html -->
<!doctype html>
<html>
<head>
<title>Stock Sheet</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<style>
th,td{
border: 1px solid rgb(190, 190, 190);
padding: 10px;
}
table {
border-collapse: collapse;
border: 2px solid rgb(200, 200, 200);
font-family: sans-serif;
}
</style>
</head>
<body class="container">
<h1>Stock Sheet</h1>
<div>
<form method="post" id="consumeTasksForm" action = "/consumetasks">
<button type="submit" id="consumeTasks" form="consumeTasksForm">Check stock status</button>
</form>
</div>
<table id="stockSheet">
<tr>
<th scope="col">SKU</th>
<th scope="col">Stock Status</th>
</tr>
</table>
{% for SKU, Stock_status in stockStatus.items() %}
<script>
var table = document.getElementById("stockSheet");
var row = table.insertRow(-1); // Insert a new row at the last position
var cellSKU = row.insertCell(0);
var cellStock = row.insertCell(1);
cellSKU.innerHTML = "{{SKU}}";
cellStock.innerHTML = "{{Stock_status}}";
</script>
{% endfor %}
</body>
</html>
Note that for simplicity, the code for the files config.py
, init_producer.py
and init_consumer.py
are not shown here.
To run the webhook, I ran the file app_consumer.py
and app_producer.py
in two terminals in Visual Studio Code. Rendering the template file producer.html
in the browser displays the Produce Tasks
button at the top of the screen. When I clicked on the button, the web page correctly displayed the HTTP POST request message sent. Rendering the template file consumer.html
in the browser shows the Check stock status
button and the headers of a table at the top of the screen. After the button is clicked, it was supposed to display the stock status of each product sent by the client in a table. However, after I clicked the button, the page displayed the following error:
Unsupported Media Type
Did not attempt to load JSON data because the request Content-Type was not ‘application/json’.
No other errors were displayed in the Visual Studio Code terminals. The send_webhook()
function in the tasks_producer.py
file did not throw any exceptions and returned the response status code of 200
as displayed on the rendered producer.html
file. I have tried to change the statement data=json.dumps(msg, sort_keys=True, default=str)
of the
resp = requests.post(config.WEBHOOK_RECEIVER_URL, data=json.dumps(msg, sort_keys=True, default=str), headers = {"Content-Type": "application/json"}, timeout=1.0)
statement in the tasks_producer.py
file to json=json.dumps(msg, sort_keys=True, default=str)
and data=msg
, and change headers = {"Content-Type": "application/json"}
to headers = {"Accept": "application/json"}
, but to no avail. What’s worse, the send_webhook()
function returned client error response codes in the range 400-499
for the last two options.
The rendered producer.html
page after clicking the Produce Tasks
button, which has the address http://localhost:5000/producetasks
, looked like this.
The rendered
consumer.html
page with the address http://localhost:5001
looked like this.After clicking the
Check stock status
button and the page was redirected to http://localhost:5001/consumetasks
, it displayed the 415 Unsupported Media Type
error message` as follows.
I have no idea what caused the error. Can anyone point me in the right direction?