import json
import logging
import traceback
from flask import abort
from flask import Flask
from flask import jsonify
from flask import request
from flask import send_file
from flask import send_from_directory
from flask import make_response
from flask_cors import CORS
import matplotlib
# make sure we use Agg for offscreen rendering
matplotlib.use("Agg")
import simpy
from openclsim import model
from openclsim import savesim
from openclsim import core
from openclsim import plot
import datetime
import functools
import os
import time
import pandas as pd
import glob
import pathlib
import urllib.parse
import json
import hashlib
# Directory two levels above this file; static content is resolved against it.
root_folder = pathlib.Path(__file__).parent.parent
# Folder that is handed to Flask as its static folder below.
static_folder = root_folder / "static"
# Stop at import time when the static folder is missing.
# NOTE(review): assert statements are removed under `python -O`, so this
# check silently disappears in optimized mode.
assert (
static_folder.exists()
), "Make sure you run the server from the static directory. {} does not exist".format(
static_folder
)
# The Flask application object that the route decorators in this module attach to.
app = Flask(__name__, static_folder=str(static_folder))
# Wrap the app with flask_cors (no options are passed here).
CORS(app)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@app.route("/")
def main():
    """Return a small JSON document that identifies this server."""
    banner = {"message": "Basic Digital Twin Server"}
    return jsonify(banner)
@app.route("/csv")
def csv():
    """Return a CSV listing of the URLs of all CSV files below the static folder.

    The response body is a CSV document with a single ``paths`` column. Each
    row is the request's host URL joined with the path of one ``*.csv`` file,
    expressed relative to ``root_folder``.

    Returns:
        A Flask response with ``Content-Type: text/csv``.
    """
    # Glob on the absolute static folder so the result does not depend on the
    # working directory of the server process, then make each hit relative to
    # the root folder again.
    relative_files = (
        found.relative_to(root_folder) for found in static_folder.glob("**/*.csv")
    )
    # as_posix() keeps forward slashes in the URL, also on Windows.
    paths = [
        urllib.parse.urljoin(request.host_url, relative.as_posix())
        for relative in relative_files
    ]
    logger.debug("csv listing requested via %s: %s", request.url, paths)

    listing = pd.DataFrame(data={"paths": paths})
    resp = make_response(listing.to_csv(index=False))
    resp.headers["Content-Type"] = "text/csv"
    return resp
def _json_error(error, status_code):
    """Build a JSON error response with the message and the active traceback.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` can see the exception being handled.
    """
    resp = jsonify({"error": str(error), "traceback": traceback.format_exc()})
    resp.status_code = status_code
    return resp


@app.route("/simulate", methods=["POST"])
def simulate():
    """Run a simulation described by the JSON body of the request.

    Returns:
        The result of ``simulate_from_json`` serialized as JSON.

    Error responses:
        * 400 when the request is not JSON.
        * 400 with a JSON body (``error``, ``traceback``) on ``ValueError``.
        * 500 with a JSON body (``error``, ``traceback``) on ``RuntimeError``.
        * 500 for any other exception, which is also logged.
    """
    if not request.is_json:
        # abort() raises an HTTP exception, so nothing after it would run.
        abort(400, description="content type should be json")

    config = request.get_json(force=True)
    try:
        simulation_result = simulate_from_json(config)
    except ValueError as valerr:
        return _json_error(valerr, 400)
    except RuntimeError as runerr:
        return _json_error(runerr, 500)
    except Exception as e:
        # Use the module logger so the record carries this module's name.
        logger.exception("unexpected error")
        abort(500, description=str(e))

    return jsonify(simulation_result)
@app.route("/plot")
def demo_plot():
    """Render the demo figure and return it as an HTTP response."""
    # Gantt + Spill + CO2
    return plot.fig2response(plot.demo_plot())
