You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

372 lines
12 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. #!/usr/bin/env python3
  2. import os
  3. from flask import Flask, render_template, session, request, abort, redirect, url_for, jsonify
  4. from flask_sqlalchemy import SQLAlchemy
  5. from sqlalchemy import inspect, and_
  6. from flask_wtf import FlaskForm
  7. import bcrypt
  8. from wtforms_alchemy import model_form_factory
  9. from flask_migrate import Migrate
  10. from uuid import uuid4
  11. import csv
  12. import validate
  13. # from datetime import date
  14. app = Flask(__name__)
  15. app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///project.db"
  16. app.secret_key = '98d31240f9fbe14c8083586db49c19c3a8d3f726'
  17. db: SQLAlchemy = SQLAlchemy()
  18. migrate = Migrate()
  19. db.init_app(app)
  20. migrate.init_app(app, db)
  21. BaseModelForm = model_form_factory(FlaskForm)
  22. class ModelForm(BaseModelForm):
  23. @classmethod
  24. def get_session(cls):
  25. return db.session
  26. class Admin(db.Model):
  27. id = db.Column(db.Integer, primary_key=True)
  28. username = db.Column(db.String, unique=True, nullable=False)
  29. password = db.Column(db.String, nullable=False)
  30. query: db.Query
  31. @classmethod
  32. def generate_password(cls, pw: str):
  33. return bcrypt.hashpw(pw, bcrypt.gensalt(12))
  34. @classmethod
  35. def authenticate(cls, username: str, pw: str):
  36. user = Admin.query.filter_by(username=username).one_or_none()
  37. if user and bcrypt.checkpw(pw, user.password):
  38. session['admin'] = user.username
  39. return user
  40. else:
  41. return None
  42. @classmethod
  43. def exists(cls):
  44. user = Admin.query.one_or_none()
  45. return True if user else False
  46. @classmethod
  47. def authorize(cls):
  48. if "admin" not in session:
  49. return redirect(url_for("admin_login"))
  50. else:
  51. return None
  52. def object_as_dict(obj):
  53. return {c.key: getattr(obj, c.key)
  54. for c in inspect(obj).mapper.column_attrs}
  55. class Chemical(db.Model):
  56. query: db.Query
  57. id = db.Column(db.Integer, primary_key=True)
  58. standard_grp = db.Column(db.String, nullable=False)
  59. uploaded_by = db.Column(db.String, nullable=False)
  60. # all fields after here are included in the database
  61. chemical_db_id = db.Column(db.String)
  62. library = db.Column(db.String)
  63. # important fields
  64. name = db.Column(db.String, nullable=False)
  65. formula = db.Column(db.String, nullable=False)
  66. mass = db.Column(db.Float, nullable=False)
  67. pubchem_cid = db.Column(db.Integer)
  68. pubmed_refcount = db.Column(db.Integer)
  69. standard_class = db.Column(db.String)
  70. inchikey = db.Column(db.String)
  71. inchikey14 = db.Column(db.String)
  72. final_mz = db.Column(db.Float, nullable=False)
  73. final_rt = db.Column(db.Float, nullable=False)
  74. final_adduct = db.Column(db.String, nullable=False)
  75. adduct = db.Column(db.String)
  76. detected_adducts = db.Column(db.String)
  77. adduct_calc_mz = db.Column(db.String)
  78. msms_detected = db.Column(db.Boolean)
  79. msms_purity = db.Column(db.Float)
  80. # serialized into datetime.date
  81. createdAt = db.Column(db.Date)
  82. class ChemicalForm(ModelForm):
  83. class Meta:
  84. csrf = False
  85. model = Chemical
  86. # Error Handlers
  87. @app.errorhandler(404)
  88. def handler_404(msg):
  89. return render_template("errors/404.html")
  90. @app.errorhandler(403)
  91. def handler_403(msg):
  92. return render_template("errors/403.html")
  93. # Admin routes
  94. @app.route('/admin')
  95. def admin_root():
  96. if login := Admin.authorize():
  97. return login
  98. return render_template("admin.html", user=session.get("admin"))
  99. @app.route('/admin/create', methods=['GET', 'POST'])
  100. def admin_create():
  101. if Admin.exists():
  102. if login := Admin.authorize():
  103. return login
  104. if request.method == "GET":
  105. return render_template("register.html")
  106. else:
  107. username, pw = request.form.get(
  108. 'username'), request.form.get('password')
  109. if username is None or pw is None:
  110. return render_template("register.html", fail="Invalid Input.")
  111. elif db.session.execute(db.select(Admin).filter_by(username=username)).fetchone():
  112. return render_template("register.html", fail="Username already exists.")
  113. else:
  114. db.session.add(
  115. Admin(username=username, password=Admin.generate_password(pw)))
  116. db.session.commit()
  117. return render_template("register.html", success=True)
  118. @app.route('/admin/login', methods=['GET', 'POST'])
  119. def admin_login():
  120. if request.method == "POST":
  121. username, pw = request.form.get(
  122. 'username', ''), request.form.get('password', '')
  123. if Admin.authenticate(username, pw):
  124. return render_template("login.html", success=True)
  125. else:
  126. return render_template("login.html", fail="Could not authenticate.")
  127. else:
  128. return render_template("login.html")
  129. @app.route('/admin/logout', methods=['GET'])
  130. def admin_logout():
  131. if "admin" in session:
  132. session.pop('admin')
  133. return redirect(url_for('home'))
  134. @app.route("/")
  135. def home():
  136. if Admin.exists():
  137. return render_template("index.html")
  138. else:
  139. return redirect(url_for("admin_create"))
  140. # Routes for CRUD operations on chemicals
  141. @app.route("/chemical/create", methods=['GET', 'POST'])
  142. def chemical_create():
  143. if not session.get('admin'):
  144. abort(403)
  145. if request.method == "POST":
  146. form = ChemicalForm(**request.form)
  147. if form.validate():
  148. new_chemical = Chemical(**form.data)
  149. db.session.add(new_chemical)
  150. db.session.commit()
  151. return render_template("create_chemical.html", form=ChemicalForm(), success=True)
  152. else:
  153. return render_template("create_chemical.html", form=form, invalid=True), 400
  154. else:
  155. form = ChemicalForm()
  156. return render_template("create_chemical.html", form=form)
  157. @app.route("/chemical/<int:id>/update", methods=['GET', 'POST'])
  158. def chemical_update(id: int):
  159. if not session.get('admin'):
  160. abort(403)
  161. current_chemical: Chemical = Chemical.query.filter_by(id=id).one_or_404()
  162. dct = object_as_dict(current_chemical)
  163. if request.method == "POST":
  164. form = ChemicalForm(**request.form)
  165. if form.validate():
  166. # take the row with id and update it.
  167. for k in form.data:
  168. setattr(current_chemical, k, form.data[k])
  169. db.session.commit()
  170. return render_template("create_chemical.html", form=form, success=True, id=id)
  171. else:
  172. form = ChemicalForm(**dct)
  173. return render_template("create_chemical.html", form=form, invalid=True, id=id), 400
  174. else:
  175. form = ChemicalForm(**dct)
  176. return render_template("create_chemical.html", form=form, id=id)
  177. @app.route("/chemical/<int:id>/delete")
  178. def chemical_delete(id: int):
  179. if not session.get('admin'):
  180. abort(403)
  181. current_chemical: Chemical = Chemical.query.filter_by(id=id).one_or_404()
  182. db.session.delete(current_chemical)
  183. db.session.commit()
  184. return render_template("delete_chemical.html", id=id)
  185. @app.route("/chemical/<int:id>/view")
  186. def chemical_view(id: int):
  187. current_chemical: Chemical = Chemical.query.filter_by(id=id).one_or_404()
  188. dct = object_as_dict(current_chemical)
  189. return render_template("view_chemical.html", id=id, chemical=dct)
  190. @app.route("/chemical/all")
  191. def chemical_all():
  192. if not session.get('admin'):
  193. abort(403)
  194. result: list[Chemical] = Chemical.query.all()
  195. data = []
  196. for x in result:
  197. data.append({c.name: getattr(x, c.name) for c in x.__table__.columns})
  198. return jsonify(data)
  199. @app.route("/chemical/search", methods=["POST"])
  200. def search_api():
  201. query = request.json
  202. if query is None:
  203. return jsonify([])
  204. for field in query:
  205. query[field] = float(query[field])
  206. mz_min, mz_max = query.get('mz_min'), query.get('mz_max')
  207. rt_min, rt_max = query.get('rt_min'), query.get('rt_max')
  208. year_max, month_max, day_max = int(query.get(
  209. 'year_max')), int(query.get('month_max')), int(query.get('day_max'))
  210. try:
  211. mz_filter = and_(mz_max > Chemical.final_mz,
  212. Chemical.final_mz > mz_min)
  213. rt_filter = and_(rt_max > Chemical.final_rt,
  214. Chemical.final_rt > rt_min)
  215. # date_filter = date(year_max, month_max, day_max) >= Chemical.createdAt
  216. except ValueError as e:
  217. return jsonify({"error": str(e)}), 400
  218. result = Chemical.query.filter(
  219. and_(mz_filter, rt_filter)
  220. ).limit(20).all()
  221. data = []
  222. for x in result:
  223. data.append({"url": url_for("chemical_view", id=x.id),
  224. "name": x.name, "mz": x.final_mz, "rt": x.final_rt})
  225. return jsonify(data)
  226. # Utilities for doing add and search operations in batch
  227. # no file over 3MB is allowed.
  228. app.config['MAX_CONTENT_LENGTH'] = 3 * 1000 * 1000
  229. @app.route("/chemical/batchadd", methods=["GET", "POST"])
  230. def batch_add_request():
  231. if not session.get('admin'):
  232. abort(403)
  233. if request.method == "POST":
  234. if "csv" not in request.files or request.files["csv"].filename == '':
  235. return render_template("batchadd.html", invalid="Blank file included")
  236. # save the file to RAM
  237. file = request.files["csv"]
  238. os.makedirs("/tmp/walkerdb", exist_ok=True)
  239. filename = os.path.join("/tmp/walkerdb", str(uuid4()))
  240. file.save(filename)
  241. # perform cleanup regardless of what happens.
  242. def cleanup(): return os.remove(filename)
  243. # read it as a csv
  244. with open(filename, "r") as csvfile:
  245. reader = csv.DictReader(csvfile, delimiter="\t")
  246. results, error = validate.validate_insertion_csv_fields(reader)
  247. if error:
  248. cleanup()
  249. return render_template("batchadd.html", invalid=error)
  250. else:
  251. chemicals = [Chemical(**result) for result in results]
  252. db.session.add_all(chemicals)
  253. db.session.commit()
  254. cleanup()
  255. return render_template("batchadd.html", success=True)
  256. else:
  257. return render_template("batchadd.html")
  258. @app.route("/chemical/batch", methods=["GET", "POST"])
  259. def batch_query_request():
  260. if not session.get('admin'):
  261. abort(403)
  262. if request.method == "POST":
  263. if "csv" not in request.files or request.files["csv"].filename == '':
  264. return render_template("batchadd.html", invalid="Blank file included")
  265. # save the file to RAM
  266. file = request.files["csv"]
  267. os.makedirs("/tmp/walkerdb", exist_ok=True)
  268. filename = os.path.join("/tmp/walkerdb", str(uuid4()))
  269. file.save(filename)
  270. # perform cleanup regardless of what happens.
  271. def cleanup(): return os.remove(filename)
  272. # read it as a csv
  273. with open(filename, "r") as csvfile:
  274. reader = csv.DictReader(csvfile)
  275. queries, error = validate.validate_query_csv_fields(reader)
  276. if error:
  277. cleanup()
  278. return render_template("batchquery.html", invalid=error)
  279. else:
  280. # generate the queries here.
  281. data = []
  282. for query in queries:
  283. mz_filter = and_(query["mz_max"] > Chemical.final_mz,
  284. Chemical.final_mz > query["mz_min"])
  285. rt_filter = and_(query["rt_max"] > Chemical.final_rt,
  286. Chemical.final_rt > query["rt_min"])
  287. # date_filter = query["date"] >= Chemical.createdAt
  288. result = Chemical.query.filter(
  289. and_(mz_filter, rt_filter)
  290. ).limit(5).all()
  291. hits = []
  292. for x in result:
  293. hits.append({"url": url_for("chemical_view", id=x.id),
  294. "name": x.name, "mz": x.final_mz, "rt": x.final_rt})
  295. data.append(dict(
  296. query=query,
  297. hits=hits,
  298. ))
  299. cleanup()
  300. return render_template("batchquery.html", success=True, data=data)
  301. return render_template("batchquery.html")
  302. @app.route("/search")
  303. def search():
  304. return render_template("search.html")
  305. if __name__ == "__main__":
  306. with app.app_context():
  307. db.create_all()
  308. app.run(debug=True)