"""
Script that fills the calibration database with filtered Compton measurements from slowdb.
"""
- import argparse
- from configparser import ConfigParser
- from datetime import datetime, timedelta, timezone
- import sys
- from typing import Tuple, List, Dict, Union, Optional
- import warnings
- import logging
- try:
- import psycopg2
- from psycopg2.extras import execute_values
- except ImportError:
- sys.path = list(filter(lambda x: "python2.7" not in x, sys.path))
- import psycopg2
- from psycopg2.extras import execute_values
class PostgreSQLHandler():
    """Base helper that owns one psycopg2 connection and one cursor on it."""

    def __init__(self, host: str = 'cmddb', database: str = 'slowdb',
                 user: Optional[str] = None, password: Optional[str] = None):
        """Open the connection and create the cursor.

        Parameters
        ----------
        host : str
            host name (default is "cmddb")
        database : str
            database name (default is "slowdb")
        user : Optional[str]
            username (default is None)
        password : Optional[str]
            password (default is None)
        """
        connection = psycopg2.connect(host=host, database=database, user=user, password=password)
        self.conn = connection
        self.cur = connection.cursor()
        logging.info("PostgreSQL Handler created")

    @property
    def list_tables(self) -> List[str]:
        """Names of the tables found in the 'public' schema of the connected database.

        Note that reading this property runs a query on every access.

        Returns
        -------
        List[str]
            table names, in the order the database returned them
        """
        logging.info("Get list of the slowdb tables")
        query = """
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """
        self.cur.execute(query)
        # every fetched record is a 1-tuple holding the table name
        return [record[0] for record in self.cur.fetchall()]
class SlowdbComptonHandler(PostgreSQLHandler):
    """A class for loading and filtering compton measurements from slowdb."""

    def __is_overlapped_row(self, start_time_next: datetime, stop_time_prev: datetime) -> bool:
        """Tell whether a measurement ends noticeably after the following one has started.

        Parameters
        ----------
        start_time_next : datetime
            start time of the later measurement
        stop_time_prev : datetime
            stop time of the earlier measurement

        Returns
        -------
        bool
            True when the intervals overlap by more than the 2 second tolerance
        """
        gap = timedelta(seconds=2)  # overlaps up to this long are tolerated
        if start_time_next < stop_time_prev:
            logging.debug(f'time gap {abs(start_time_next - stop_time_prev)}')
        return start_time_next < stop_time_prev - gap

    def __drop_overlapping_rows_list(self, table: list) -> list:
        """Remove rows whose time interval overlaps a later surviving row.

        The table is walked from the newest row to the oldest one, so when two
        measurements overlap the more recent one is kept.

        Parameters
        ----------
        table : list
            rows ORDERED BY TIME (ascending); row[5] is start_time and row[6] is stop_time

        Returns
        -------
        list
            the surviving rows in the original (ascending) order
        """
        n_rows = len(table)
        if n_rows == 0:
            logging.info("Empty list. No overlapping rows")
            return table

        survivors = []
        # Start from the stop time of the newest row so that it always survives.
        min_time = table[-1][6]
        for row in reversed(table):
            start_time, stop_time = row[5], row[6]
            if self.__is_overlapped_row(min_time, stop_time):
                continue
            survivors.append(row)
            min_time = start_time
        survivors.reverse()

        logging.info(f"Drop overlapping rows in list representation. Survived {len(survivors)} from {n_rows}")
        return survivors

    def load_tables(self, tables: List[str], daterange: Optional[datetime] = None):
        """Load compton energy measurements from the given slowdb tables.

        Parameters
        ----------
        tables : List[str]
            names of tables in the slowdb compton measurements database
            (the available tables are listed by the property list_tables)
        daterange : Optional[datetime]
            only rows with time later than this value are selected (should contain timezone);
            None disables the condition

        Returns
        -------
        list
            rows ordered by time with overlapping measurements removed; columns are:
            time - time when the row was written
            mean_energy - compton mean of the energy measurement [MeV]
            std_energy - compton std of the energy measurement [MeV]
            mean_spread - compton mean of the spread measurement [MeV]
            std_spread - compton std of the spread measurement [MeV]
            start_time - beginning time of the compton measurement
            stop_time - end time of the compton measurement

        Raises
        ------
        ValueError
            if a table name is not a plain (optionally schema-qualified) identifier
        """
        if not tables:
            # An empty UNION would be invalid SQL, and there is nothing to load anyway.
            logging.info("No tables requested. Nothing to load")
            return []

        # Table names cannot be passed as query parameters, so they are validated
        # before being interpolated into the SQL text.
        for table in tables:
            if not isinstance(table, str) or not all(part.isidentifier() for part in table.split('.')):
                raise ValueError(f"Invalid slowdb table name: {table!r}")

        time_condition = "AND time>(%(date)s)" if daterange is not None else ""

        def sql_query(table: str) -> str:
            return f"""SELECT
                time AS time,
                CAST(values_array[1] AS numeric) AS mean_energy,
                CAST(values_array[2] AS numeric) AS std_energy,
                ROUND(CAST(values_array[5]/1000 AS numeric), 6) AS mean_spread,
                ROUND(CAST(values_array[6]/1000 AS numeric), 6) AS std_spread,
                date_trunc('second', time + (values_array[8] * interval '1 second')) AS start_time,
                date_trunc('second', time + (values_array[8] * interval '1 second') + (values_array[7] * interval '1 second')) AS stop_time
                FROM {table} WHERE g_id=43 AND dt>0 {time_condition}"""

        full_sql_query = '\nUNION ALL\n'.join(sql_query(table) for table in tables) + '\nORDER BY time;'
        logging.debug(f"Full sql query {full_sql_query}")
        self.cur.execute(full_sql_query, {'date': daterange})
        table = self.cur.fetchall()
        return self.__drop_overlapping_rows_list(table)
