-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice.py
111 lines (92 loc) · 3.11 KB
/
service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import db
class CRUDService:
    """Generic create/read/update/delete helper bound to a single table.

    The column layout of the table is discovered once, at construction
    time, through ``PRAGMA table_info``. Every method obtains its
    connection from ``db.get_db()``.

    NOTE(review): the table name, ``readBy``'s ``key`` and ``readAll``'s
    ``order_by`` / ``order_mode`` are interpolated into the SQL text
    (identifiers cannot be bound as parameters), so they must come from
    trusted code, never from user input. Row *values* are always bound.

    Attributes:
        tableName: Name of the table this service operates on.
        schema: Dict mapping column name to its declared type, filled in
            by ``initSchema``.
        primaryKey: Name of the primary-key column, or ``None`` when the
            table reports none.
    """

    def __init__(self, tableName):
        """Remember ``tableName`` and load its schema from the database."""
        self.primaryKey = None
        self.schema = None
        self.tableName = tableName
        self.initSchema()

    def initSchema(self):
        """Populate ``self.schema`` and ``self.primaryKey`` from the table.

        Rows are read by column name (``row['name']``), so the connection
        must return rows that support name-based indexing.
        """
        conn = db.get_db()
        sql = 'PRAGMA table_info({0})'.format(self.tableName)
        rows = conn.execute(sql).fetchall()
        schema = {}
        for row in rows:
            name = row['name']
            schema[name] = row['type']
            # If several columns are flagged as primary key, the last one
            # reported is the one that is kept.
            if row['pk']:
                self.primaryKey = name
        self.schema = schema

    def getEmpty(self):
        """Return a dict with every column name mapped to an empty string."""
        return {key: '' for key in self.schema}

    def create(self, data):
        """Insert one row built from ``data`` and commit.

        Only keys of ``data`` that are columns of the table are inserted;
        the primary-key column is always skipped.

        Args:
            data: Mapping of column name to value.

        Returns:
            ``True`` once the insert has been committed.
        """
        conn = db.get_db()
        # Compare by value: identity (``is``) on strings only works by
        # accident when both names happen to be the same object.
        insertKeys = [
            key for key in self.schema
            if key != self.primaryKey and key in data
        ]
        # Named placeholders (``:column``) are bound from ``data``.
        placeholders = [':' + key for key in insertKeys]
        sql = 'INSERT INTO {0} ({1}) VALUES ({2});'.format(
            self.tableName, ', '.join(insertKeys), ', '.join(placeholders)
        )
        conn.execute(sql, data)
        conn.commit()
        return True

    def readAll(self, order_by=None, order_mode='ASC'):
        """Return every row of the table.

        Args:
            order_by: Optional ordering term placed after ``ORDER BY``.
                When ``None`` no ordering clause is added.
            order_mode: Sort direction appended after ``order_by``; it is
                ignored when ``order_by`` is ``None``.

        Returns:
            The list produced by ``fetchall()``.
        """
        conn = db.get_db()
        sql = 'SELECT * FROM {0}'.format(self.tableName)
        if order_by is not None:
            sql += ' ORDER BY {0} {1};'.format(order_by, order_mode)
        return conn.execute(sql).fetchall()

    def readByPK(self, id):
        """Return the row whose primary key equals ``id``, or ``None``.

        The value is bound as ``str(id)``.
        """
        conn = db.get_db()
        sql = 'SELECT * FROM {0} WHERE {1} = ?;'.format(
            self.tableName, self.primaryKey)
        return conn.execute(sql, (str(id),)).fetchone()

    def readBy(self, key, value):
        """Return all rows whose column ``key`` equals ``value``.

        The value is bound as ``str(value)``; ``key`` is interpolated into
        the statement and must be a trusted column name.
        """
        conn = db.get_db()
        sql = 'SELECT * FROM {0} WHERE {1} = ?;'.format(
            self.tableName, key)
        return conn.execute(sql, (str(value),)).fetchall()

    def read(self, id=False):
        """Return all rows when ``id`` is omitted, else the row with that key.

        ``False`` (not ``None``) is the "no id" sentinel, so ``0`` is
        treated as a real primary-key value.
        """
        if id is False:
            return self.readAll()
        return self.readByPK(id)

    def update(self, data):
        """Update the row identified by the primary key in ``data`` and commit.

        Args:
            data: Mapping of column name to value. It must contain the
                primary-key column (bound in the ``WHERE`` clause); every
                other key that is a table column is written.

        Returns:
            ``True`` once the update has been committed.
        """
        conn = db.get_db()
        # Compare by value, not identity, when skipping the primary key.
        keysAndPlaceholders = [
            '{0} = :{1}'.format(key, key)
            for key in self.schema
            if key != self.primaryKey and key in data
        ]
        sql = 'UPDATE {0} SET {1} WHERE {2} = :{2};'.format(
            self.tableName, ', '.join(keysAndPlaceholders), self.primaryKey
        )
        conn.execute(sql, data)
        conn.commit()
        return True

    def delete(self, id):
        """Delete the row whose primary key equals ``id`` and commit.

        Returns:
            ``True`` once the delete has been committed.
        """
        conn = db.get_db()
        sql = 'DELETE FROM {0} WHERE {1} = ?;'.format(
            self.tableName, self.primaryKey)
        conn.execute(sql, (id,))
        conn.commit()
        return True

    def countRows(self):
        """Return the number of rows in the table."""
        conn = db.get_db()
        query = 'SELECT COUNT(*) FROM {0};'.format(self.tableName)
        # The result must be fetched from the cursor that ``execute``
        # returns; the connection object itself has no ``fetchone``.
        return conn.execute(query).fetchone()[0]