"""
Script to fill the calibration database with filtered Compton measurements from the slowdb.
"""
- import argparse
- from configparser import ConfigParser
- from datetime import datetime, timedelta
- from typing import Tuple, List, Dict, Union, Optional
- import warnings
- import logging
- import psycopg2
- from psycopg2.extras import execute_values
class PostgreSQLHandler:
    """A common class for processing postgresql databases"""

    def __init__(self, host: str = 'cmddb', database: str = 'slowdb', user: Optional[str] = None, password: Optional[str] = None):
        """Opens a connection and a cursor to the given PostgreSQL database

        Parameters
        ----------
        host : str
            host name (default is "cmddb")
        database : str
            database name (default is "slowdb")
        user : Optional[str]
            username (default is None)
        password : Optional[str]
            password (default is None)
        """

        self.conn = psycopg2.connect(host = host, database = database, user = user, password = password)
        self.cur = self.conn.cursor()
        logging.info("PostgreSQL Handler created")

    @property
    def list_tables(self) -> List[str]:
        """Returns list of existed tables in the compton measurements slowDB

        Returns
        -------
        List[str]
            list of tables
        """

        logging.info("Get list of the slowdb tables")
        self.cur.execute("""
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'public'
        """)
        return list(map(lambda x: x[0], self.cur.fetchall()))

    def table_columns(self, table: str) -> List[str]:
        """Returns list of the fields of the table

        Parameters
        ----------
        table : str
            table from slowdb compton measurements

        Returns
        -------
        List[str]
            list of the fields
        """

        logging.info(f"Get list of columns of the table {table}")
        # The table name is compared as a value in information_schema, so it can be
        # passed safely as a query parameter (no identifier interpolation needed).
        self.cur.execute("""
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = %s
        """, (table,))
        return list(map(lambda x: x[0], self.cur.fetchall()))
class SlowdbComptonHandler(PostgreSQLHandler):
    """A class for processing and filtering of compton measurements from slowdb"""

    def __is_overlapped_row(self, start_time_next: datetime, stop_time_prev: datetime) -> bool:
        """Checks whether a later measurement starts before an earlier one ends

        Parameters
        ----------
        start_time_next : datetime
            start time of the more recent measurement
        stop_time_prev : datetime
            stop time of the earlier measurement

        Returns
        -------
        bool
            True if the intervals overlap by more than the tolerance gap
        """
        # overlaps up to 2 seconds are tolerated and not treated as conflicts
        gap = timedelta(seconds=2)
        if(start_time_next < stop_time_prev):
            logging.debug(f'time gap {abs(start_time_next - stop_time_prev)}')
        return start_time_next < stop_time_prev - gap

    def __drop_overlapping_rows_list(self, table: list) -> list:
        """Removes rows with overlapping time intervals from the table
        (when two measurements overlap, the later one is kept)

        Parameters
        ----------
        table : list
            the table MUST BE ORDERED BY TIME where 5th column is start_time, 6th column is end_time

        Returns
        -------
        list
            clear table
        """

        if len(table) == 0:
            logging.info("Empty list. No overlapping rows")
            return table

        logging.info("Drop overlapping rows in list representation")
        table = table[::-1]  # scan from the latest measurement back to the earliest
        # earliest start time among the rows kept so far;
        # initialized with the stop time of the latest row so that row is always kept
        min_time = table[0][6]
        overlapped_idxs = list()

        for idx, row in enumerate(table):
            start_time, stop_time = row[5], row[6]
            if self.__is_overlapped_row(min_time, stop_time):
                overlapped_idxs.append(idx)
            else:
                min_time = start_time

        # pop from the largest index down so earlier indexes stay valid
        for index in sorted(overlapped_idxs, reverse=True):
            table.pop(index)

        return table[::-1]

    def load_tables(self, tables: List[str], daterange: Optional[datetime] = None):
        """Returns tables containing compton energy measurements

        Parameters
        ----------
        tables : List[str]
            names of tables in the slowdb compton measurements database
            (full list of available tables can be seen with the property tables)
        daterange : Optional[datetime]
            minimum time for selection (UTC)

        Returns
        -------
        list
            table containing compton energy measurements with fields:
            write_time - time when the row was written (UTC)
            mean_energy - compton mean of the energy measurement [MeV]
            std_energy - compton std of the energy measurement [MeV]
            mean_spread - compton mean of the spread measurement [MeV]
            std_spread - compton std of the spread measurement [MeV]
            start_time - beginning time of the compton measurement (UTC)
            end_time - end time of the compton measurement (UTC)
        """

        # the %(date)s placeholder is filled by psycopg2 at execute time
        time_condition = "AND time>(%(date)s AT TIME ZONE 'UTC')" if daterange is not None else ""

        sql_query = lambda table: f"""SELECT
            time AT TIME ZONE 'UTC' AS time,
            CAST(values_array[1] AS numeric) AS mean_energy,
            CAST(values_array[2] AS numeric) AS std_energy,
            ROUND(CAST(values_array[5]/1000 AS numeric), 6) AS mean_spread,
            ROUND(CAST(values_array[6]/1000 AS numeric), 6) AS std_spread,
            date_trunc('second', time AT TIME ZONE 'UTC' + (values_array[8] * interval '1 second')) AS start_time,
            date_trunc('second', time AT TIME ZONE 'UTC' +
            (values_array[8] * interval '1 second') + (values_array[7] * interval '1 second')) AS stop_time
            FROM {table} WHERE g_id=43 AND dt>0 {time_condition}"""

        full_sql_query = '\nUNION ALL\n'.join([sql_query(table) for table in tables]) + '\nORDER BY time;'

        logging.debug(f"Full sql query {full_sql_query}")

        self.cur.execute(full_sql_query, {'date': daterange})
        table = self.cur.fetchall()
        table = self.__drop_overlapping_rows_list(table)
        return table
