You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

271 lines
8.5 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. #!/usr/bin/env python3
import os

import bcrypt
from flask import Flask, render_template, session, request, abort, redirect, url_for, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from sqlalchemy import inspect, or_, and_
from wtforms_alchemy import model_form_factory
  8. db: SQLAlchemy = SQLAlchemy()
  9. app = Flask(__name__)
  10. app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///project.db"
  11. app.secret_key = '98d31240f9fbe14c8083586db49c19c3a8d3f726'
  12. BaseModelForm = model_form_factory(FlaskForm)
  13. class ModelForm(BaseModelForm):
  14. @classmethod
  15. def get_session(self):
  16. return db.session
  17. class Admin(db.Model):
  18. id = db.Column(db.Integer, primary_key=True)
  19. username = db.Column(db.String, unique=True, nullable=False)
  20. password = db.Column(db.String, nullable=False)
  21. query: db.Query
  22. @classmethod
  23. def generate_password(cls, pw: str):
  24. return bcrypt.hashpw(pw, bcrypt.gensalt(12))
  25. @classmethod
  26. def authenticate(cls, username: str, pw: str):
  27. user = Admin.query.filter_by(username=username).one_or_none()
  28. if user and bcrypt.checkpw(pw, user.password):
  29. session['admin'] = user.username
  30. return user
  31. else:
  32. return None
  33. @classmethod
  34. def exists(cls):
  35. user = Admin.query.one_or_none()
  36. return True if user else False
  37. @classmethod
  38. def authorize(cls):
  39. if not session.get('admin'):
  40. return redirect(url_for("admin_login"))
  41. def object_as_dict(obj):
  42. return {c.key: getattr(obj, c.key)
  43. for c in inspect(obj).mapper.column_attrs}
class Chemical(db.Model):
    """Database row describing one chemical entry.

    ``name``, ``formula``, ``mass``, ``final_mz`` and ``final_rt`` are
    declared ``nullable=False``; every other data column may be NULL.
    """

    query: db.Query
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # all fields after here are included in the database
    chemical_db_id = db.Column(db.String)
    library = db.Column(db.String)
    # important fields
    name = db.Column(db.String, nullable=False)
    formula = db.Column(db.String, nullable=False)
    mass = db.Column(db.Float, nullable=False)
    # External identifiers / classification.
    # NOTE(review): presumably PubChem / PubMed references -- confirm.
    pubchem_cid = db.Column(db.Integer)
    pubmed_refcount = db.Column(db.Integer)
    standard_class = db.Column(db.String)
    inchikey = db.Column(db.String)
    inchikey14 = db.Column(db.String)
    # final_mz and final_rt are the two columns the /chemical/search route
    # filters on and that the JSON listings expose as "mz" and "rt".
    final_mz = db.Column(db.Float, nullable=False)
    final_rt = db.Column(db.Float, nullable=False)
    final_adduct = db.Column(db.String)
    adduct = db.Column(db.String)
    detected_adducts = db.Column(db.String)
    # Stored as a string, not a float.
    adduct_calc_mz = db.Column(db.String)
    msms_detected = db.Column(db.Boolean)
    msms_purity = db.Column(db.Float)