@app.route("/energy_plot", methods=["POST"])
def energy_plot():
    """Return a plot with the cumulative energy use.

    The request body must be JSON; it is passed to
    ``energy_use_plot_from_json`` and the resulting figure is converted to a
    response with ``plot.fig2response``.

    Error responses:
        * 400 when the request is not JSON or a ``ValueError`` is raised
          while building the plot.
        * 500 for any other exception.
    """
    if not request.is_json:
        # Answer with 400 like the other POST endpoints do; raising a plain
        # ValueError here would surface as an internal server error.
        abort(400, description="content type should be json")

    config = request.get_json(force=True)
    try:
        energy_use = energy_use_plot_from_json(config)
    except ValueError as valerr:
        # abort() raises, so no explicit return is needed after it.
        abort(400, description=str(valerr))
    except Exception as e:
        abort(500, description=str(e))

    return plot.fig2response(energy_use)
@app.route("/equipment_plot", methods=["POST"])
def equipment_plot():
    """Return a planning.

    The request body must be JSON; it is passed to
    ``equipment_plot_from_json`` and the resulting figure is converted to a
    response with ``plot.fig2response``.

    Error responses:
        * 400 when the request is not JSON, or when a ``ValueError`` or
          ``RuntimeError`` is raised while building the plot.
        * 500 for any other exception.
    """
    if not request.is_json:
        # abort() raises an HTTP exception, so nothing after it would run.
        abort(400, description="content type should be json")

    config = request.get_json(force=True)
    try:
        # Named `figure` so the local does not shadow this view function.
        figure = equipment_plot_from_json(config)
    except (ValueError, RuntimeError) as err:
        abort(400, description=str(err))
    except Exception as e:
        abort(500, description=str(e))

    return plot.fig2response(figure)
def update_end_time(event, env):
    """Record the environment's current time on *event* as ``end_time``.

    Args:
        event: object that receives the ``end_time`` attribute.
        env: object exposing the current time as ``now``.
    """
    current_time = env.now
    event.end_time = current_time
def interrupt_processes(event, activities, env):
    """Interrupt every activity process that has not been processed yet.

    For each interrupted process an entry is written to the activity's log
    with the current environment time.

    Args:
        event: not used by this function; present so it can serve as an
            event callback.
        activities: iterable of mappings with a ``"process"`` and an
            ``"activity_log"`` entry.
        env: object exposing the current time as ``now``.
    """
    for entry in activities:
        running = entry["process"]
        if running.processed:
            # Already handled, nothing to interrupt.
            continue

        running.interrupt()
        journal = entry["activity_log"]
        journal.log_entry(
            "interrupted by 100 year timeout",
            env.now,
            -1,
            None,
            journal.id,
        )
def simulate_from_json(config, tmp_path="static"):
    """Create a simulation from a JSON-style config, run it and return its log.

    Args:
        config: mapping with the keys ``sites``, ``equipment`` and
            ``activities``; optionally ``initialTime`` (a timestamp) and
            ``saveSimulation``.
        tmp_path: folder passed on to ``save_simulation``. The optional
            tmp_path parameter should only be used for unit tests.

    Returns:
        The result of ``simulation.get_logging()`` extended with the keys
        ``completionTime`` and ``completionCost``.
    """
    if "initialTime" not in config:
        simulation_start = datetime.datetime.now()
    else:
        try:
            simulation_start = datetime.datetime.fromtimestamp(config["initialTime"])
        except OSError:
            # on windows in certain python versions 0 is not a good start date
            simulation_start = datetime.datetime(2000, 1, 1)

    start_seconds = time.mktime(simulation_start.timetuple())
    env = simpy.Environment(initial_time=start_seconds)
    simulation = model.Simulation(
        env=env,
        name="server simulation",
        sites=config["sites"],
        equipment=config["equipment"],
        activities=config["activities"],
    )

    # Give every activity process an end_time that is filled in by a callback.
    processes = [entry["process"] for entry in simulation.activities.values()]
    for proc in processes:
        proc.end_time = None
        record_end = functools.partial(update_end_time, env=env)
        proc.callbacks.append(record_end)

    # Timeout after 100 years of simulated time; its callback interrupts the
    # processes that are still pending at that moment.
    hundred_years = 100 * 365 * 24 * 3600
    timeout_100years = env.timeout(hundred_years)
    stop_remaining = functools.partial(
        interrupt_processes, activities=simulation.activities.values(), env=env
    )
    timeout_100years.callbacks.append(stop_remaining)

    try:
        env.run()
        completion_time = max(proc.end_time for proc in processes)
    except simpy.Interrupt:
        completion_time = env.now

    result = simulation.get_logging()
    result["completionTime"] = completion_time
    # Only equipment that derives from core.HasCosts contributes to the cost.
    result["completionCost"] = sum(
        simulation.equipment[key].cost
        for key in simulation.equipment
        if isinstance(simulation.equipment[key], core.HasCosts)
    )

    if config.get("saveSimulation"):
        save_simulation(config, simulation, tmp_path=tmp_path)

    return result
