Issue
app.py
from pprint import pprint
from flask import Flask, render_template
from flask_restful import Api, Resource
from restApi import crud
app = Flask(__name__)
api = Api(app)
@app.route('/aad')
def hello_world():
return "hello world"
@app.route('/')
def hello_worldp():
return render_template("index.html")
class HelloWorld(Resource):
def get(self):
return {"prova": "sono una prova"}
api.add_resource(HelloWorld, "/HelloWorld")
api.add_resource(crud.ProvaApi, "/c")
api.add_resource(crud.getAllColumns, "/allColumns")
if __name__ == '__main__':
app.run(debug=True)
test.py
from pprint import pprint
from sqlalchemy.orm import joinedload
from flask import jsonify
from data import schema
def test1():
c = schema.Colonne(titolo="prova5", stato="boh")
cont1 = schema.Contenuto(testo="prova Contenuto1")
cont2 = schema.Contenuto(url="nonsourl")
t = schema.Tile(titolo="provaTile", autore="nonso", contenuto=cont1.id)
t2 = schema.Tile(titolo="provaTile", autore="nonso", contenuto=cont2.id)
session = schema.dbSession()
# c.tile.append(t)
#c.tile.append(t2)
#c.tile[0].contenuto.add(cont1)
#c.tile[0].contenuto.add(cont2)
# session.add(c)
#session.commit()
colschema = schema.ColonneSchema()
tileschema = schema.TileSchema()
contschema = schema.ContenutoSchema()
qr = session.query(schema.Colonne, schema.Tile, schema.Contenuto).\
select_from(schema.Colonne).\
join(schema.Tile, schema.Colonne.titolo == schema.Tile.colonna_id, isouter=True).\
join(schema.Contenuto, schema.Tile.contenuto == schema.Contenuto.id, isouter=True).all()
#qr = qr.filter(schema.Colonne.titolo == "prova5")
#record = qr.one()
print(qr)
#ti = record.tile.get(0).filter(schema.Tile.id == 1)
#ti.contenuto.add(cont1)
qp = session.query(schema.Tile);
qp = qp.filter(schema.Tile.id == "1")
record = qp.one()
record.contenuto = cont1.id
session.commit()
qr = session.query(schema.Colonne, schema.Tile, schema.Contenuto). \
select_from(schema.Colonne). \
join(schema.Tile, schema.Colonne.titolo == schema.Tile.colonna_id, isouter=True). \
join(schema.Contenuto, schema.Tile.contenuto == schema.Contenuto.id, isouter=True).all()
for r1, r2, r3 in qr:
print(colschema.dump(r1))
print(tileschema.dump(r2))
print(contschema.dump(r3))
def test2():
c = schema.Colonne(titolo="prova4", stato="boh")
t = schema.Tile(titolo="provaTile", autore="nonso")
t2 = schema.Tile(titolo="provaTile", autore="nonso")
cont = schema.Contenuto(testo="prova Contenuto", url ="nonsourl")
session = schema.dbSession()
c.tiles.append(t)
c.tiles[0].contenuto = cont
dum = schema.ColonneSchema().dump(c)
#print(c.tiles[0].contenuto)
#print(dum)
#c.tiles.append(t2)
#c.tiles[0].append(cont)
session.add(c)
session.commit()
#tiles
#cols = session.query(schema.Tile).all()
# print(jsonify(cols))
#t_schema = schema.TileSchema()
# print(c_schema.dump(cols))
#for col in cols:
# print(jsonify(col))
# print(t_schema.dump(col))
def test3():
session = schema.dbSession()
query = "select Colonne.titolo, Colonne.stato, Tile.id, Tile.autore, Tile.contenuto, Contenuto.testo,Contenuto.url" \
" from Colonne inner join Tile" \
" on Colonne.titolo = Tile.colonna_id left join " \
"Contenuto on Contenuto.id = Tile.contenuto;"
rs = session.execute(query)
#row_headers = [x[0] for x in rs.description]
resultset = [dict(row) for row in rs]
print(resultset)
#for row in rs:
# print(jsonify(row))
def test4():
query="insert into Contenuto values ('gna', 'codio')"
session = schema.dbSession()
rs = session.execute(query)
def test5():
query = "SELECT * FROM Contenuto"
session = schema.dbSession()
rs = session.execute(query)
presultset = [dict(row) for row in rs]
print(presultset)
def test6():
session = schema.dbSession()
rs = session.execute("PRAGMA foreign_keys;")
for row in rs:
print(row)
def test7():
session = schema.dbSession()
rs1 = session.query(schema.Colonne).select_from(schema.Colonne).join(schema.Tile).join(schema.Contenuto).all()
rs = session.query(schema.Colonne).all()
scCol = schema.ColonneSchema(many=True)
print(scCol.dump(rs))
#for row in rs:
# print(scCol.dump(rs))
def test8():
c = schema.Colonne(titolo="prova4", stato="boh")
t = schema.Tile(titolo="provaTile", autore="nonso", colonna = c)
t.colonna = c
cont = schema.Contenuto(testo="prova Contenuto", url="nonsourl")
t.contenuto = cont
rs = schema.ContenutoSchema().dump(cont)
pprint(rs)
rs = schema.ColonneSchema().dump(c)
pprint(rs)
def test9():
session = schema.dbSession()
rs = session.query(schema.Colonne).all()
pprint(schema.ColonneSchema(many=True).dump(rs))
def test10():
session = schema.dbSession()
colonne = session.query(schema.Colonne).all()
dizionario = {}
rs = schema.ColonneSchema(many=True).dump(colonne)
tileSchema = schema.TileSchema()
contenutoSchema = schema.ContenutoSchema()
pprint(rs)
listaTot= []
listaColonne = []
for col in colonne:
dizionario = {}
print("***************COlonna********************")
print(schema.ColonneSchema().dump(col))
listaTile = []
dizionario['colonne'] = schema.ColonneSchema().dump(col)
for tile in col.tile:
print("***************Tile********************")
#pprint(tileSchema.dump(tile))
print("***************Contenuto********************")
#pprint(contenutoSchema.dump(tile.contenuto))
print("***************Contenuto********************")
print("***************Tile********************")
tiletmp= tileSchema.dump(tile)
tiletmp['contenuto'] = contenutoSchema.dump(tile.contenuto)
listaTile.append(tiletmp)
dizionario['tile'] = listaTile
print("***************COlonna********************")
listaTot.append(dizionario)
return listaTot
def test11():
session = schema.dbSession()
colonne = session.query(schema.Colonne).all()
dizionario = {}
rs = schema.ColonneSchema(many=True).dump(colonne)
tileSchema = schema.TileSchema()
contenutoSchema = schema.ContenutoSchema()
pprint(rs)
listaColonne= []
for col in colonne:
print(schema.ColonneSchema().dump(col))
def test12():
sess = schema.dbSession()
r=sess.query(schema.Colonne).filter_by(titolo="prova4").first()
sess.delete(r)
sess.commit()
print(r)
def test13():#update
sess = schema.dbSession()
r=sess.query(schema.Colonne).filter_by(titolo="prova4").first()
r.titolo ="provagay"
sess.add(r)
sess.commit()
print(r)
def test14():#cancella elemento
sess = schema.dbSession()
r=sess.query(schema.Tile).filter_by(id="1").first()
sess.delete(r)
sess.commit()
print(r)
def test15():#update elemento
session = schema.dbSession()
c=session.query(schema.Colonne).filter_by(titolo="prova4").first()
t = schema.Tile(titolo="provaTile", autore="nonso")
t2 = schema.Tile(titolo="provaTile", autore="nonso")
cont = schema.Contenuto(testo="prova Contenuto", url ="nonsourl")
c.tiles.append(t)
c.tiles[0].contenuto = cont
dum = schema.ColonneSchema().dump(c)
#print(c.tiles[0].contenuto)
#print(dum)
#c.tiles.append(t2)
#c.tiles[0].append(cont)
session.add(c)
session.commit()
def query_to_dict(ret):
if ret is not None:
return [{key: value for key, value in row.items()} for row in ret if row is not None]
else:
return [{}]
if __name__ == "__main__":
pprint(test10())
schema.py
from flask import Flask
from marshmallow_sqlalchemy.fields import Nested
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey
from sqlalchemy.orm import relationship, scoped_session, sessionmaker, joinedload,backref
from sqlalchemy.ext.declarative import declarative_base
from marshmallow_sqlalchemy import SQLAlchemyAutoSchema, fields
from flask_marshmallow import Marshmallow
app = Flask(__name__)
Base = declarative_base()
engine = create_engine('sqlite:///../database/mydb.db', convert_unicode=True)
dbSession = scoped_session(sessionmaker(autocommit=False,
autoflush=False,
bind=engine,
))
ma = Marshmallow(app)
class Colonne(Base):
__tablename__ = "Colonne"
id = Column(Integer,primary_key=True,autoincrement=True)
titolo = Column(String, unique=True)
stato = Column(String)
tile = relationship("Tile", back_populates ="colonna", cascade="all,delete")
class Tile(Base):
__tablename__ = "Tile"
id = Column(Integer, primary_key=True, autoincrement=True)
autore = Column(String)
titolo = Column(String)
colonna_id = Column(Integer, ForeignKey("Colonne.id"))
colonna = relationship("Colonne", backref="tiles", overlaps="tile")
contenuto_id = Column(Integer, ForeignKey('Contenuto.id', ondelete='CASCADE'))
contenuto = relationship("Contenuto", backref="tile", cascade="all,delete")
class Contenuto(Base):
__tablename__ = "Contenuto"
id = Column(Integer, primary_key=True, autoincrement=True )
testo = Column(String)
url = Column(String)
class ColonneSchema(SQLAlchemyAutoSchema):
class Meta:
model = Colonne
class TileSchema(SQLAlchemyAutoSchema):
class Meta:
model = Tile
colonna = Nested(ColonneSchema)
class ContenutoSchema(SQLAlchemyAutoSchema):
class Meta:
model = Contenuto
include_fk = True
include_relationships = True
load_instance = False
tile = fields.Nested(TileSchema)
# -----------------------crea tabelle----------------------------
if __name__ == '__main__':
Base.metadata.create_all(bind=engine)
crud.py
from flask_restful import Api, Resource
from data import schema
from pprint import pprint
def getColonne():
session = schema.dbSession()
colonne = session.query(schema.Colonne).all()
dizionario = {}
rs = schema.ColonneSchema(many=True).dump(colonne)
tileSchema = schema.TileSchema()
contenutoSchema = schema.ContenutoSchema()
# pprint(rs)
listaTot = []
listaColonne = []
for col in colonne:
dizionario = {}
# print("***************COlonna********************")
# print(schema.ColonneSchema().dump(col))
listaTile = []
dizionario['colonne'] = schema.ColonneSchema().dump(col)
for tile in col.tile:
# print("***************Tile********************")
# pprint(tileSchema.dump(tile))
# print("***************Contenuto********************")
# pprint(contenutoSchema.dump(tile.contenuto))
# print("***************Contenuto********************")
# print("***************Tile********************")
tiletmp = tileSchema.dump(tile)
tiletmp['contenuto'] = contenutoSchema.dump(tile.contenuto)
listaTile.append(tiletmp)
dizionario['tile'] = listaTile
# print("***************COlonna********************")
listaTot.append(dizionario)
return listaTot
if __name__ == "__main__":
pprint(getColonne())
class ProvaApi(Resource):
def get(self):
return {"prova": "sono una prova"}
class getAllColumns(Resource):
def get(self):
return getColonne()
Now, when i run "python crud.py" getColonne()
works.
Otherwise when the endpoint is being called,while the app is running, "getAllColumns", it can't open the database sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file.
the structure is something like this: project data schema.py database mydb.db restApi crud.py test test.py app.py
And of course while running all the test.py i dont have problems.
What's the problem?
Solution
You're using a relative path in your create_engine()
. Using an absolute path will resolve your issue. If it's important that it be relative to the file, use something like create_engine(f"sqlite:///{os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'database/mydb.db')"})
.
Answered By - AdamKG
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.