class CalibrdbHandler(PostgreSQLHandler):
    """A class for reading and writing the calibration database (clbrset and clbrdata tables)."""

    def select_table(self, system: str, algo: str, name: str, version: str = 'Default') -> int:
        """Find the calibration set and return its identifier.

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)

        Returns
        -------
        sid : int
            first column of the matching clbrset row; if several rows match,
            a warning is logged and the first of them is used

        Raises
        ------
        IndexError
            if no calibration set matches
        """
        self.cur.execute(
            """SELECT * FROM clbrset
            WHERE system = %s AND algo = %s AND name = %s AND version = %s""",
            (system, algo, name, version),
        )
        result = self.cur.fetchall()
        logging.debug(f"selected clbrset: {result}")
        if not result:
            raise IndexError(f"No calibration set found for {system}/{algo}/{name}/{version}")
        if len(result) > 1:
            logging.warning('Multiple equal calibration sets. clbrset DB problem')
        # Always return the sid itself (never the whole row), as callers use it in queries.
        return result[0][0]

    def load_table(self, system: str, algo: str, name: str, version: str = 'Default',
                   num_last_rows: Optional[int] = None,
                   timerange: Optional[Tuple[datetime, datetime]] = None,
                   return_timezone: bool = False) -> Tuple[list, list]:
        """Load rows of a calibration table, newest first.

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        num_last_rows : Optional[int]
            maximum number of the most recent rows to return (default is None, no limit)
        timerange : Optional[Tuple[datetime, datetime]]
            keep only rows whose begintime lies between the two values (default is None)
        return_timezone : bool
            when True the time, begintime and endtime columns are selected
            as `<column> AT TIME ZONE 'ALMST'` (default is False)

        Returns
        -------
        Tuple[list, list]
            the calibration rows (ordered by time, descending) and the names of their fields
        """
        sid = self.select_table(system, algo, name, version)
        tzone = "AT TIME ZONE 'ALMST'" if return_timezone else ''
        # Only the constant tzone fragment is interpolated; all values go through parameters.
        sql_query = f"""SELECT
            cid, sid, createdby,
            time {tzone} AS time,
            begintime {tzone} AS begintime,
            endtime {tzone} AS endtime,
            comment, parameters, data
            FROM clbrdata WHERE sid = %s"""
        params = [sid]
        if timerange is not None:
            sql_query += " AND begintime BETWEEN %s AND %s"
            params.extend(timerange)
        sql_query += " ORDER BY time DESC"
        if num_last_rows is not None:
            sql_query += " LIMIT %s"
            params.append(num_last_rows)

        self.cur.execute(sql_query, params)
        fields_name = [column[0] for column in self.cur.description]
        table = self.cur.fetchall()
        return table, fields_name

    def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader",
               name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = False):
        """Write new_rows into clbrdata (for raw compton measurements).

        Parameters
        ----------
        new_rows : list
            rows laid out as (write_time, mean_energy, std_energy, mean_spread,
            std_spread, start_time, stop_time)
        system, algo, name, version : str
            identify the calibration set the rows are written to
        handle_last_time_row : bool
            (DANGEROUS - keep the default False, or do not commit, unless you know what you want)
            when True, existing 'lxeuser' rows overlapping the interval from
            min(begintime) to max(endtime) of new_rows are deleted before the insert
        """
        logging.info(f"Update {system}/{algo}/{name} is running...")
        if not new_rows:
            logging.info("Success. Nothing new.")
            return

        sid = self.select_table(system, algo, name, version)
        # The four measured values are stored together as a list in the data column.
        rows_to_insert = [
            (sid, 'lxeuser', row[0], row[5], row[6], [row[1], row[2], row[3], row[4]])
            for row in new_rows
        ]

        if handle_last_time_row:
            min_new_time = min(row[3] for row in rows_to_insert)
            max_new_time = max(row[4] for row in rows_to_insert)
            self.delete_rows(sid=sid, createdby='lxeuser', time=(min_new_time, max_new_time))

        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, data) VALUES %s;"""
        execute_values(self.cur, insert_query, rows_to_insert, fetch=False)
        logging.info(f"Success. Inserted {len(rows_to_insert)} new rows")
        return

    def insert(self, new_rows: list, system: str, algo: str, name: str, version: str,
               update: bool = True, comment: Optional[str] = None):
        """Insert new_rows into the table (for averages by energy points).

        After the insert, older rows of the same calibration set that share a
        begintime or an endtime with a newer row are deleted.

        Parameters
        ----------
        new_rows : list
            rows laid out as (time, begintime, endtime, data values...); the row
            comment is built from the common comment, row[4] and row[3]
        system, algo, name, version : str
            identify the calibration set the rows are written to
        update : bool
            when True, rows already stored with the same comment are updated first
        comment : Optional[str]
            common comment prefix
        """
        sid = self.select_table(system, algo, name, version)
        comment_prefix = comment if comment is not None else ''

        def make_comment(row) -> str:
            return f'{comment_prefix}_{str(row[4])}_{str(row[3])}'

        if update:
            update_query = """UPDATE clbrdata
                SET data = %(data)s, createdby = %(createdby)s, time = %(time)s,
                    begintime = %(begintime)s, endtime = %(endtime)s
                WHERE sid = %(sid)s AND comment = %(comment)s
            """
            for row in new_rows:
                self.cur.execute(update_query, {
                    'sid': sid,
                    'createdby': 'lxeuser',
                    'time': row[0],
                    'begintime': row[1],
                    'endtime': row[2],
                    'comment': make_comment(row),
                    # list() so that tuple rows are sent as an array, like update() does
                    'data': list(row[3:]),
                })

        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, comment, data) VALUES %s"""
        insert_rows = [
            (sid, 'lxeuser', row[0], row[1], row[2], make_comment(row), list(row[3:]))
            for row in new_rows
        ]
        execute_values(self.cur, insert_query, insert_rows, fetch=False)

        # Keep only the newest row (highest cid) among rows sharing a begintime or
        # an endtime. Rows are matched regardless of their comment.
        drop_query = """
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE a.sid = %s
            AND a.cid < b.cid
            AND a.sid = b.sid
            AND (a.begintime = b.begintime OR a.endtime = b.endtime)
        """
        self.cur.execute(drop_query, (sid,))
        logging.info(f"Inserted {len(insert_rows)} rows into table: {system}/{algo}/{name}/{version}")
        return

    def clear_table(self, sid: int, createdby: str):
        """Delete every clbrdata row with the given sid and createdby."""
        delete_query = """DELETE FROM clbrdata WHERE sid = %s AND createdby = %s"""
        logging.info(f"Clear ({sid}, {createdby}) table")
        self.cur.execute(delete_query, (sid, createdby))
        return

    def delete_row(self, sid: int, createdby: str, time: datetime):
        """Delete the clbrdata rows with the given sid, createdby and exact time."""
        delete_query = """DELETE FROM clbrdata
            WHERE sid = %s AND createdby = %s AND time = %s
        """
        self.cur.execute(delete_query, (sid, createdby, time))
        logging.info(f"Deleted ({sid}, {createdby}, {time}) row")
        return

    def delete_rows(self, sid: int, createdby: str, time: Tuple[datetime, datetime]):
        """Delete the clbrdata rows whose [begintime, endtime] overlaps the open interval time."""
        delete_query = """DELETE FROM clbrdata
            WHERE sid = %s AND createdby = %s AND endtime > %s AND begintime < %s
        """
        self.cur.execute(delete_query, (sid, createdby, time[0], time[1]))
        logging.info(f"Deleted ({sid}, {createdby} from {time[0]} to {time[1]}) rows")
        return

    def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run",
                          version: str = 'Default', keep: str = 'last'):
        """Delete rows of a calibration set that share their time with another row.

        Parameters
        ----------
        system, algo, name, version : str
            identify the calibration set
        keep : str
            'last' keeps the row with the highest cid, 'first' the one with the lowest

        Raises
        ------
        ValueError
            if keep is neither 'last' nor 'first'
        """
        # The comparison operator comes from this fixed table, never from user input.
        keep_rules = {'last': '<', 'first': '>'}
        if keep not in keep_rules:
            raise ValueError("keep argument must be 'last' or 'first'")
        keep_rule = keep_rules[keep]

        sid = self.select_table(system, algo, name, version)
        remove_query = f"""
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE a.sid = %s
            AND a.cid {keep_rule} b.cid
            AND a.sid = b.sid
            AND a.time = b.time
        """
        self.cur.execute(remove_query, (sid,))

    def commit(self):
        """Commit the current transaction."""
        self.conn.commit()
        logging.info("Changes commited")
        return

    def rollback(self):
        """Roll back the current transaction."""
        self.conn.rollback()
        logging.info("Changes aborted")
        return

    def __del__(self):
        """Close the cursor and the connection, if they were ever created."""
        logging.info("del clbr class")
        # __init__ may have failed before these attributes were assigned.
        for resource in (getattr(self, 'cur', None), getattr(self, 'conn', None)):
            if resource is not None:
                resource.close()