class ChemicalForm(ModelForm):
    """Form used by the chemical create and update routes."""

    class Meta:
        # CSRF protection is switched off for this form.
        # NOTE(review): the create/update routes accept POSTs from logged-in
        # admins -- confirm that disabling CSRF is intentional.
        csrf = False
        # Model this form is bound to.
        model = Chemical
  71. # Error Handlers
  72. @app.errorhandler(404)
  73. def handler_404(msg):
  74. return render_template("errors/404.html")
  75. @app.errorhandler(403)
  76. def handler_403(msg):
  77. return render_template("errors/403.html")
  78. # Admin routes
  79. @app.route('/admin')
  80. def admin_root():
  81. if login := Admin.authorize():
  82. return login
  83. return render_template("admin.html", user=session.get("admin"))
  84. @app.route('/admin/create', methods=['GET', 'POST'])
  85. def admin_create():
  86. if Admin.exists():
  87. if login := Admin.authorize():
  88. return login
  89. if request.method == "GET":
  90. return render_template("register.html")
  91. else:
  92. username, pw = request.form.get('username'), request.form.get('password')
  93. if username is None or pw is None:
  94. return render_template("register.html", fail="Invalid Input.")
  95. elif db.session.execute(db.select(Admin).filter_by(username=username)).fetchone():
  96. return render_template("register.html", fail="Username already exists.")
  97. else:
  98. db.session.add(Admin(username=username, password=Admin.generate_password(pw)))
  99. db.session.commit()
  100. return render_template("register.html", success=True)
  101. @app.route('/admin/login', methods=['GET', 'POST'])
  102. def admin_login():
  103. if request.method == "POST":
  104. username, pw = request.form.get('username', ''), request.form.get('password', '')
  105. if Admin.authenticate(username, pw):
  106. return render_template("login.html", success=True)
  107. else:
  108. return render_template("login.html", fail="Could not authenticate.")
  109. else:
  110. return render_template("login.html")
  111. @app.route('/admin/logout', methods=['GET'])
  112. def admin_logout():
  113. session.pop('admin')
  114. return redirect(url_for('home'))
  115. @app.route("/")
  116. def home():
  117. if Admin.exists():
  118. return render_template("index.html")
  119. else:
  120. return redirect(url_for("admin_create"))
  121. # Routes for CRUD operations on chemicals
  122. @app.route("/chemical/create", methods=['GET', 'POST'])
  123. def chemical_create():
  124. if not session.get('admin'):
  125. abort(403)
  126. if request.method == "POST":
  127. form = ChemicalForm(**request.form)
  128. if form.validate():
  129. new_chemical = Chemical(**form.data)
  130. db.session.add(new_chemical)
  131. db.session.commit()
  132. return render_template("create_chemical.html", form=ChemicalForm(), success=True)
  133. else:
  134. return render_template("create_chemical.html", form=form, invalid=True)
  135. else:
  136. form = ChemicalForm()
  137. return render_template("create_chemical.html", form=form)
  138. @app.route("/chemical/<int:id>/update", methods=['GET', 'POST'])
  139. def chemical_update(id: int):
  140. if not session.get('admin'):
  141. abort(403)
  142. current_chemical:Chemical = Chemical.query.filter_by(id=id).one_or_404()
  143. dct = object_as_dict(current_chemical)
  144. if request.method == "POST":
  145. form = ChemicalForm(**request.form)
  146. if form.validate():
  147. # take the row with id and update it.
  148. for k in form.data:
  149. setattr(current_chemical, k, form.data[k])
  150. db.session.commit()
  151. return render_template("create_chemical.html", form=form, success=True, id=id)
  152. else:
  153. form = ChemicalForm(**dct)
  154. return render_template("create_chemical.html", form=form, invalid=True, id=id)
  155. else:
  156. form = ChemicalForm(**dct)
  157. return render_template("create_chemical.html", form=form, id=id)
  158. @app.route("/chemical/<int:id>/delete")
  159. def chemical_delete(id: int):
  160. if not session.get('admin'):
  161. abort(403)
  162. current_chemical:Chemical = Chemical.query.filter_by(id=id).one_or_404()
  163. db.session.delete(current_chemical)
  164. db.session.commit()
  165. return render_template("delete_chemical.html", id=id)
  166. @app.route("/chemical/<int:id>/view")
  167. def chemical_view(id: int):
  168. current_chemical:Chemical = Chemical.query.filter_by(id=id).one_or_404()
  169. dct = object_as_dict(current_chemical)
  170. return render_template("view_chemical.html", id=id, chemical=dct)
  171. @app.route("/chemical/all")
  172. def chemical_all():
  173. if not session.get('admin'):
  174. abort(403)
  175. result = Chemical.query.all()
  176. data = []
  177. for x in result:
  178. data.append({"url": url_for("chemical_view", id=x.id), "name": x.name, "mz": x.final_mz, "rt": x.final_rt})
  179. return jsonify(data)
  180. @app.route("/chemical/search")
  181. def search_api():
  182. mz_min, mz_max = request.args.get('mz_min'), request.args.get('mz_max')
  183. rt_min, rt_max = request.args.get('rt_min'), request.args.get('rt_max')
  184. if (mz_min is None and mz_max is None) or (rt_min is None and rt_max is None):
  185. abort(400)
  186. try:
  187. if mz_min is not None and mz_max is None:
  188. mz_max = float(mz_min) + 3
  189. elif mz_max is not None and mz_min is None:
  190. mz_min = float(mz_max) - 3
  191. if rt_min is not None and rt_max is None:
  192. rt_max = float(rt_min) + 3
  193. elif rt_max is not None and rt_min is None:
  194. rt_min = float(rt_max) - 3
  195. mz_min, mz_max = float(mz_min), float(mz_max)
  196. rt_min, rt_max = float(rt_min), float(rt_max)
  197. except ValueError:
  198. abort(400)
  199. print(mz_min, mz_max, " ", rt_min, rt_max)
  200. mz_filter = and_(mz_max > Chemical.final_mz, Chemical.final_mz > mz_min)
  201. rt_filter = and_(rt_max > Chemical.final_rt, Chemical.final_rt > rt_min)
  202. result = Chemical.query.filter(
  203. or_(mz_filter, rt_filter)
  204. ).limit(20).all()
  205. print("Got Result", result)
  206. data = []
  207. for x in result:
  208. data.append({"url": url_for("chemical_view", id=x.id), "name": x.name, "mz": x.final_mz, "rt": x.final_rt})
  209. return jsonify(data)
  210. @app.route("/search")
  211. def search():
  212. return render_template("search.html")
  213. if __name__ == "__main__":
  214. db.init_app(app)
  215. with app.app_context():
  216. db.create_all()
  217. app.run(debug=True)