-
class CalibrdbHandler(PostgreSQLHandler):
    """A class for processing of calibration database"""

    def select_table(self, system: str, algo: str, name: str, version: str = 'Default', verbose: bool = True) -> int:
        """Selects the table from database

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        verbose : bool
            print additional information or not (default is True)

        Returns
        -------
        sid : int
            value corresponding the table
        """

        # parameterized query instead of f-string interpolation (avoids SQL injection
        # and quoting problems in system/algo/name/version)
        self.cur.execute("""SELECT * FROM clbrset
        WHERE system = %s AND algo = %s AND name = %s AND version = %s""",
                         (system, algo, name, version))
        result = self.cur.fetchall()
        logging.debug(f"selected clbrset: {result}")
        if len(result) > 1:
            # previously this branch returned the whole row instead of sid,
            # which broke every downstream query built from the return value
            logging.warning('Multiple equal calibration sets. clbrset DB problem')
        sid = result[0][0]
        return sid

    def load_table(self, system: str, algo: str, name: str, version: str = 'Default', num_last_rows: Optional[int] = None, timerange: Optional[Tuple[datetime, datetime]] = None) -> Tuple[list, list]:
        """Loads the calibration table

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        num_last_rows : Optional[int]
            the number of last rows of the table
        timerange : Optional[Tuple[datetime, datetime]]
            time range (UTC) condition on the selection of the table (default is None)

        Returns
        -------
        Tuple[list, list]
            the calibration table and name of fields
        """

        sid = self.select_table(system, algo, name, version)
        time_condition = "AND begintime BETWEEN %s AND %s" if timerange is not None else ""
        sql_query = f"""SELECT * FROM clbrdata WHERE sid = %s {time_condition} ORDER BY time DESC """
        params: list = [sid]
        if timerange is not None:
            params.extend(timerange)
        if num_last_rows is not None:
            sql_query += "LIMIT %s"
            params.append(num_last_rows)

        self.cur.execute(sql_query, params)
        fields_name = [i[0] for i in self.cur.description]
        table = self.cur.fetchall()
        return table, fields_name

    def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = True):
        """Inserts new measurement rows into clbrdata, optionally removing the last
        written row if it overlaps in time with the first new one

        Parameters
        ----------
        new_rows : list
            rows as produced by SlowdbComptonHandler.load_tables
            (time, mean_energy, std_energy, mean_spread, std_spread, start_time, stop_time)
        handle_last_time_row : bool
            drop the last written row when it overlaps the first new row (default is True)
        """
        if len(new_rows) == 0:
            return

        sid = self.select_table(system, algo, name, version)

        # repack into clbrdata layout: (sid, createdby, time, begintime, endtime, data)
        new_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[5], x[6], [x[1], x[2], x[3], x[4]]), new_rows))

        if handle_last_time_row:
            last_written_row, _ = self.load_table(system, algo, name, version, num_last_rows = 1)
            if len(last_written_row) > 0:
                # NOTE(review): assumes clbrdata column order (cid, sid, createdby, time, begintime, endtime, ...)
                # so [5] is endtime of the written row and new_rows[0][3] is begintime — verify against schema
                if last_written_row[0][5] > new_rows[0][3]:
                    logging.info('Removing of overlapping written row')
                    self.delete_row(sid = last_written_row[0][1], createdby = last_written_row[0][2], time = last_written_row[0][3])

        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, data) VALUES %s;"""
        execute_values(self.cur, insert_query, new_rows, fetch=False)
        logging.info(f"Inserted {len(new_rows)} new rows")
        return

    def delete_row(self, sid: int, createdby: str, time: datetime):
        """Deletes a single row identified by (sid, createdby, time)"""
        delete_query = """DELETE FROM clbrdata
        WHERE sid = %s AND createdby = %s AND time = %s
        """
        self.cur.execute(delete_query, (sid, createdby, time))
        logging.info(f"Deleted ({sid}, {createdby}, {time}) row")
        return

    def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default'):
        """Removes duplicate rows (same sid and time), keeping the one with the largest cid"""
        sid = self.select_table(system, algo, name, version)
        remove_query = """
        DELETE FROM clbrdata a
        USING clbrdata b
        WHERE
            a.sid = %s
            AND a.cid < b.cid
            AND a.sid = b.sid
            AND a.time = b.time
        """
        self.cur.execute(remove_query, (sid,))
        return

    def commit(self):
        """Commits the current transaction"""
        logging.info("Changes commited")
        self.conn.commit()
        return

    def rollback(self):
        """Rolls back the current transaction"""
        logging.info("Changes aborted")
        self.conn.rollback()
        return

    def __del__(self):
        logging.info("del clbr class")
        # attributes may be missing if __init__ failed before the connection was made
        if hasattr(self, 'cur'):
            self.cur.close()
        if hasattr(self, 'conn'):
            self.conn.close()
-
def main():
    """Entry point: filters new compton measurements from slowdb and writes them
    into the calibration database.

    Usage: python scripts/compton_filter.py --season cmd3_2021_2 --config database.ini
    """
    log_format = '[%(asctime)s] %(levelname)s: %(message)s'
    logging.basicConfig(filename="compton_filter.log", format=log_format, level=logging.INFO)
    logging.info("Program started")

    parser = argparse.ArgumentParser(description = 'Filter compton energy measurements from slowdb')
    parser.add_argument('--season', help = 'Name of compton measurement table from slowdb')
    parser.add_argument('--config', help = 'Config file containing information for access to databases')

    args = parser.parse_args()
    logging.info(f"Arguments: season: {args.season}, config {args.config}")

    # use a separate name for the config parser instead of shadowing `parser`
    config = ConfigParser()
    config.read(args.config)
    logging.info("Config parsed")

    clbrdb = CalibrdbHandler(**config['clbrDB'])
    last_written_row, _ = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run', num_last_rows = 1)
    last_time = last_written_row[0][3] if len(last_written_row) > 0 else None

    compton_slowdb = SlowdbComptonHandler(**config['postgresql'])
    # BUG FIX: the table name was hardcoded to 'cmd3_2021_2', silently ignoring --season
    res = compton_slowdb.load_tables([args.season], last_time)

    clbrdb.update(res)
    clbrdb.commit()
    del clbrdb


if __name__ == "__main__":
    main()