|
@@ -50,21 +50,6 @@ class PostgreSQLHandler():
|
|
|
WHERE table_schema = 'public'
|
|
|
""")
|
|
|
return list(map(lambda x: x[0], self.cur.fetchall()))
|
|
|
-
|
|
|
- def table_columns(self, table: str) -> List[str]:
|
|
|
- """Returns list of the fields of the table
|
|
|
-
|
|
|
- Parameters
|
|
|
- ----------
|
|
|
- table : str
|
|
|
- table from slowdb compton measurements
|
|
|
-
|
|
|
- Returns
|
|
|
- -------
|
|
|
- List[str]
|
|
|
- list of the fields
|
|
|
- """
|
|
|
- pass
|
|
|
|
|
|
class SlowdbComptonHandler(PostgreSQLHandler):
|
|
|
"""A class for processing and filtering of compton measurements from slowdb
|
|
@@ -160,7 +145,7 @@ class CalibrdbHandler(PostgreSQLHandler):
|
|
|
"""A class for processing of calibration database
|
|
|
"""
|
|
|
|
|
|
- def select_table(self, system: str, algo: str, name: str, version: str = 'Default', verbose: bool = True) -> int:
|
|
|
+ def select_table(self, system: str, algo: str, name: str, version: str = 'Default') -> int:
|
|
|
"""Selects the table from database
|
|
|
|
|
|
Parameters
|
|
@@ -173,8 +158,6 @@ class CalibrdbHandler(PostgreSQLHandler):
|
|
|
name of the calibration
|
|
|
version : str
|
|
|
name of the calibration version (default is Default)
|
|
|
- verbose : bool
|
|
|
- print additional information or not (default is False)
|
|
|
|
|
|
Returns
|
|
|
-------
|
|
@@ -192,7 +175,9 @@ class CalibrdbHandler(PostgreSQLHandler):
|
|
|
sid = result[0][0]
|
|
|
return sid
|
|
|
|
|
|
- def load_table(self, system: str, algo: str, name: str, version: str = 'Default', num_last_rows: Optional[int] = None, timerange: Optional[Tuple[datetime, datetime]] = None, return_timezone: bool = False) -> Tuple[list, list]:
|
|
|
+ def load_table(self, system: str, algo: str, name: str, version: str = 'Default',
|
|
|
+ num_last_rows: Optional[int] = None, timerange: Optional[Tuple[datetime, datetime]] = None,
|
|
|
+ return_timezone: bool = False) -> Tuple[list, list]:
|
|
|
"""Loads the calibration table
|
|
|
|
|
|
Parameters
|
|
@@ -239,14 +224,14 @@ class CalibrdbHandler(PostgreSQLHandler):
|
|
|
table = self.cur.fetchall()
|
|
|
return table, fields_name
|
|
|
|
|
|
- def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = True):
|
|
|
+ def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader",
|
|
|
+ name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = True):
|
|
|
if len(new_rows) == 0:
|
|
|
return
|
|
|
|
|
|
sid = self.select_table(system, algo, name, version)
|
|
|
|
|
|
new_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[5], x[6], [x[1], x[2], x[3], x[4]]), new_rows))
|
|
|
- # print(new_rows[0])
|
|
|
|
|
|
if handle_last_time_row:
|
|
|
last_written_row, _ = self.load_table(system, algo, name, version, num_last_rows = 1, return_timezone = True)
|
|
@@ -260,32 +245,56 @@ class CalibrdbHandler(PostgreSQLHandler):
|
|
|
logging.info(f"Inserted {len(new_rows)} new rows")
|
|
|
return
|
|
|
|
|
|
- def insert(self, new_rows: List[Tuple[str, list]], system: str, algo: str, name: str, version: str, update: bool = True):
|
|
|
+ def insert(self, new_rows: list, system: str, algo: str, name: str, version: str,
|
|
|
+ update: bool = True, comment: Optional[str] = None):
|
|
|
"""Insert new_rows in the table
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
- new_rows : List[Tuple[datetime, datetime, datetime, list]]
|
|
|
- list of new rows (tuples) in the follwing format (comment: str, data: list)
|
|
|
+ new_rows : list
|
|
|
+ list of new rows in the follwing format
|
|
|
update : bool
|
|
|
update current calibration
|
|
|
+ comment : Optional[str]
|
|
|
+ common comment field
|
|
|
"""
|
|
|
|
|
|
sid = self.select_table(system, algo, name, version)
|
|
|
|
|
|
- time_now, dlt0 = datetime.utcnow(), timedelta(days=5000)
|
|
|
if update:
|
|
|
- update_query = f"""UPDATE clbrdata SET endtime = %s
|
|
|
- WHERE comment = %s AND endtime > %s AND sid = {sid}
|
|
|
+ update_query = f"""UPDATE clbrdata
|
|
|
+ SET data = %(data)s, createdby = %(createdby)s, time = %(time)s, begintime = %(begintime)s, endtime = %(endtime)s
|
|
|
+ WHERE sid = %(sid)s AND comment = %(comment)s
|
|
|
"""
|
|
|
- for row in new_rows:
|
|
|
- season = row[0]
|
|
|
- self.cur.execute(update_query, (time_now, season, time_now))
|
|
|
+ for x in new_rows:
|
|
|
+ season_point = (comment if comment is not None else '') + '_' + str(x[3])
|
|
|
+ dict_row = {
|
|
|
+ 'sid': sid,
|
|
|
+ 'createdby': 'lxeuser',
|
|
|
+ 'time': x[0],
|
|
|
+ 'begintime': x[1],
|
|
|
+ 'endtime': x[2],
|
|
|
+ 'comment': season_point,
|
|
|
+ 'data': x[3:],
|
|
|
+ }
|
|
|
+ self.cur.execute(update_query, dict_row)
|
|
|
+
|
|
|
+ insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, comment, data) VALUES %s"""
|
|
|
+ comment_creator = lambda x: f'{comment if comment is not None else ""}_{str(x[3])}'
|
|
|
+ insert_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[1], x[2], comment_creator(x), x[3:]), new_rows))
|
|
|
+ execute_values(self.cur, insert_query, insert_rows, fetch=False)
|
|
|
|
|
|
- insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, comment, data) VALUES %s;"""
|
|
|
- insert_rows = list(map(lambda x: (sid, 'lxeuser', time_now, time_now, time_now + dlt0, x[0], x[1]), new_rows))
|
|
|
+ drop_query = f"""
|
|
|
+ DELETE FROM clbrdata a
|
|
|
+ USING clbrdata b
|
|
|
+ WHERE
|
|
|
+ a.sid = {sid}
|
|
|
+ AND a.cid > b.cid
|
|
|
+ AND a.sid = b.sid
|
|
|
+ AND a.comment = b.comment
|
|
|
+ """
|
|
|
+ self.cur.execute(drop_query)
|
|
|
|
|
|
- execute_values(self.cur, insert_query, insert_rows, fetch=False)
|
|
|
logging.info(f"Inserted {len(insert_rows)} rows into table: {system}/{algo}/{name}/{version}")
|
|
|
return
|
|
|
|
|
@@ -303,14 +312,23 @@ class CalibrdbHandler(PostgreSQLHandler):
|
|
|
logging.info(f"Deleted ({sid}, {createdby}, {time}) row")
|
|
|
return
|
|
|
|
|
|
- def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default'):
|
|
|
+ def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default', keep: str = 'last'):
|
|
|
sid = self.select_table(system, algo, name, version)
|
|
|
+
|
|
|
+ keep_rule = ''
|
|
|
+ if keep == 'last':
|
|
|
+ keep_rule = '<'
|
|
|
+ elif keep == 'first':
|
|
|
+ keep_rule = '>'
|
|
|
+ else:
|
|
|
+ raise ValueError("keep argument must be 'last' or 'first'")
|
|
|
+
|
|
|
remove_query = f"""
|
|
|
DELETE FROM clbrdata a
|
|
|
USING clbrdata b
|
|
|
WHERE
|
|
|
a.sid = {sid}
|
|
|
- AND a.cid < b.cid
|
|
|
+ AND a.cid {keep_rule} b.cid
|
|
|
AND a.sid = b.sid
|
|
|
AND a.time = b.time
|
|
|
"""
|
|
@@ -341,7 +359,7 @@ def main():
|
|
|
parser = argparse.ArgumentParser(description = 'Filter compton energy measurements from slowdb')
|
|
|
parser.add_argument('--season', help = 'Name of compton measurement table from slowdb')
|
|
|
parser.add_argument('--config', help = 'Config file containing information for access to databases')
|
|
|
- parser.add_argument('--update', action = 'store_true', help = 'Only update table with new values')
|
|
|
+ parser.add_argument('--update', action = 'store_true', help = 'Writes only newest values into the db')
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
logging.info(f"Arguments: season: {args.season}, config {args.config}, update {args.update}")
|