"""Stores that keep the entire dataset in memory"""importgzipimportloggingfromshutilimportmovefrompathlibimportPathfromthreadingimportEventfromtypingimportIterablefromtimeimportmonotonic,sleepfromconcurrent.futuresimportThreadPoolExecutor,Futurefromexamol.store.db.baseimportMoleculeStorefromexamol.store.modelsimportMoleculeRecordlogger=logging.getLogger(__name__)
class InMemoryStore(MoleculeStore):
    """Store all molecule records in memory, write to disk as a single file

    The class will start checkpointing as soon as any record is updated
    but no more frequently than :attr:`write_freq`

    Args:
        path: Path from which to read data. Must be a JSON file, can be compressed with GZIP.
            Set to ``None`` if you do not want data to be stored
        write_freq: Minimum time between writing checkpoints
    """

    def __init__(self, path: Path | None, write_freq: float = 10.):
        self.path = None if path is None else Path(path)
        self.write_freq = write_freq
        self.db: dict[str, MoleculeRecord] = {}  # Maps record key -> record

        # State for the background checkpoint thread; started in __enter__
        self._thread_pool: ThreadPoolExecutor | None = None
        self._write_thread: Future | None = None
        self._updates_available: Event = Event()  # Set when the DB has changed since the last write
        self._closing = Event()  # Set to tell the writer thread to flush once more, then exit

        # Populate the database from any existing checkpoint on disk
        self._load_molecules()

    def __enter__(self):
        # Persistence disabled: nothing to start
        if self.path is None:
            return self

        logger.info('Start the writing thread')
        self._thread_pool = ThreadPoolExecutor(max_workers=1)
        self._write_thread = self._thread_pool.submit(self._writer)

        def _report_exit(future: Future):
            # Surface any failure of the writer thread in the logs
            exc = future.exception()
            if exc is not None:
                logger.warning(f'Write thread failed: {exc}')
            logger.info('Write thread has exited')

        self._write_thread.add_done_callback(_report_exit)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.path is None:
            return

        # Ask the writer for one final checkpoint, then wait for it to finish.
        # Future.result() re-raises any exception from the writer thread.
        logger.info('Triggering a last write to the database')
        self._closing.set()
        self._write_thread.result()

        # Reset the threading state so the store could be entered again
        self._write_thread = None
        self._closing.clear()
        self._thread_pool.shutdown()

    def _load_molecules(self):
        """Load molecule records from disk into :attr:`db`, if a checkpoint file exists"""
        if self.path is None or not self.path.is_file():
            return

        logger.info(f'Loading data from {self.path}')
        is_gzipped = self.path.name.endswith('.gz')
        handle = gzip.open(self.path, 'rt') if is_gzipped else self.path.open()
        with handle as stream:
            # One JSON-serialized record per line
            for raw in stream:
                rec = MoleculeRecord.parse_raw(raw)
                self.db[rec.key] = rec
        logger.info(f'Loaded {len(self.db)} molecule records')
    def __getitem__(self, item):
        """Look up a record by its key

        Raises:
            KeyError: If no record with that key is stored
        """
        return self.db[item]

    def __len__(self):
        """Number of molecule records currently held in memory"""
        return len(self.db)

    def __contains__(self, item: str | MoleculeRecord):
        """Membership test accepting either a key string or a full record"""
        mol_key = item.key if isinstance(item, MoleculeRecord) else item
        return mol_key in self.db

    def _writer(self):
        """Background loop that checkpoints the database to disk

        Runs until :attr:`_closing` is set and pending updates are flushed,
        writing no more often than once per :attr:`write_freq` seconds.
        Each checkpoint is written to a temporary file first and then moved
        over :attr:`path`, so an interrupted write does not clobber the
        previous checkpoint.
        """
        next_write = 0  # Monotonic-clock time before which we must not write again (standoff)
        while self._updates_available.is_set() or not self._closing.is_set():
            # Wait until updates are available and the standoff is not met, or if we're closing
            while monotonic() < next_write or not self._closing.is_set():
                if self._updates_available.wait(timeout=1):  # Check for termination condition once per second
                    # Updates arrived: sleep out the remainder of the standoff, then go write
                    to_sleep = next_write - monotonic()
                    if to_sleep > 0:
                        sleep(to_sleep)
                    break

            # Mark that we've caught up with whatever signaled this thread
            self._updates_available.clear()

            # Checkpoint and advance the standoff
            temp_path = self.path.parent / ("new-" + self.path.name)
            logger.info(f'Started writing {len(self.db)} records to {temp_path}')
            self.export_records(temp_path)  # NOTE(review): defined outside this view, presumably on MoleculeStore
            move(temp_path, self.path)  # Replace the old checkpoint with the freshly-written one
            next_write = monotonic() + self.write_freq