You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

438 lines
15 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. #!/usr/bin/env python3
  2. import os
  3. from flask import Flask, render_template, session, request, abort, redirect, url_for, jsonify
  4. from flask_sqlalchemy import SQLAlchemy
  5. from sqlalchemy import inspect, and_
  6. from flask_wtf import FlaskForm
  7. import bcrypt
  8. from wtforms_alchemy import model_form_factory
  9. from flask_migrate import Migrate
  10. from uuid import uuid4
  11. import csv
  12. import validate
  13. import secrets
  14. from dotenv import load_dotenv
  15. load_dotenv()
  16. # from datetime import date
# Create the Flask application and point SQLAlchemy at a local SQLite file.
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///project.db"
# Session-signing key: read from the SECRET_KEY environment variable; when it
# is unset, a random key is generated for this process.
# NOTE(review): with the random fallback, existing sessions presumably stop
# validating after a restart — confirm SECRET_KEY is set in deployment.
app.secret_key = os.getenv('SECRET_KEY', secrets.token_hex(16))
# The extensions are created unbound and attached to the app afterwards.
db: SQLAlchemy = SQLAlchemy()
migrate = Migrate()
db.init_app(app)
migrate.init_app(app, db)
  24. # Helper Methods
  25. def object_as_dict(obj):
  26. return {c.key: getattr(obj, c.key)
  27. for c in inspect(obj).mapper.column_attrs}
  28. # Model Forms
  29. BaseModelForm = model_form_factory(FlaskForm)
  30. class ModelForm(BaseModelForm):
  31. @classmethod
  32. def get_session(cls):
  33. return db.session
  34. class User(db.Model):
  35. id = db.Column(db.Integer, primary_key=True)
  36. username = db.Column(db.String, unique=True, nullable=False)
  37. email = db.Column(db.String, nullable=False)
  38. name = db.Column(db.String, nullable=False)
  39. password = db.Column(db.String, nullable=False)
  40. institution = db.Column(db.String, nullable=False)
  41. position = db.Column(db.String, nullable=False)
  42. admin = db.Column(db.Boolean)
  43. # for type annotations
  44. query: db.Query
  45. @classmethod
  46. def generate_password(cls, pw: str):
  47. return bcrypt.hashpw(pw, bcrypt.gensalt(12))
  48. @classmethod
  49. def authenticate(cls, username: str, pw: str):
  50. user = User.query.filter_by(username=username).one_or_none()
  51. if user and bcrypt.checkpw(pw, user.password):
  52. session['user'] = user.username
  53. if user.admin:
  54. session['admin'] = user.username
  55. return user
  56. else:
  57. return None
  58. @classmethod
  59. def admin_exists(cls):
  60. user = User.query.filter_by(admin=True).first()
  61. return True if user else False
  62. @classmethod
  63. def authorize_or_redirect(cls, admin=True):
  64. if (admin and "admin" not in session) or "user" not in session:
  65. return redirect(url_for("login"))
  66. else:
  67. return None
class Chemical(db.Model):
    """Database row describing one chemical standard and its measured values."""

    # for type annotations
    query: db.Query
    id = db.Column(db.Integer, primary_key=True)
    # id of the User that created the row (set from the logged-in user by the
    # create and batch-add routes); not declared as a foreign key here
    person_id = db.Column(db.Integer, nullable=False)
    standard_grp = db.Column(db.String, nullable=False)
    # all fields after here are included in the database
    chemical_db_id = db.Column(db.String)
    library = db.Column(db.String)
    # important fields
    metabolite_name = db.Column(db.String, nullable=False)
    formula = db.Column(db.String, nullable=False)
    mass = db.Column(db.Float, nullable=False)
    pubchem_cid = db.Column(db.Integer)
    pubmed_refcount = db.Column(db.Integer)
    standard_class = db.Column(db.String)
    inchikey = db.Column(db.String, nullable=False)
    inchikey14 = db.Column(db.String)
    # final_mz and final_rt are the two columns the search routes filter on
    final_mz = db.Column(db.Float, nullable=False)
    final_rt = db.Column(db.Float, nullable=False)
    final_adduct = db.Column(db.String, nullable=False)
    adduct = db.Column(db.String)
    detected_adducts = db.Column(db.String)
    # NOTE(review): stored as a string although the name suggests a number — confirm
    adduct_calc_mz = db.Column(db.String)
    msms_detected = db.Column(db.Boolean, nullable=False)
    msms_purity = db.Column(db.Float)
    # serialized into datetime.date
    createdAt = db.Column(db.Date)