def save_simulation(config, simulation, tmp_path=""):
    """Save the logs of the given simulation below ``<tmp_path>/simulations/``.

    The md5 hash of the config's JSON text (keys sorted) followed by ``_`` is
    used as prefix for the written files, so simulations with the same config
    are written to the same files. This is not completely foolproof: changing
    for example an equipment or location name does not alter the simulation
    result, but does alter the config text and therefore the prefix.

    Args:
        config: JSON-serializable configuration of the simulation.
        simulation: simulation object handed to ``savesim.save_logs``.
        tmp_path: base folder. The optional tmp_path parameter should only
            be used for unit tests.
    """
    # TODO: replace traversing static_folder pathlib path
    serialized = json.dumps(config, sort_keys=True).encode("utf-8")
    file_prefix = hashlib.md5(serialized).hexdigest() + "_"

    # TODO: use pathlib
    base = str(tmp_path)
    if base and not base.endswith("/"):
        base += "/"
    target_dir = base + "simulations/"

    # create the simulations directory if it does not yet exist
    os.makedirs(target_dir, exist_ok=True)
    savesim.save_logs(simulation, target_dir, file_prefix)
def energy_use_plot_from_json(jsonFile):
    """Create the energy use plot, based on a json input file.

    Every item in ``jsonFile["equipment"]`` that has features is turned into
    a loggable vessel object; each feature becomes one log entry. The vessels
    are then passed to ``plot.energy_use_time`` with ``web=True``.

    Args:
        jsonFile: mapping with an ``equipment`` list. Each item has an ``id``
            and ``features``; each feature has ``properties`` (``message``,
            ``time``, ``value``) and ``geometry`` (``coordinates``).

    Returns:
        Whatever ``plot.energy_use_time`` returns.
    """
    vessels = []
    for item in jsonFile["equipment"]:
        if not item["features"]:
            # Equipment without features does not appear in the plot.
            continue

        vessel_class = type("Vessel", (core.Identifiable, core.Log), {})
        vessel = vessel_class(env=None, name=item["id"])

        for feature in item["features"]:
            properties = feature["properties"]
            vessel.log_entry(
                log=properties["message"],
                t=properties["time"],
                value=properties["value"],
                geometry_log=feature["geometry"]["coordinates"],
                ActivityID=None,
            )

        vessels.append(vessel)

    return plot.energy_use_time(vessels, web=True)
def equipment_plot_from_json(jsonFile):
    """Create the equipment plot (a Gantt chart), based on a json input file.

    Every item in ``jsonFile["equipment"]`` that has features is turned into
    a loggable vessel object; each feature becomes one log entry. The vessels
    are then passed to ``plot.equipment_plot_json`` with ``web=True``.

    Args:
        jsonFile: mapping with an ``equipment`` list. Each item has an ``id``
            and ``features``; each feature has ``properties`` (``message``,
            ``time``, ``value``) and ``geometry`` (``coordinates``).

    Returns:
        Whatever ``plot.equipment_plot_json`` returns.
    """
    vessels = []
    for item in jsonFile["equipment"]:
        if not item["features"]:
            # Equipment without features does not appear in the plot.
            continue

        vessel_class = type("Vessel", (core.Identifiable, core.Log), {})
        vessel = vessel_class(env=None, name=item["id"])

        for feature in item["features"]:
            properties = feature["properties"]
            vessel.log_entry(
                log=properties["message"],
                t=properties["time"],
                value=properties["value"],
                geometry_log=feature["geometry"]["coordinates"],
                ActivityID=None,
            )

        vessels.append(vessel)

    return plot.equipment_plot_json(vessels, web=True)