  1. """
  2. Script to fill calibration database with filtering slowdb compton measurements
  3. """
  4. import argparse
  5. from configparser import ConfigParser
  6. from datetime import datetime, timedelta
  7. from typing import Tuple, List, Dict, Union, Optional
  8. import warnings
  9. import logging
  10. import psycopg2
  11. from psycopg2.extras import execute_values


class PostgreSQLHandler:
    """A common class for processing PostgreSQL databases
    """

    def __init__(self, host: str = 'cmddb', database: str = 'slowdb', user: str = None, password: str = None):
        """
        Parameters
        ----------
        host : str
            host name (default is "cmddb")
        database : str
            database name (default is "slowdb")
        user : str
            username (default is None)
        password : str
            password (default is None)
        """
        self.conn = psycopg2.connect(host=host, database=database, user=user, password=password)
        self.cur = self.conn.cursor()
        logging.info("PostgreSQL handler created")

    @property
    def list_tables(self) -> List[str]:
        """Returns the list of existing tables in the compton measurements slowdb

        Returns
        -------
        List[str]
            list of tables
        """
        logging.info("Get list of the slowdb tables")
        self.cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        return list(map(lambda x: x[0], self.cur.fetchall()))

    def table_columns(self, table: str) -> List[str]:
        """Returns the list of the fields of the given table

        Parameters
        ----------
        table : str
            table from slowdb compton measurements

        Returns
        -------
        List[str]
            list of the fields
        """
        logging.info(f"Get list of columns of the {table} table")
        self.cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_name = %s
        """, (table,))
        return list(map(lambda x: x[0], self.cur.fetchall()))


class SlowdbComptonHandler(PostgreSQLHandler):
    """A class for processing and filtering compton measurements from slowdb
    """

    def __is_overlapped_row(self, start_time_next: datetime, stop_time_prev: datetime):
        # rows are treated as overlapping only if the next measurement starts more than
        # 2 seconds before the previous one stops
        gap = timedelta(seconds=2)
        if start_time_next < stop_time_prev:
            logging.debug(f'time gap {abs(start_time_next - stop_time_prev)}')
        return start_time_next < stop_time_prev - gap

    def __drop_overlapping_rows_list(self, table: list) -> list:
        """Removes rows with overlapping time intervals from the table

        Parameters
        ----------
        table : list
            the table MUST BE ORDERED BY TIME, where column index 5 is start_time
            and column index 6 is stop_time

        Returns
        -------
        list
            table without overlapping rows
        """
        if len(table) == 0:
            logging.info("Empty list. No overlapping rows")
            return table
        logging.info("Drop overlapping rows in list representation")
        table = table[::-1]  # reverse the table so it is ordered from the latest row to the earliest
        min_time = table[0][6]
        overlapped_idxs = list()
        for idx, row in enumerate(table):
            start_time, stop_time = row[5], row[6]
            if self.__is_overlapped_row(min_time, stop_time):
                overlapped_idxs.append(idx)
            else:
                min_time = start_time
        for index in sorted(overlapped_idxs, reverse=True):  # pop from the end so the remaining indices stay valid
            table.pop(index)
        return table[::-1]
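
    # Illustrative note (added; not part of the original code): assuming two consecutive rows with
    # measurement intervals 10:00-10:05 and 10:03-10:08, the backward sweep above keeps the later
    # row (10:03-10:08) and drops the earlier one, because the earlier row stops more than the
    # 2-second tolerance after the kept row starts.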

    def load_tables(self, tables: List[str], daterange: Optional[datetime] = None):
        """Returns a table containing compton energy measurements

        Parameters
        ----------
        tables : List[str]
            names of tables in the slowdb compton measurements database
            (the full list of available tables can be seen with the list_tables property)
        daterange : Optional[datetime]
            minimum time for the selection (UTC)

        Returns
        -------
        list
            table containing compton energy measurements with fields:
            write_time - time when the row was written (UTC)
            mean_energy - compton mean of the energy measurement [MeV]
            std_energy - compton std of the energy measurement [MeV]
            mean_spread - compton mean of the spread measurement [MeV]
            std_spread - compton std of the spread measurement [MeV]
            start_time - beginning time of the compton measurement (UTC)
            stop_time - end time of the compton measurement (UTC)
        """
        time_condition = "AND time>(%(date)s AT TIME ZONE 'UTC')" if daterange is not None else ""
        sql_query = lambda table: f"""SELECT
            time AT TIME ZONE 'UTC' AS time,
            CAST(values_array[1] AS numeric) AS mean_energy,
            CAST(values_array[2] AS numeric) AS std_energy,
            ROUND(CAST(values_array[5]/1000 AS numeric), 6) AS mean_spread,
            ROUND(CAST(values_array[6]/1000 AS numeric), 6) AS std_spread,
            date_trunc('second', time AT TIME ZONE 'UTC' + (values_array[8] * interval '1 second')) AS start_time,
            date_trunc('second', time AT TIME ZONE 'UTC' +
                (values_array[8] * interval '1 second') + (values_array[7] * interval '1 second')) AS stop_time
            FROM {table} WHERE g_id=43 AND dt>0 {time_condition}"""
        full_sql_query = '\nUNION ALL\n'.join([sql_query(table) for table in tables]) + '\nORDER BY time;'
        logging.debug(f"Full sql query {full_sql_query}")
        self.cur.execute(full_sql_query, {'date': daterange})
        table = self.cur.fetchall()
        table = self.__drop_overlapping_rows_list(table)
        return table
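
    # Row layout note (added for clarity): each returned row is a tuple
    # (write_time, mean_energy, std_energy, mean_spread, std_spread, start_time, stop_time),
    # so indices 5 and 6 hold the measurement interval relied on by __drop_overlapping_rows_list
    # above and by CalibrdbHandler.update below.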


class CalibrdbHandler(PostgreSQLHandler):
    """A class for processing the calibration database
    """

    def select_table(self, system: str, algo: str, name: str, version: str = 'Default', verbose: bool = True) -> int:
        """Selects the calibration set from the database

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        verbose : bool
            print additional information or not (default is True)

        Returns
        -------
        sid : int
            id of the calibration set corresponding to the table
        """
        self.cur.execute("""SELECT * FROM clbrset
            WHERE system=%s AND algo=%s AND name=%s AND version=%s""", (system, algo, name, version))
        result = self.cur.fetchall()
        logging.debug(f"selected clbrset: {result}")
        if len(result) > 1:
            logging.warning('Multiple equal calibration sets. clbrset DB problem')
            return result[0][0]
        sid = result[0][0]
        return sid
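
    # Schema note (an assumption, not stated explicitly in the original): the first column of
    # clbrset is taken to be the calibration set id (sid), since result[0][0] is what gets
    # interpolated into the clbrdata queries below.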

    def load_table(self, system: str, algo: str, name: str, version: str = 'Default', num_last_rows: Optional[int] = None, timerange: Optional[Tuple[datetime, datetime]] = None) -> Tuple[list, list]:
        """Loads the calibration table

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        num_last_rows : Optional[int]
            the number of last rows of the table to load (default is None, i.e. no limit)
        timerange : Optional[Tuple[datetime, datetime]]
            time range (UTC) condition on the selection from the table (default is None)

        Returns
        -------
        Tuple[list, list]
            the calibration table and the names of its fields
        """
        sid = self.select_table(system, algo, name, version)
        time_condition = "AND begintime BETWEEN %s AND %s" if timerange is not None else ""
        sql_query = f"""SELECT * FROM clbrdata WHERE sid={sid} {time_condition} ORDER BY time DESC """
        if num_last_rows is not None:
            sql_query += f"LIMIT {num_last_rows}"
        if timerange is None:
            self.cur.execute(sql_query)
        else:
            self.cur.execute(sql_query, timerange)
        fields_name = [i[0] for i in self.cur.description]
        table = self.cur.fetchall()
        return table, fields_name

    def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = True):
        if len(new_rows) == 0:
            return
        sid = self.select_table(system, algo, name, version)
        # pack each measurement as (sid, createdby, time, begintime, endtime, data)
        new_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[5], x[6], [x[1], x[2], x[3], x[4]]), new_rows))
        if handle_last_time_row:
            last_written_row, _ = self.load_table(system, algo, name, version, num_last_rows=1)
            if len(last_written_row) > 0:
                if last_written_row[0][5] > new_rows[0][3]:
                    logging.info('Removing the overlapping last written row')
                    self.delete_row(sid=last_written_row[0][1], createdby=last_written_row[0][2], time=last_written_row[0][3])
        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, data) VALUES %s;"""
        execute_values(self.cur, insert_query, new_rows, fetch=False)
        logging.info(f"Inserted {len(new_rows)} new rows")
        return
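
    # Mapping note (added for clarity): the packed tuples above fill the clbrdata columns
    # sid, createdby='lxeuser', time=write_time, begintime=start_time, endtime=stop_time and
    # data=[mean_energy, std_energy, mean_spread, std_spread]. The overlap check assumes the
    # stored rows come back as (cid, sid, createdby, time, begintime, endtime, ...), so
    # last_written_row[0][5] is the endtime of the last stored calibration row.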

    def delete_row(self, sid: int, createdby: str, time: datetime):
        delete_query = """DELETE FROM clbrdata
            WHERE sid = %s AND createdby = %s AND time = %s
        """
        self.cur.execute(delete_query, (sid, createdby, time))
        logging.info(f"Deleted ({sid}, {createdby}, {time}) row")
        return

    def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default'):
        sid = self.select_table(system, algo, name, version)
        remove_query = f"""
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE
                a.sid = {sid}
                AND a.cid < b.cid
                AND a.sid = b.sid
                AND a.time = b.time
        """
        self.cur.execute(remove_query)
        return

    def commit(self):
        logging.info("Changes committed")
        self.conn.commit()
        return

    def rollback(self):
        logging.info("Changes aborted")
        self.conn.rollback()
        return

    def __del__(self):
        logging.info("del clbr class")
        self.cur.close()
        self.conn.close()


def main():
    log_format = '[%(asctime)s] %(levelname)s: %(message)s'
    logging.basicConfig(filename="compton_filter.log", format=log_format, level=logging.INFO)
    logging.info("Program started")

    parser = argparse.ArgumentParser(description='Filter compton energy measurements from slowdb')
    parser.add_argument('--season', help='Name of the compton measurement table from slowdb')
    parser.add_argument('--config', help='Config file containing information for access to the databases')
    args = parser.parse_args()
    logging.info(f"Arguments: season: {args.season}, config: {args.config}")

    parser = ConfigParser()
    parser.read(args.config)
    logging.info("Config parsed")

    clbrdb = CalibrdbHandler(**parser['clbrDB'])
    last_written_row, _ = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run', num_last_rows=1)
    last_time = last_written_row[0][3] if len(last_written_row) > 0 else None

    compton_slowdb = SlowdbComptonHandler(**parser['postgresql'])
    res = compton_slowdb.load_tables([args.season], last_time)

    clbrdb.update(res)
    clbrdb.commit()
    del clbrdb

# python scripts/compton_filter.py --season cmd3_2021_2 --config database.ini
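
# A minimal sketch (illustrative, not from the original) of what database.ini is expected to
# contain, based on the config sections and connection keyword arguments used above; the values
# are placeholders:
#
#   [postgresql]
#   host = ...
#   database = ...
#   user = ...
#   password = ...
#
#   [clbrDB]
#   host = ...
#   database = ...
#   user = ...
#   password = ...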

if __name__ == "__main__":
    main()