Skip to the content.

Flask con SQLAlchemy (MySQL)

Flask básico

pip install mysqlclient
from flask import Flask

app = Flask(__name__)

if __name__=='__main__':
    app.run(debug=True)

Flask Blueprint

from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return 'página de inicio'

@app.route('/guardar')
def save():
    return 'Elemento Guardado'

@app.route('/borrar')
def delete():
    return 'Elemento Borrado'

if __name__=='__main__':
    app.run(debug=True)

Con blueprint

# main.py

from flask import Flask
from rutas import rutas

app = Flask(__name__)
app.register_blueprint(rutas)

if __name__=='__main__':
    app.run(debug=True)
# rutas.py
from flask import Blueprint

rutas = Blueprint('rutas', __name__)

@rutas.route('/')
def index():
    return 'página de inicio'

@rutas.route('/guardar')
def save():
    return 'Elemento Guardado'

@rutas.route('/borrar')
def delete():
    return 'Elemento Borrado'

Flask SQLAlchemy

Conexión a base de datos

# main.py

from flask import Flask
from rutas import rutas

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@server/db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

SQLAlchemy(app)

app.register_blueprint(rutas)

from db import db

with app.app_context():
    db.create_all()


if __name__=='__main__':
    app.run(debug=True)
# db.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()
# modelos.py

from db import db

class Contacts(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    fullname = db.Column(db.String(100))
    email = db.Column(db.String(100))
    phone = db.Column(db.String(100))
    
    def __init__(self, fullname, email, phone):
        self.fullname = fullname
        self.email = email
        self.phone = phone
          

Guardar datos

# rutas.py
from flask import Blueprint, render_template, request
from modelo import Contacts
from db import db

rutas = Blueprint('rutas', __name__)

@ruta.route('/new', methods=['POST'])
def save():
    fu = request.form['fullname']
    em = request.form['email']
    ph = request.form['phone']
    
    guardar = Contacts(fu,em,ph)
    db.session.add(guardar)
    db.session.commit()
    
    return "contacto guardado"

Mostrar datos

@ruta.routes('/')
def index():
    data = Contacts.query.all()
    return  render_template('index.html' data = data)

Rutas con blueprint

# path desde el html (blueprint)
<a href="">Update</a>
<a href="">Delete</a>

Elimianar datos

@ruta.routes('/delete/<id>')
def delete(id):
    data = Contacts.query.get(id)
    db.session.delete(data)
    db.session.commit()
    return  'eliminar datos'

Usar url_for

return (url_for('rutas.index'))

Actualizar datos


@ruta.routes('/update/<id>', methods=['GET', 'POST'])
def update(id):
    get_element = Contacts.query.get(id)
    return render_template('update.html' elementos=get_elements)
<form action="/update/" method="POST">
    <input type="text" value="" name="fullname">
    <input type="text" value="" name="email">
    <input type="text" value="" name="phone">
    <button>
        Actualizar
    </button>
</form>
@ruta.routes('/update/<id>', methods=['GET', 'POST'])
def update(id):
    get_element = Contacts.query.get(id)
    if request.method == 'POST':    
    	get_element.fullname = request.form['fullname']
    	get_element.email = request.form['email']
    	get_element.phone = request.form['phone']
        db.session.commit()
        return "actualizando"
    return render_template('update.html' elementos=get_elements)

Subir archivos a la base de datos

<form action="/update" enctype="multipart/form-data" method="post">
    <input type="file" name="file">
    <button>
        Subir
    </button>
</form>
# model
class Upload(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(50))
    data = db.Column(db.LargeBinary)
# ruta
@app.route('/',methods=['GET', 'POST'])
def index():
    if request.method == 'POST':
        file = request.files['file']
        upload = Upload(name=file.filename, data=file.read())
        db.session.add(upload)
        db.session.commit()
        return f'Uploaded: {file.filename}'
    return render_template('index.html')

Descargar archivos de la base de datos

# download part

from io import BytesIO

@app.route('/download/<file_id>')
def download(file_id):
    upload = Upload.query.filter_by(id=file_id).first()
    return send_file(BytesIO(upload.data), attachment_filename=upload.filename, as_attachment=True)

Fechas

import date

fecha = db.Column(db.Date)