You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

278 lines
8.4 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. #!/usr/bin/env python3
  2. from datetime import date
  3. from flask import Flask, render_template, session, request, abort, redirect, url_for, jsonify
  4. from flask_sqlalchemy import SQLAlchemy
  5. from sqlalchemy import inspect, and_
  6. from flask_wtf import FlaskForm
  7. import bcrypt
  8. from wtforms_alchemy import model_form_factory
  9. from flask_migrate import Migrate
  10. app = Flask(__name__)
  11. app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///project.db"
  12. app.secret_key = '98d31240f9fbe14c8083586db49c19c3a8d3f726'
  13. db: SQLAlchemy = SQLAlchemy(app)
  14. migrate = Migrate(app, db)
  15. BaseModelForm = model_form_factory(FlaskForm)
  16. class ModelForm(BaseModelForm):
  17. @classmethod
  18. def get_session(cls):
  19. return db.session
  20. class Admin(db.Model):
  21. id = db.Column(db.Integer, primary_key=True)
  22. username = db.Column(db.String, unique=True, nullable=False)
  23. password = db.Column(db.String, nullable=False)
  24. query: db.Query
  25. @classmethod
  26. def generate_password(cls, pw: str):
  27. return bcrypt.hashpw(pw, bcrypt.gensalt(12))
  28. @classmethod
  29. def authenticate(cls, username: str, pw: str):
  30. user = Admin.query.filter_by(username=username).one_or_none()
  31. if user and bcrypt.checkpw(pw, user.password):
  32. session['admin'] = user.username
  33. return user
  34. else:
  35. return None
  36. @classmethod
  37. def exists(cls):
  38. user = Admin.query.one_or_none()
  39. return True if user else False
  40. @classmethod
  41. def authorize(cls):
  42. if not session.get('admin'):
  43. return redirect(url_for("admin_login"))
  44. def object_as_dict(obj):
  45. return {c.key: getattr(obj, c.key)
  46. for c in inspect(obj).mapper.column_attrs}
  47. class Chemical(db.Model):
  48. query: db.Query
  49. id = db.Column(db.Integer, primary_key=True)
  50. # all fields after here are included in the database
  51. chemical_db_id = db.Column(db.String)
  52. library = db.Column(db.String)
  53. # important fields
  54. name = db.Column(db.String, nullable=False)
  55. formula = db.Column(db.String, nullable=False)
  56. mass = db.Column(db.Float, nullable=False)
  57. pubchem_cid = db.Column(db.Integer)
  58. pubmed_refcount = db.Column(db.Integer)
  59. standard_class = db.Column(db.String)
  60. inchikey = db.Column(db.String)
  61. inchikey14 = db.Column(db.String)
  62. final_mz = db.Column(db.Float, nullable=False)
  63. final_rt = db.Column(db.Float, nullable=False)
  64. final_adduct = db.Column(db.String)
  65. adduct = db.Column(db.String)
  66. detected_adducts = db.Column(db.String)
  67. adduct_calc_mz = db.Column(db.String)
  68. msms_detected = db.Column(db.Boolean)
  69. msms_purity = db.Column(db.Float)
  70. # serialized into datetime.date
  71. createdAt = db.Column(db.Date)
  72. class ChemicalForm(ModelForm):
  73. class Meta:
  74. csrf = False
  75. model = Chemical
  76. # Error Handlers
  77. @app.errorhandler(404)
  78. def handler_404(msg):
  79. return render_template("errors/404.html")
  80. @app.errorhandler(403)
  81. def handler_403(msg):
  82. return render_template("errors/403.html")
  83. # Admin routes
  84. @app.route('/admin')
  85. def admin_root():
  86. if login := Admin.authorize():
  87. return login
  88. return render_template("admin.html", user=session.get("admin"))
  89. @app.route('/admin/create', methods=['GET', 'POST'])
  90. def admin_create():
  91. if Admin.exists():
  92. if login := Admin.authorize():
  93. return login
  94. if request.method == "GET":
  95. return render_template("register.html")
  96. else:
  97. username, pw = request.form.get(
  98. 'username'), request.form.get('password')
  99. if username is None or pw is None:
  100. return render_template("register.html", fail="Invalid Input.")
  101. elif db.session.execute(db.select(Admin).filter_by(username=username)).fetchone():
  102. return render_template("register.html", fail="Username already exists.")
  103. else:
  104. db.session.add(
  105. Admin(username=username, password=Admin.generate_password(pw)))
  106. db.session.commit()
  107. return render_template("register.html", success=True)
  108. @app.route('/admin/login', methods=['GET', 'POST'])
  109. def admin_login():
  110. if request.method == "POST":
  111. username, pw = request.form.get(
  112. 'username', ''), request.form.get('password', '')
  113. if Admin.authenticate(username, pw):
  114. return render_template("login.html", success=True)
  115. else:
  116. return render_template("login.html", fail="Could not authenticate.")
  117. else:
  118. return render_template("login.html")
  119. @app.route('/admin/logout', methods=['GET'])
  120. def admin_logout():
  121. session.pop('admin')
  122. return redirect(url_for('home'))
  123. @app.route("/")
  124. def home():
  125. if Admin.exists():
  126. return render_template("index.html")
  127. else:
  128. return redirect(url_for("admin_create"))
  129. # Routes for CRUD operations on chemicals
  130. @app.route("/chemical/create", methods=['GET', 'POST'])
  131. def chemical_create():
  132. if not session.get('admin'):
  133. abort(403)
  134. if request.method == "POST":
  135. form = ChemicalForm(**request.form)
  136. if form.validate():
  137. new_chemical = Chemical(**form.data)
  138. db.session.add(new_chemical)
  139. db.session.commit()
  140. return render_template("create_chemical.html", form=ChemicalForm(), success=True)
  141. else:
  142. return render_template("create_chemical.html", form=form, invalid=True), 400
  143. else:
  144. form = ChemicalForm()
  145. return render_template("create_chemical.html", form=form)
  146. @app.route("/chemical/<int:id>/update", methods=['GET', 'POST'])
  147. def chemical_update(id: int):
  148. if not session.get('admin'):
  149. abort(403)
  150. current_chemical: Chemical = Chemical.query.filter_by(id=id).one_or_404()
  151. dct = object_as_dict(current_chemical)
  152. if request.method == "POST":
  153. form = ChemicalForm(**request.form)
  154. if form.validate():
  155. # take the row with id and update it.
  156. for k in form.data:
  157. setattr(current_chemical, k, form.data[k])
  158. db.session.commit()
  159. return render_template("create_chemical.html", form=form, success=True, id=id)
  160. else:
  161. form = ChemicalForm(**dct)
  162. return render_template("create_chemical.html", form=form, invalid=True, id=id), 400
  163. else:
  164. form = ChemicalForm(**dct)
  165. return render_template("create_chemical.html", form=form, id=id)
  166. @app.route("/chemical/<int:id>/delete")
  167. def chemical_delete(id: int):
  168. if not session.get('admin'):
  169. abort(403)
  170. current_chemical: Chemical = Chemical.query.filter_by(id=id).one_or_404()
  171. db.session.delete(current_chemical)
  172. db.session.commit()
  173. return render_template("delete_chemical.html", id=id)
  174. @app.route("/chemical/<int:id>/view")
  175. def chemical_view(id: int):
  176. current_chemical: Chemical = Chemical.query.filter_by(id=id).one_or_404()
  177. dct = object_as_dict(current_chemical)
  178. return render_template("view_chemical.html", id=id, chemical=dct)
  179. @app.route("/chemical/all")
  180. def chemical_all():
  181. if not session.get('admin'):
  182. abort(403)
  183. result: list[Chemical] = Chemical.query.all()
  184. data = []
  185. for x in result:
  186. data.append({c.name: getattr(x, c.name) for c in x.__table__.columns})
  187. return jsonify(data)
  188. @app.route("/chemical/search", methods=["POST"])
  189. def search_api():
  190. query = request.json
  191. if query is None:
  192. return jsonify([])
  193. for field in query:
  194. query[field] = float(query[field])
  195. mz_min, mz_max = query.get('mz_min'), query.get('mz_max')
  196. rt_min, rt_max = query.get('rt_min'), query.get('rt_max')
  197. year_max, month_max, day_max = int(query.get(
  198. 'year_max')), int(query.get('month_max')), int(query.get('day_max'))
  199. try:
  200. mz_filter = and_(mz_max > Chemical.final_mz,
  201. Chemical.final_mz > mz_min)
  202. rt_filter = and_(rt_max > Chemical.final_rt,
  203. Chemical.final_rt > rt_min)
  204. date_filter = date(year_max, month_max, day_max) >= Chemical.createdAt
  205. except ValueError as e:
  206. return jsonify({"error": str(e)}), 400
  207. result = Chemical.query.filter(
  208. and_(mz_filter, rt_filter, date_filter)
  209. ).limit(20).all()
  210. data = []
  211. for x in result:
  212. data.append({"url": url_for("chemical_view", id=x.id),
  213. "name": x.name, "mz": x.final_mz, "rt": x.final_rt})
  214. return jsonify(data)
  215. @app.route("/search")
  216. def search():
  217. return render_template("search.html")
  218. if __name__ == "__main__":
  219. with app.app_context():
  220. db.create_all()
  221. app.run(debug=True)