You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

271 lines
8.5 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. #!/usr/bin/env python3
  2. from flask import Flask, render_template, session, request, abort, redirect, url_for, jsonify
  3. from flask_sqlalchemy import SQLAlchemy
  4. from sqlalchemy import inspect, and_
  5. from flask_wtf import FlaskForm
  6. import bcrypt
  7. from wtforms_alchemy import model_form_factory
  8. from flask_migrate import Migrate
  9. app = Flask(__name__)
  10. app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///project.db"
  11. app.secret_key = '98d31240f9fbe14c8083586db49c19c3a8d3f726'
  12. db: SQLAlchemy = SQLAlchemy(app)
  13. migrate = Migrate(app, db)
  14. BaseModelForm = model_form_factory(FlaskForm)
  15. class ModelForm(BaseModelForm):
  16. @classmethod
  17. def get_session(cls):
  18. return db.session
  19. class Admin(db.Model):
  20. id = db.Column(db.Integer, primary_key=True)
  21. username = db.Column(db.String, unique=True, nullable=False)
  22. password = db.Column(db.String, nullable=False)
  23. query: db.Query
  24. @classmethod
  25. def generate_password(cls, pw: str):
  26. return bcrypt.hashpw(pw, bcrypt.gensalt(12))
  27. @classmethod
  28. def authenticate(cls, username: str, pw: str):
  29. user = Admin.query.filter_by(username=username).one_or_none()
  30. if user and bcrypt.checkpw(pw, user.password):
  31. session['admin'] = user.username
  32. return user
  33. else:
  34. return None
  35. @classmethod
  36. def exists(cls):
  37. user = Admin.query.one_or_none()
  38. return True if user else False
  39. @classmethod
  40. def authorize(cls):
  41. if not session.get('admin'):
  42. return redirect(url_for("admin_login"))
  43. def object_as_dict(obj):
  44. return {c.key: getattr(obj, c.key)
  45. for c in inspect(obj).mapper.column_attrs}
  46. class Chemical(db.Model):
  47. query: db.Query
  48. id = db.Column(db.Integer, primary_key=True)
  49. # all fields after here are included in the database
  50. chemical_db_id = db.Column(db.String)
  51. library = db.Column(db.String)
  52. # important fields
  53. name = db.Column(db.String, nullable=False)
  54. formula = db.Column(db.String, nullable=False)
  55. mass = db.Column(db.Float, nullable=False)
  56. pubchem_cid = db.Column(db.Integer)
  57. pubmed_refcount = db.Column(db.Integer)
  58. standard_class = db.Column(db.String)
  59. inchikey = db.Column(db.String)
  60. inchikey14 = db.Column(db.String)
  61. final_mz = db.Column(db.Float, nullable=False)
  62. final_rt = db.Column(db.Float, nullable=False)
  63. final_adduct = db.Column(db.String)
  64. adduct = db.Column(db.String)
  65. detected_adducts = db.Column(db.String)
  66. adduct_calc_mz = db.Column(db.String)
  67. msms_detected = db.Column(db.Boolean)
  68. msms_purity = db.Column(db.Float)
  69. createdAt = db.Column(db.Date)
  70. class ChemicalForm(ModelForm):
  71. class Meta:
  72. csrf = False
  73. model = Chemical
  74. # Error Handlers
  75. @app.errorhandler(404)
  76. def handler_404(msg):
  77. return render_template("errors/404.html")
  78. @app.errorhandler(403)
  79. def handler_403(msg):
  80. return render_template("errors/403.html")
  81. # Admin routes
  82. @app.route('/admin')
  83. def admin_root():
  84. if login := Admin.authorize():
  85. return login
  86. return render_template("admin.html", user=session.get("admin"))
  87. @app.route('/admin/create', methods=['GET', 'POST'])
  88. def admin_create():
  89. if Admin.exists():
  90. if login := Admin.authorize():
  91. return login
  92. if request.method == "GET":
  93. return render_template("register.html")
  94. else:
  95. username, pw = request.form.get('username'), request.form.get('password')
  96. if username is None or pw is None:
  97. return render_template("register.html", fail="Invalid Input.")
  98. elif db.session.execute(db.select(Admin).filter_by(username=username)).fetchone():
  99. return render_template("register.html", fail="Username already exists.")
  100. else:
  101. db.session.add(Admin(username=username, password=Admin.generate_password(pw)))
  102. db.session.commit()
  103. return render_template("register.html", success=True)
  104. @app.route('/admin/login', methods=['GET', 'POST'])
  105. def admin_login():
  106. if request.method == "POST":
  107. username, pw = request.form.get('username', ''), request.form.get('password', '')
  108. if Admin.authenticate(username, pw):
  109. return render_template("login.html", success=True)
  110. else:
  111. return render_template("login.html", fail="Could not authenticate.")
  112. else:
  113. return render_template("login.html")
  114. @app.route('/admin/logout', methods=['GET'])
  115. def admin_logout():
  116. session.pop('admin')
  117. return redirect(url_for('home'))
  118. @app.route("/")
  119. def home():
  120. if Admin.exists():
  121. return render_template("index.html")
  122. else:
  123. return redirect(url_for("admin_create"))
  124. # Routes for CRUD operations on chemicals
  125. @app.route("/chemical/create", methods=['GET', 'POST'])
  126. def chemical_create():
  127. if not session.get('admin'):
  128. abort(403)
  129. if request.method == "POST":
  130. form = ChemicalForm(**request.form)
  131. if form.validate():
  132. new_chemical = Chemical(**form.data)
  133. db.session.add(new_chemical)
  134. db.session.commit()
  135. return render_template("create_chemical.html", form=ChemicalForm(), success=True)
  136. else:
  137. return render_template("create_chemical.html", form=form, invalid=True), 400
  138. else:
  139. form = ChemicalForm()
  140. return render_template("create_chemical.html", form=form)
  141. @app.route("/chemical/<int:id>/update", methods=['GET', 'POST'])
  142. def chemical_update(id: int):
  143. if not session.get('admin'):
  144. abort(403)
  145. current_chemical:Chemical = Chemical.query.filter_by(id=id).one_or_404()
  146. dct = object_as_dict(current_chemical)
  147. if request.method == "POST":
  148. form = ChemicalForm(**request.form)
  149. if form.validate():
  150. # take the row with id and update it.
  151. for k in form.data:
  152. setattr(current_chemical, k, form.data[k])
  153. db.session.commit()
  154. return render_template("create_chemical.html", form=form, success=True, id=id)
  155. else:
  156. form = ChemicalForm(**dct)
  157. return render_template("create_chemical.html", form=form, invalid=True, id=id), 400
  158. else:
  159. form = ChemicalForm(**dct)
  160. return render_template("create_chemical.html", form=form, id=id)
  161. @app.route("/chemical/<int:id>/delete")
  162. def chemical_delete(id: int):
  163. if not session.get('admin'):
  164. abort(403)
  165. current_chemical:Chemical = Chemical.query.filter_by(id=id).one_or_404()
  166. db.session.delete(current_chemical)
  167. db.session.commit()
  168. return render_template("delete_chemical.html", id=id)
  169. @app.route("/chemical/<int:id>/view")
  170. def chemical_view(id: int):
  171. current_chemical:Chemical = Chemical.query.filter_by(id=id).one_or_404()
  172. dct = object_as_dict(current_chemical)
  173. return render_template("view_chemical.html", id=id, chemical=dct)
  174. @app.route("/chemical/all")
  175. def chemical_all():
  176. if not session.get('admin'):
  177. abort(403)
  178. result = Chemical.query.all()
  179. data = []
  180. for x in result:
  181. data.append({"url": url_for("chemical_view", id=x.id), "name": x.name, "mz": x.final_mz, "rt": x.final_rt})
  182. return jsonify(data)
  183. @app.route("/chemical/search")
  184. def search_api():
  185. mz_min, mz_max = request.args.get('mz_min'), request.args.get('mz_max')
  186. rt_min, rt_max = request.args.get('rt_min'), request.args.get('rt_max')
  187. if (mz_min is None and mz_max is None) or (rt_min is None and rt_max is None):
  188. return jsonify({"error": "invalid data"}), 400
  189. try:
  190. if mz_min is not None and mz_max is None:
  191. mz_max = float(mz_min) + 3
  192. elif mz_max is not None and mz_min is None:
  193. mz_min = float(mz_max) - 3
  194. if rt_min is not None and rt_max is None:
  195. rt_max = float(rt_min) + 3
  196. elif rt_max is not None and rt_min is None:
  197. rt_min = float(rt_max) - 3
  198. mz_min, mz_max = float(mz_min), float(mz_max)
  199. rt_min, rt_max = float(rt_min), float(rt_max)
  200. except ValueError:
  201. return jsonify({"error": "invalid data"}), 400
  202. mz_filter = and_(mz_max > Chemical.final_mz, Chemical.final_mz > mz_min)
  203. rt_filter = and_(rt_max > Chemical.final_rt, Chemical.final_rt > rt_min)
  204. result = Chemical.query.filter(
  205. and_(mz_filter, rt_filter)
  206. ).limit(10).all()
  207. data = []
  208. for x in result:
  209. data.append({"url": url_for("chemical_view", id=x.id), "name": x.name, "mz": x.final_mz, "rt": x.final_rt})
  210. return jsonify(data)
  211. @app.route("/search")
  212. def search():
  213. return render_template("search.html")
  214. if __name__ == "__main__":
  215. with app.app_context():
  216. db.create_all()
  217. app.run(debug=True)