class ChemicalForm(ModelForm):
    """Form derived from the ``Chemical`` model via ``ModelForm``."""

    class Meta:
        # CSRF checking is switched off for this form.
        csrf = False
        # Model the form is based on.
        model = Chemical
  99. # Error Handlers
  100. @app.errorhandler(404)
  101. def handler_404(msg):
  102. return render_template("errors/404.html")
  103. @app.errorhandler(403)
  104. def handler_403(msg):
  105. return render_template("errors/403.html")
  106. # Admin routes
  107. @app.route('/dashboard')
  108. def admin_root():
  109. user = User.query.filter_by(username=session.get('user')).one_or_404()
  110. if 'admin' in session:
  111. return render_template("admin.html", user=user)
  112. if 'user' in session:
  113. return render_template("user.html", user=user)
  114. return User.authorize_or_redirect(admin=False) or ""
  115. @app.route('/accounts/create', methods=['GET', 'POST'])
  116. def accounts_create():
  117. if User.admin_exists():
  118. if login := User.authorize_or_redirect():
  119. return login
  120. if request.method == "GET":
  121. return render_template("register.html")
  122. else:
  123. username, pw = request.form.get(
  124. 'username'), request.form.get('password')
  125. if username is None or pw is None:
  126. return render_template("register.html", fail="Invalid Input.")
  127. elif db.session.execute(db.select(User).filter_by(username=username)).fetchone():
  128. return render_template("register.html", fail="Username already exists.")
  129. else:
  130. # because the IDE complains about type mismatches
  131. form = {} | request.form
  132. form['password'] = User.generate_password(pw)
  133. form['admin'] = (True if form.get('admin') == 'y' else False)
  134. form.pop('reconfirm')
  135. user = User(**form)
  136. db.session.add(user)
  137. db.session.commit()
  138. return render_template("register.html", success=True)
  139. @app.route('/accounts/edit', methods=['GET', 'POST'])
  140. def accounts_edit():
  141. if login := User.authorize_or_redirect(admin=False):
  142. return login
  143. user = User.query.filter_by(username=session.get('user')).one_or_404()
  144. if request.method == "GET":
  145. return render_template("account_edit.html", user=object_as_dict(user))
  146. else:
  147. dct = object_as_dict(user)
  148. # update all of the changes
  149. for key in request.form:
  150. if key in dct:
  151. setattr(user, key, request.form[key])
  152. db.session.commit()
  153. return render_template("account_edit.html", user=object_as_dict(user), success=True)
  154. @app.route('/accounts/view')
  155. def accounts_all():
  156. if "admin" not in session:
  157. abort(403)
  158. users = [object_as_dict(u) for u in User.query.all()]
  159. for u in users:
  160. u.pop("password")
  161. return jsonify(users)
  162. @app.route('/accounts/view/<int:id>')
  163. def accounts_view(id):
  164. user = User.query.filter_by(id=id).one_or_404()
  165. return render_template("account_view.html", user=object_as_dict(user))
  166. @app.route('/accounts/login', methods=['GET', 'POST'])
  167. def login():
  168. if request.method == "POST":
  169. username, pw = request.form.get(
  170. 'username', ''), request.form.get('password', '')
  171. if User.authenticate(username, pw):
  172. return render_template("login.html", success=True)
  173. else:
  174. return render_template("login.html", fail="Could not authenticate.")
  175. else:
  176. return render_template("login.html")
  177. @app.route('/accounts/logout', methods=['GET'])
  178. def logout():
  179. if "admin" in session:
  180. session.pop('admin')
  181. if "user" in session:
  182. session.pop('user')
  183. return redirect(url_for('home'))
  184. @app.route("/")
  185. def home():
  186. if User.admin_exists():
  187. return render_template("index.html")
  188. else:
  189. return redirect(url_for("accounts_create"))
  190. # Routes for CRUD operations on chemicals
  191. @app.route("/chemical/create", methods=['GET', 'POST'])
  192. def chemical_create():
  193. if not session.get('admin'):
  194. abort(403)
  195. user = User.query.filter_by(username=session.get('user')).one_or_404()
  196. if request.method == "POST":
  197. form = ChemicalForm(**(request.form | {"person_id": user.id}))
  198. if form.validate():
  199. new_chemical = Chemical(**form.data)
  200. db.session.add(new_chemical)
  201. db.session.commit()
  202. return render_template("create_chemical.html", form=ChemicalForm(), user=object_as_dict(user), success=True)
  203. else:
  204. return render_template("create_chemical.html", form=form, invalid=True), 400
  205. else:
  206. form = ChemicalForm(person_id=user.id)
  207. return render_template("create_chemical.html", form=form, user=object_as_dict(user))
  208. @app.route("/chemical/<int:id>/update", methods=['GET', 'POST'])
  209. def chemical_update(id: int):
  210. if not session.get('admin'):
  211. abort(403)
  212. current_chemical: Chemical = Chemical.query.filter_by(id=id).one_or_404()
  213. dct = object_as_dict(current_chemical)
  214. if request.method == "POST":
  215. form = ChemicalForm(**request.form)
  216. if form.validate():
  217. # take the row with id and update it.
  218. for k in form.data:
  219. setattr(current_chemical, k, form.data[k])
  220. db.session.commit()
  221. return render_template("create_chemical.html", form=form, success=True, id=id)
  222. else:
  223. form = ChemicalForm(**dct)
  224. return render_template("create_chemical.html", form=form, invalid=True, id=id), 400
  225. else:
  226. form = ChemicalForm(**dct)
  227. return render_template("create_chemical.html", form=form, id=id)
  228. @app.route("/chemical/<int:id>/delete")
  229. def chemical_delete(id: int):
  230. if not session.get('admin'):
  231. abort(403)
  232. current_chemical: Chemical = Chemical.query.filter_by(id=id).one_or_404()
  233. db.session.delete(current_chemical)
  234. db.session.commit()
  235. return render_template("delete_chemical.html", id=id)
  236. @app.route("/chemical/<int:id>/view")
  237. def chemical_view(id: int):
  238. current_chemical: Chemical = Chemical.query.filter_by(id=id).one_or_404()
  239. dct = object_as_dict(current_chemical)
  240. return render_template("view_chemical.html", id=id, chemical=dct)
  241. @app.route("/chemical/all")
  242. def chemical_all():
  243. if not session.get('admin'):
  244. abort(403)
  245. result: list[Chemical] = Chemical.query.all()
  246. data = []
  247. for x in result:
  248. data.append({c.name: getattr(x, c.name) for c in x.__table__.columns})
  249. return jsonify(data)
  250. @app.route("/chemical/search", methods=["POST"])
  251. def search_api():
  252. query = request.json
  253. if query is None:
  254. return jsonify([])
  255. for field in query:
  256. query[field] = float(query[field])
  257. mz_min, mz_max = query.get('mz_min'), query.get('mz_max')
  258. rt_min, rt_max = query.get('rt_min'), query.get('rt_max')
  259. year_max, month_max, day_max = int(query.get(
  260. 'year_max')), int(query.get('month_max')), int(query.get('day_max'))
  261. try:
  262. mz_filter = and_(mz_max > Chemical.final_mz,
  263. Chemical.final_mz > mz_min)
  264. rt_filter = and_(rt_max > Chemical.final_rt,
  265. Chemical.final_rt > rt_min)
  266. # date_filter = date(year_max, month_max, day_max) >= Chemical.createdAt
  267. except ValueError as e:
  268. return jsonify({"error": str(e)}), 400
  269. result = Chemical.query.filter(
  270. and_(mz_filter, rt_filter)
  271. ).limit(20).all()
  272. data = []
  273. for x in result:
  274. data.append({"url": url_for("chemical_view", id=x.id),
  275. "name": x.metabolite_name, "mz": x.final_mz, "rt": x.final_rt})
  276. return jsonify(data)
  277. # Utilities for doing add and search operations in batch