def processing_from_file(path):
    """Parse text files (named like 'vepp2k.edge.txt') holding compton reanalyses by N.Muchnoi.

    Each data line is whitespace separated and starts with the columns
    t, dt, E, dE, S, dS (further columns are ignored). Blank lines and lines
    whose first non-blank character is '#' are skipped.

    Parameters
    ----------
    path : Union[str, list]
        path to the file/files; a str is expanded as a glob mask

    Returns
    -------
    List[Tuple[datetime, Decimal, Decimal, Decimal, Decimal, datetime, datetime]]
        one tuple per compton measurement with fields:
        writetime, energy_mean, energy_err, spread_mean, spread_err, starttime, stoptime.
        starttime is t - dt, stoptime is t + dt and writetime equals stoptime;
        all of them are timezone-aware with a fixed UTC+7 offset.
        The spread values are multiplied by 0.001 (keV to MeV).
    """
    from glob import glob
    from decimal import Decimal
    from datetime import datetime, timedelta, timezone

    logging.info("Reading from files is running...")
    files = glob(path) if isinstance(path, str) else path
    logging.info(f"Handle {len(files)} files")

    local_tz = timezone(timedelta(hours=7))
    kev_to_mev = Decimal('0.001')

    # colnames = ('t', 'dt', 'E', 'dE', 'S', 'dS', 'B', 'dB', 'I', 'dI') # columns names in the text files
    def preprocess_row(fields):
        timestamp_mean, dt = int(fields[0]), int(fields[1])
        # Convert the POSIX timestamps directly into aware datetimes, so the
        # result does not depend on the timezone of the machine running this.
        t_start = datetime.fromtimestamp(timestamp_mean - dt, tz=local_tz)
        t_stop = datetime.fromtimestamp(timestamp_mean + dt, tz=local_tz)
        t_write = t_stop
        e_mean, de = Decimal(fields[2]), Decimal(fields[3])
        s_mean, ds = Decimal(fields[4]) * kev_to_mev, Decimal(fields[5]) * kev_to_mev
        return (t_write, e_mean, de, s_mean, ds, t_start, t_stop)

    rows = []
    for file in files:
        with open(file, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields or fields[0].startswith('#'):
                    continue
                rows.append(preprocess_row(fields))

    logging.info(f"Success files reading. {len(rows)} in result.")
    return rows
def main():
    """Command line entry point.

    Reads compton measurements either from a slowdb table (--season) or from
    text files (--files), writes them into the Misc/RunHeader/Compton_run
    calibration table and commits the changes.
    """
    log_format = '[%(asctime)s] %(levelname)s: %(message)s'
    # Logs go to stdout; pass filename="compton_filter.log" instead to log into a file.
    logging.basicConfig(stream=sys.stdout, format=log_format, level=logging.INFO)
    logging.info("Program started")

    arg_parser = argparse.ArgumentParser(description='Filter compton energy measurements from slowdb')
    arg_parser.add_argument('--config', help='Config file containing information for access to databases')
    arg_parser.add_argument('--season', help='Name of compton measurement table from slowdb')
    arg_parser.add_argument('--update', action='store_true', help='Writes only newest values into the db')
    arg_parser.add_argument('--files', nargs='*',
                            help='Mask to the path to the files like vepp2k.edge.txt. '
                                 'It has a higher priority than season. '
                                 'Update flag will be set as True if using this option.')
    args = arg_parser.parse_args()
    if args.files is not None:
        args.update = True
    logging.info(f"Arguments: config {args.config}, season: {args.season}, update {args.update}, files {args.files}")

    # Fail early with a readable message instead of a TypeError or a broken SQL query later.
    if args.config is None:
        arg_parser.error("--config is required")
    if args.files is None and args.season is None:
        arg_parser.error("either --season or --files must be given")

    config = ConfigParser()
    if not config.read(args.config):
        arg_parser.error(f"cannot read config file: {args.config}")
    logging.info("Config parsed")

    clbrdb = CalibrdbHandler(**config['clbrDB'])
    last_written_row, _ = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run',
                                            num_last_rows=1, return_timezone=True)
    # In update mode only measurements newer than the last written row are requested.
    last_time = last_written_row[0][3] if (len(last_written_row) > 0) and (args.update) else None

    if args.files is None:
        compton_slowdb = SlowdbComptonHandler(**config['postgresql'])
        res = compton_slowdb.load_tables([args.season], last_time)
    else:
        res = processing_from_file(args.files)

    clbrdb.update(res, handle_last_time_row=args.update)
    clbrdb.commit()
    del clbrdb


# Example invocation:
#   python scripts/compton_filter.py --season cmd3_2021_2 --config database.ini --update
if __name__ == "__main__":
    main()