# No request body over 3 MB (3 * 1000 * 1000 bytes) is allowed; this is the
# limit that applies to the batch file uploads below.
app.config['MAX_CONTENT_LENGTH'] = 3 * 1000 * 1000
  280. @app.route("/chemical/batchadd", methods=["GET", "POST"])
  281. def batch_add_request():
  282. if not session.get('admin'):
  283. abort(403)
  284. user = User.query.filter_by(username=session.get('user')).one_or_404()
  285. if request.method == "POST":
  286. if "input" not in request.files or request.files["input"].filename == '':
  287. return render_template("batchadd.html", invalid="Blank file included")
  288. # save the file to RAM
  289. file = request.files["input"]
  290. os.makedirs("/tmp/walkerdb", exist_ok=True)
  291. filename = os.path.join("/tmp/walkerdb", str(uuid4()))
  292. file.save(filename)
  293. # perform cleanup regardless of what happens.
  294. def cleanup(): return os.remove(filename)
  295. # read it as a csv
  296. with open(filename, "r") as csvfile:
  297. reader = csv.DictReader(csvfile, delimiter="\t")
  298. results, error = validate.validate_insertion_csv_fields(reader)
  299. if error:
  300. cleanup()
  301. return render_template("batchadd.html", invalid=error)
  302. else:
  303. chemicals = [Chemical(**result, person_id=user.id)
  304. for result in results]
  305. db.session.add_all(chemicals)
  306. db.session.commit()
  307. cleanup()
  308. return render_template("batchadd.html", success=True)
  309. else:
  310. return render_template("batchadd.html")
  311. # regular users can batch search.
  312. @app.route("/chemical/batch", methods=["GET", "POST"])
  313. def batch_query_request():
  314. if not session.get('user'):
  315. abort(403)
  316. if request.method == "POST":
  317. if "input" not in request.files or request.files["input"].filename == '':
  318. return render_template("batchadd.html", invalid="Blank file included")
  319. # save the file to RAM
  320. file = request.files["input"]
  321. os.makedirs("/tmp/walkerdb", exist_ok=True)
  322. filename = os.path.join("/tmp/walkerdb", str(uuid4()))
  323. file.save(filename)
  324. # perform cleanup regardless of what happens.
  325. def cleanup(): return os.remove(filename)
  326. # read it as a csv
  327. with open(filename, "r") as csvfile:
  328. reader = csv.DictReader(csvfile, delimiter="\t")
  329. queries, error = validate.validate_query_csv_fields(reader)
  330. if error:
  331. cleanup()
  332. return render_template("batchquery.html", invalid=error)
  333. else:
  334. # generate the queries here.
  335. data = []
  336. for query in queries:
  337. mz_filter = and_(query["mz_max"] > Chemical.final_mz,
  338. Chemical.final_mz > query["mz_min"])
  339. rt_filter = and_(query["rt_max"] > Chemical.final_rt,
  340. Chemical.final_rt > query["rt_min"])
  341. # date_filter = query["date"] >= Chemical.createdAt
  342. result = Chemical.query.filter(
  343. and_(mz_filter, rt_filter)
  344. ).limit(5).all()
  345. hits = []
  346. for x in result:
  347. hits.append({"url": url_for("chemical_view", id=x.id),
  348. "name": x.metabolite_name, "mz": x.final_mz, "rt": x.final_rt})
  349. data.append(dict(
  350. query=query,
  351. hits=hits,
  352. ))
  353. cleanup()
  354. return render_template("batchquery.html", success=True, data=data)
  355. return render_template("batchquery.html")
  356. @app.route("/search")
  357. def search():
  358. return render_template("search.html")
  359. if __name__ == "__main__":
  360. with app.app_context():
  361. db.create_all()
  362. app.run(debug=True)