'''
Created on 2021-07-26
@author: wf
'''
from typing import Callable
from corpus.config import EventDataSourceConfig
from lodstorage.csv import CSV
from lodstorage.entity import EntityManager
from lodstorage.jsonable import JSONAble
from lodstorage.lod import LOD
from lodstorage.sql import SQLDB
from corpus.utils.download import Profiler
from lodstorage.storageconfig import StorageConfig
from corpus.quality.rating import RatingManager
from corpus.eventrating import EventRating,EventSeriesRating
from lodstorage.sparql import SPARQL
from lodstorage.schema import Schema
from lodstorage.uml import UML
from lodstorage.query import QueryManager
import os
import sys
from datetime import datetime
import re
class EventStorage:
    '''
    common storage aspects of the EventManager and EventSeriesManager

    all members are class level; the code visible here never instantiates this class
    '''
    # copied to the profile attribute of the StorageConfig by getStorageConfig
    profile=True
    # copied to the withShowProgress attribute of the StorageConfig by getStorageConfig
    withShowProgress=False
    # view name -> table names; has the shape getViewTableList accepts
    # for its exclude parameter
    viewTableExcludes={
        "event": [
            "event_acm",
            "event_ceurws",
            "event_orclonebackup",
            "event_or",
            "event_orbackup",
        ],
        "eventseries": [
            "eventseries_acm",
            "eventseries_or",
            "eventseries_orbackup",
            "eventseries_orclonebackup",
            "eventseries_gnd",
        ],
    }

    @staticmethod
    def getStorageConfig(debug:bool=False,mode='sql')->"StorageConfig":
        '''
        get the storageConfiguration

        Args:
            debug(bool): if True show debug information
            mode(str): sql, json or jsonpickle

        Return:
            StorageConfig: the storage configuration to be used

        Raises:
            Exception: if the mode is not one of sql, json or jsonpickle
        '''
        if mode=='sql':
            config=StorageConfig.getSQL(debug=debug)
        elif mode=='json':
            config=StorageConfig.getJSON()
        elif mode=='jsonpickle':
            config=StorageConfig.getJsonPickle(debug=debug)
        else:
            raise Exception(f"invalid mode {mode}")
        config.cacheDirName="conferencecorpus"
        cachedir=config.getCachePath()
        config.profile=EventStorage.profile
        config.withShowProgress=EventStorage.withShowProgress
        if mode=='sql':
            # only the SQL mode gets an explicit database file
            config.cacheFile=f"{cachedir}/EventCorpus.db"
        return config

    @classmethod
    def getQueryManager(cls,lang='sql',name="queries",debug=False):
        '''
        get the query manager for the given language and fileName

        Args:
            lang(str): the language of the queries to extract
            name(str): the name of the manager containing the query specifications
            debug(bool): if True set debugging on

        Return:
            QueryManager: the manager for the first {name}.yaml file found
            or None if there is no such file
        '''
        cachedir=cls.getStorageConfig().getCachePath()
        resourceDir=os.path.dirname(__file__)+"/../resources"
        # the cache directory takes precedence over the resources directory
        for path in (cachedir,resourceDir):
            qYamlFile=f"{path}/{name}.yaml"
            if os.path.isfile(qYamlFile):
                return QueryManager(lang=lang,debug=debug,queriesPath=qYamlFile)
        return None

    @classmethod
    def getDBFile(cls,cacheFileName="EventCorpus"):
        '''
        get the database file for the given cacheFileName

        Args:
            cacheFileName(str): the name of the cacheFile without suffix

        Return:
            str: the absolute path of the .db file in the cache directory
        '''
        cachedir=cls.getStorageConfig().getCachePath()
        return os.path.abspath(f"{cachedir}/{cacheFileName}.db")

    @classmethod
    def getSqlDB(cls):
        '''
        get the SQL Database

        Return:
            SQLDB: a database access object for the default database file
        '''
        return SQLDB(cls.getDBFile())

    @classmethod
    def getTableList(cls,withInstanceCount:bool=True)->list:
        '''
        get the list of SQL Tables involved

        Args:
            withInstanceCount(bool): if True add the count of instances to each table dict

        Return:
            list: the list of table dicts of the database - each one gets
            an 'instances' entry if withInstanceCount is set
        '''
        sqlDB=cls.getSqlDB()
        tableList=sqlDB.getTableList()
        if withInstanceCount:
            for table in tableList:
                tableName=table["name"]
                countQuery="SELECT count(*) as count from %s" % tableName
                countResult=sqlDB.query(countQuery)
                table['instances']=countResult[0]['count']
        return tableList

    @classmethod
    def getViewTableList(cls,viewName,exclude=None):
        '''
        get the tables that contribute to the given view

        Args:
            viewName(str): the name of the view - tables named {viewName}_... are selected
            exclude(dict or list): table names to skip - either a dict
            view name -> table names (such as viewTableExcludes) or a plain
            collection of table names; None excludes nothing

        Return:
            list: the table dicts of the selected tables
        '''
        if exclude is None:
            excluded=()
        elif isinstance(exclude,dict):
            # a view without an entry simply has nothing excluded
            excluded=exclude.get(viewName,())
        else:
            excluded=exclude
        prefix=f"{viewName}_"
        tableList=cls.getSqlDB().getTableList()
        return [
            table for table in tableList
            if table["name"].startswith(prefix) and table["name"] not in excluded
        ]

    @classmethod
    def getCommonViewDDLs(cls,viewNames=("event","eventseries"),exclude=None):
        '''
        get the SQL DDLs for the common views

        Args:
            viewNames(iterable): the names of the views to get the DDL for
            exclude(dict or list): the tables to exclude - see getViewTableList

        Return:
            dict: view name -> SQL DDL CREATE VIEW command
        '''
        viewDDLs={}
        for viewName in viewNames:
            viewTableList=cls.getViewTableList(viewName,exclude=exclude)
            viewDDLs[viewName]=Schema.getGeneralViewDDL(viewTableList,viewName)
        return viewDDLs

    @classmethod
    def createViews(cls,exclude=None,show=False):
        '''
        create the general Event views

        Args:
            exclude(dict or list): the tables to exclude - see getViewTableList
            show(bool): if True show the DDL
        '''
        sqlDB=cls.getSqlDB()
        viewDDLs=cls.getCommonViewDDLs(exclude=exclude)
        for viewName,viewDDL in viewDDLs.items():
            # replace a view left over from an earlier run
            sqlDB.c.execute(f"DROP VIEW IF EXISTS {viewName}")
            if show:
                print(viewDDL)
            sqlDB.c.execute(viewDDL)

    @classmethod
    def asPlantUml(cls,baseEntity='Event',exclude=None):
        '''
        return me as a plantUml Diagram markup

        Args:
            baseEntity(str): the entity to generalize to - its lower case name selects the view
            exclude(dict or list): the tables to exclude - see getViewTableList

        Return:
            str: the plantUml markup
        '''
        schemaManager=None
        uml=UML()
        nowYMD=datetime.now().strftime("%Y-%m-%d")
        viewName=f"{baseEntity.lower()}"
        tableList=cls.getViewTableList(viewName,exclude=exclude)
        # this is a classmethod so there is no self to ask for data sources;
        # EventStorage itself does not define getDataSource4TableName - only
        # use it if a subclass supplies it
        dataSourceLookup=getattr(cls,"getDataSource4TableName",None)
        for table in tableList:
            tableName=table['name']
            if 'instances' in table:
                instanceNote=""
                dataSource=None
                if callable(dataSourceLookup):
                    dataSource=dataSourceLookup(tableName)
                if dataSource is not None:
                    sourceConfig=dataSource.sourceConfig
                    instanceNote=f"[[{sourceConfig.url} {sourceConfig.title}]]"
                instanceCount=table['instances']
                instanceNote=f"{instanceNote}\n{instanceCount} instances "
                table['notes']=instanceNote
        title=(
            f"ConfIDent {baseEntity}\n"
            f"{nowYMD}\n"
            "[[https://projects.tib.eu/en/confident/ © 2019-2022 ConfIDent project and Wolfgang Fahl]]\n"
            "see also [[http://cc.bitplan.com Conference Corpus]]\n"
        )
        plantUml=uml.mergeSchema(schemaManager,tableList,title=title,packageName='DataSources',generalizeTo=baseEntity)
        return plantUml

    @classmethod
    def getSignatureCache(cls,profile:bool=True,force:bool=False):
        '''
        cache the signature Data in a separate SQLite DB

        Args:
            profile(bool): if True show profiling information
            force(bool): if True force the cache creation

        Return:
            SQLDB: the database access object for the signature cache
        '''
        signatureCache=cls.getDBFile("Signature")
        cacheExists=os.path.isfile(signatureCache)
        if cacheExists and not force:
            return SQLDB(signatureCache)
        profiler=Profiler("Reading events for Signature cache") if profile else None
        events=cls.getSqlDB().query("""select * from event""")
        if profiler:
            profiler.time()
        if profile:
            profiler=Profiler(f"Storing Signature cache for {len(events)} events")
        if cacheExists:
            # a forced rebuild starts from an empty file so that the table and
            # rows of the previous cache cannot collide with the new content
            os.remove(signatureCache)
        signature=SQLDB(signatureCache)
        entityInfo=signature.createTable(events,"event")
        signature.store(events,entityInfo)
        if profiler:
            profiler.time()
        ddls=[
            # earlier versions created the index under a misspelled name
            "DROP INDEX if EXISTS eventsByCounty",
            "DROP INDEX if EXISTS eventsByCountry",
            "CREATE INDEX eventsByCountry ON event(country)",
        ]
        for ddl in ddls:
            signature.execute(ddl)
        return signature

    @classmethod
    def createLookup(cls,column:str,tables:list):
        '''
        create a lookup for a column for the given list of tables

        Args:
            column(str): the column to create the lookup for - the Wikidata
            id is read from the column {column}Wikidataid
            tables(list): the names of the tables to take into account
            (without the event_ prefix)

        Return:
            dict: column value -> dict with name, count, qid, one count per
            table and the frequency in percent of all rows of the given tables
        '''
        sqlDB=cls.getSqlDB()
        idColumn=f"{column}Wikidataid"
        total=0
        lookup={}
        for table in tables:
            totalRows=sqlDB.query(f"SELECT count(*) AS total from event_{table}")
            total+=int(totalRows[0]["total"])
            sqlQuery=(
                f"SELECT COUNT(*) AS count,{column},{idColumn}\n"
                f"FROM event_{table}\n"
                f"GROUP BY {column},{idColumn}\n"
                "ORDER BY 1 DESC"
            )
            for lookupRow in sqlDB.query(sqlQuery):
                qid=lookupRow[idColumn]
                if qid is None:
                    # only values with a Wikidata id take part in the lookup
                    continue
                entry=lookupRow[column]
                value=lookupRow["count"]
                if entry in lookup:
                    d=lookup[entry]
                    d["count"]+=value
                    d[table]=value
                else:
                    lookup[entry]={"name":entry,"count":value,"qid":qid,f"{table}":value}
        for d in lookup.values():
            d["frequency"]=round(d["count"]*100/total,3)
        return lookup
class Event(JSONAble):
    '''
    base class for Event entities
    '''

    def __init__(self):
        '''
        Constructor
        '''
        super().__init__()

    def __str__(self):
        '''
        return my string representation

        Return:
            str: my class name followed by the values of those of my
            well known attributes that are set - separated by colons
        '''
        attrs=["pageTitle","acronym","eventId","title","year","source","url"]
        values=[f":{getattr(self,attr)}" for attr in attrs if hasattr(self,attr)]
        return self.__class__.__name__+"".join(values)

    def getLookupAcronym(self):
        '''
        get the lookup acronym of this event e.g. add year information

        the result is also kept in my lookupAcronym attribute

        Return:
            str: the acronym to be used for lookup operations - None if
            no lookup acronym could be derived
        '''
        if hasattr(self,'acronym') and self.acronym is not None:
            self.lookupAcronym=self.acronym
        elif hasattr(self,'event'):
            self.lookupAcronym=self.event
        lookupAcronym=getattr(self,'lookupAcronym',None)
        if lookupAcronym is not None:
            try:
                year=getattr(self,'year',None)
                # only add the year if the acronym has no four digit number yet
                if year is not None and not re.search(r'[0-9]{4}',lookupAcronym):
                    self.lookupAcronym="%s %s" % (lookupAcronym,str(year))
            except TypeError:
                # re.search raises a TypeError e.g. for a non string acronym
                print ('Warning getLookupAcronym failed for year: %s and lookupAcronym %s' % (self.year,self.lookupAcronym))
        return getattr(self,'lookupAcronym',None)

    def getRecord(self):
        '''
        get my dict elements that are defined in getSamples

        Return:
            dict: fields of my __dict__ which are defined in getSamples - if
            there is no getSamples method or it yields no fields there is
            nothing to restrict to and a copy of my complete __dict__ is returned
        '''
        fields=None
        if hasattr(self,'getSamples') and callable(getattr(self,'getSamples')):
            fields=LOD.getFields(self.getSamples())
        recordDict=self.__dict__
        if fields is None:
            return dict(recordDict)
        return {field:recordDict[field] for field in fields if field in recordDict}

    def mapFromDict(self,d:dict,maptuples):
        '''
        set my attributes from the given dict mapping with the given
        mapping (key->attr) tuples

        Args:
            d(dict): the dictionary to map
            maptuples(list): the list of (key,attr) tuples for mapping - keys
            that are missing in d are skipped
        '''
        for key,attr in maptuples:
            if key in d:
                setattr(self,attr,d[key])

    def asWikiMarkup(self,series:str,templateParamLookup:dict)->str:
        '''
        get my Mediawiki Event template markup

        Args:
            series(str): the name of the series
            templateParamLookup(dict): the mapping of python attributes to Mediawiki template parameters to be used

        Return:
            str: my WikiMarkup
        '''
        nameValues={
            wikiName:getattr(self,attrName)
            for wikiName,attrName in templateParamLookup.items()
            if hasattr(self,attrName)
        }
        nameValues["Series"]=series.upper()
        url=getattr(self,"url",None)
        if url is not None:
            # the conference id is the dblp url without its prefix and the .html suffix
            dblpConferenceId=re.sub(r"^https://dblp\.org/db/conf/","",url)
            dblpConferenceId=dblpConferenceId.replace(".html","")
            nameValues["DblpConferenceId"]=dblpConferenceId
        params="\n".join(f"|{name}={value}" for name,value in nameValues.items())
        # further template parameters such as Type, Submission deadline,
        # Homepage, City and Country are not generated
        markup="{{Event\n"+params+"\n}}"
        return markup
class EventSeries(JSONAble):
    '''
    base class for Event Series entities
    '''

    def __init__(self):
        '''
        Constructor
        '''
        super().__init__()

    def __str__(self):
        '''
        return my string representation

        Return:
            str: my class name followed by the values of those of my
            well known attributes that are set - separated by colons
        '''
        parts=[self.__class__.__name__]
        for attr in ("pageTitle","acronym","eventSeriesId","title","source","url"):
            if hasattr(self,attr):
                parts.append(f"{getattr(self,attr)}")
        return ":".join(parts)

    def asWikiMarkup(self)->str:
        '''
        convert me to wikimarkup
        see https://github.com/WolfgangFahl/ConferenceCorpus/issues/10

        Return:
            str: an Event series template with my acronym and eventSeriesId
        '''
        # WikiDataId, Title and Homepage are not part of the generated template
        lines=[
            "{{Event series",
            f"|Acronym={self.acronym}",
            f"|DblpSeries={self.eventSeriesId}",
            "}}",
        ]
        return "\n".join(lines)
class EventBaseManager(EntityManager):
    '''
    common entity Manager for ConferenceCorpus
    '''

    def __init__(self,name,entityName,entityPluralName:str,listName:str=None,clazz=None,sourceConfig:EventDataSourceConfig=None,primaryKey:str=None,config=None,handleInvalidListTypes=False,filterInvalidListTypes=False,debug=False,profile=True):
        '''
        Constructor

        Args:
            name(string): name of this eventManager
            entityName(string): entityType to be managed e.g. Country
            entityPluralName(string): plural of the the entityType e.g. Countries
            listName(str): the name of the list of entities
            clazz: the class of the entities to be managed
            sourceConfig(EventDataSourceConfig): if given it supplies the table name for the entityName
            primaryKey(str): the name of the primary key attribute
            config(StorageConfig): the configuration to be used if None a default configuration will be used
            handleInvalidListTypes(bool): True if invalidListTypes should be converted or filtered
            filterInvalidListTypes(bool): True if invalidListTypes should be deleted
            debug(boolean): override debug setting when default of config is used via config=None
            profile(boolean): True if profiling/timing information should be shown for long-running operations
        '''
        self.profile=profile
        if config is None:
            config=EventStorage.getStorageConfig(debug=debug)
            # the default configuration decides about profiling
            self.profile=config.profile
        if sourceConfig is not None:
            tableName=sourceConfig.getTableName(entityName)
        else:
            tableName=entityName
        super().__init__(name, entityName, entityPluralName, listName, clazz, tableName, primaryKey, config, handleInvalidListTypes, filterInvalidListTypes, listSeparator='⇹',debug=debug)

    def setAllAttr(self,listOfDicts,attr,value):
        '''
        set all attribute values of the given attr in the given list of Dict to the given value

        Args:
            listOfDicts(list): the records to modify in place
            attr(str): the key to set
            value: the value to set
        '''
        for record in listOfDicts:
            record[attr]=value

    def rateAll(self,ratingManager:RatingManager):
        '''
        rate all events and series based on the given rating Manager

        entities without a callable rate attribute are skipped

        Args:
            ratingManager(RatingManager): the manager that collects the ratings

        Raises:
            Exception: if a ratable entity is neither an Event nor an EventSeries
        '''
        for entity in self.getList():
            if not (hasattr(entity,"rate") and callable(entity.rate)):
                continue
            if isinstance(entity,Event):
                rating=EventRating(entity)
            elif isinstance(entity,EventSeries):
                rating=EventSeriesRating(entity)
            else:
                raise Exception(f"rateAll for unknown entity type {type(entity).__name__}")
            entity.rate(rating)
            ratingManager.ratings.append(rating)

    def fromCsv(self, csvString, separator:str= ',', overwriteEvents:bool = True, updateEntitiesCallback:Callable =None):
        """
        update my entities from the given csv content

        Args:
            csvString: csvString having all the csv content
            separator: the separator of the csv
            overwriteEvents(bool): passed on to updateFromLod
            updateEntitiesCallback: passed on to updateFromLod

        Returns: Nothing. The self object is updated
        """
        # NOTE(review): all csv columns are read - the fields of the samples
        # were computed here before but never applied to the call below
        eventRecords=CSV.fromCSV(csvString=csvString,fields=None,delimiter=separator)
        self.updateFromLod(eventRecords, overwriteEvents=overwriteEvents, updateEntitiesCallback=updateEntitiesCallback)

    def updateFromLod(self, lod:list, overwriteEvents:bool = True, updateEntitiesCallback:Callable=None, restrictToSamples:bool=True):
        """
        Updates the entities from the given LoD. If a entity does not already exist a new one will be added.
        Records without my primary key are ignored.

        Args:
            lod: data to update the entities
            overwriteEvents: If False only missing values are added
            updateEntitiesCallback: Callback function that is called on an updated entity
            restrictToSamples(bool): If True only properties that are names in the samples are set.

        Returns: Nothing. My entities are updated in place
        """
        hasCallback=updateEntitiesCallback is not None and callable(updateEntitiesCallback)
        entityLookup=self.getLookup(attrName=self.primaryKey)[0]
        for eventRecord in lod:
            if self.primaryKey not in eventRecord:
                continue
            recordKey=eventRecord.get(self.primaryKey)
            if recordKey in entityLookup:
                entity=entityLookup[recordKey]
                self._updateEntityFromRecord(entity, eventRecord, overwriteEvents, restrictToSamples)
            else:
                self.fromLoD(lod=[eventRecord], append=True, debug=self.debug)
                # new entity was added → update lookup
                entityLookup=self.getLookup(attrName=self.primaryKey)[0]
                entity=entityLookup[recordKey]
            if hasCallback:
                updateEntitiesCallback(entity, overwrite=overwriteEvents)

    def _updateEntityFromRecord(self, entity, record:dict, overwrite:bool, restrictToSamples:bool):
        """
        copy the values of the given record to the given existing entity

        Args:
            entity: the entity to update
            record(dict): the new values
            overwrite(bool): if False values that are already set are kept
            restrictToSamples(bool): if True attributes the entity does not
            have yet are only added if they are named in the samples of the entity
        """
        sampleProperties=[]
        if hasattr(entity, 'getSamples') and callable(entity.getSamples):
            sampleProperties=LOD.getFields(entity.getSamples())
        for key, value in record.items():
            if hasattr(entity, key):
                # an attribute without a value counts as missing
                if overwrite or getattr(entity, key) is None:
                    setattr(entity, key, value)
            elif not restrictToSamples or key in sampleProperties:
                setattr(entity, key, value)

    def fromCache(self,force:bool=False,getListOfDicts=None,append=False,sampleRecordCount=-1):
        '''
        overwritten version of fromCache that calls postProcessEntityList

        post processing and storing only happen if the cache had to be
        (re)created: there was no cache yet or force is set

        Args:
            force(bool): if True the cache is recreated
            getListOfDicts: passed on to the super class
            append: passed on to the super class
            sampleRecordCount: passed on to the super class
        '''
        # has to be determined before the super call fills the cache
        needsUpdate=not self.isCached() or force
        super().fromCache(force, getListOfDicts, append, sampleRecordCount)
        if needsUpdate:
            # TODO
            # this is inefficient and uses 2x the memory
            # try postProcessing on lod instead
            self.postProcessEntityList(debug=self.debug)
            self.store()

    def postProcessEntityList(self,debug:bool=False):
        '''
        postProcess my entities

        Args:
            debug(bool): if True show debug information
        '''
        # override this method
        pass

    def asCsv(self, separator:str=',', selectorCallback:Callable=None):
        """
        Converts the events to csv format

        Args:
            separator(str): character separating the row values
            selectorCallback: callback functions returning events to be converted to csv. If None all events are converted.

        Returns:
            csv string of events - None if there are no events to convert
        """
        events=self.getList()
        if selectorCallback is not None and callable(selectorCallback):
            events=selectorCallback()
            # a single selected event is treated as a list of one
            if events and not isinstance(events, list):
                events=[events]
        if not events:
            return None
        fields=None
        # limit csv fields to the fields defined in the samples
        if hasattr(self.clazz, 'getSamples') and callable(getattr(self.clazz, 'getSamples')):
            fields=LOD.getFields(self.clazz.getSamples())
        return CSV.toCSV(events, includeFields=fields, delimiter=separator)

    def postProcessLodRecords(self,listOfDicts:list,**kwArgs):
        '''
        post process the given list of Dicts with raw Events

        nothing happens if my entity class has no callable postProcessLodRecord

        Args:
            listOfDicts(list): the list of raw Events to fix
            kwArgs: passed on to postProcessLodRecord of my entity class
        '''
        if hasattr(self.clazz,"postProcessLodRecord") and callable(self.clazz.postProcessLodRecord):
            for rawEvent in listOfDicts:
                self.clazz.postProcessLodRecord(rawEvent,**kwArgs)

    def getLoDfromEndpoint(self)->list:
        '''
        get my content from my endpoint

        expects my endpoint and source attributes and my getSparqlQuery
        method to be supplied by the concrete manager

        Returns:
            list: the list of dicts derived from the given SPARQL query

        Raises:
            Exception: whatever the SPARQL query raised - after reporting it to stderr
        '''
        sparql=SPARQL(self.endpoint)
        query=self.getSparqlQuery()
        # created before the try block so that it is available when reporting a failure
        profiler=Profiler(f"SPARQL query to {self.endpoint}",profile=False)
        try:
            listOfDicts=sparql.queryAsListOfDicts(query)
        except Exception as ex:
            # handle any Exception - e.g. there might be a syntax error in the query or the
            # endpoint might not be able to handle it - the endpoint might not be available
            # or there might be a timeout
            msg=f"SPARQL query failed\nquery:\n{query}"
            profiler.profile=True
            profiler.time()
            print(msg,file=sys.stderr)
            template = "An exception of type {0} occurred. Arguments:\n{1!r}"
            exmessage = template.format(type(ex).__name__, ex.args)
            print(exmessage,file=sys.stderr)
            # re-raise with the original traceback
            raise
        self.postProcessLodRecords(listOfDicts)
        self.setAllAttr(listOfDicts,"source",self.source)
        return listOfDicts

    def getEventByKey(self, keyToSearch, keytype='pageTitle'):
        '''
        get the first of my events with the given key value

        Args:
            keyToSearch: the value to search for
            keytype(str): the name of the attribute to compare

        Return:
            the first matching event - None if there is no match

        Raises:
            ValueError: if an event without the keytype attribute is found
            before a match
        '''
        for event in self.getList():
            if hasattr(event, keytype):
                if getattr(event, keytype) == keyToSearch:
                    return event
            else:
                raise ValueError("Invalid keytype given")
        return None
class EventSeriesManager(EventBaseManager):
    '''
    Event series list
    '''

    def __init__(self,name:str,sourceConfig:EventDataSourceConfig=None,clazz=None,primaryKey:str=None,config:StorageConfig=None,debug=False):
        '''
        constructor

        Args:
            name(str): the name of this manager
            sourceConfig(EventDataSourceConfig): the configuration of my data source
            clazz: the class of the series entities to be managed
            primaryKey(str): the name of the primary key attribute
            config(StorageConfig): the storage configuration to be used
            debug(bool): if True show debug information
        '''
        # the entity naming is fixed for event series
        managerArgs=dict(
            name=name,
            entityName="EventSeries",
            entityPluralName="EventSeries",
            primaryKey=primaryKey,
            listName="series",
            clazz=clazz,
            sourceConfig=sourceConfig,
            handleInvalidListTypes=True,
            config=config,
            debug=debug,
        )
        super().__init__(**managerArgs)
class EventManager(EventBaseManager):
    '''
    Event entity list
    '''

    def __init__(self,name:str,sourceConfig:EventDataSourceConfig=None,clazz=None,primaryKey:str=None,config:StorageConfig=None,debug=False):
        '''
        constructor

        Args:
            name(str): the name of this manager
            sourceConfig(EventDataSourceConfig): the configuration of my data source
            clazz: the class of the event entities to be managed
            primaryKey(str): the name of the primary key attribute
            config(StorageConfig): the storage configuration to be used
            debug(bool): if True show debug information
        '''
        # a given configuration decides about profiling
        profile=config.profile if config else False
        super().__init__(name=name,entityName="Event",entityPluralName="Events",primaryKey=primaryKey,listName="events",clazz=clazz,sourceConfig=sourceConfig,config=config,handleInvalidListTypes=True,debug=debug,profile=profile)

    def linkSeriesAndEvent(self, eventSeriesManager:EventSeriesManager, seriesKey:str="series"):
        '''
        link Series and Event using the given foreignKey

        sets my seriesLookup (seriesKey value -> events) and my
        seriesAcronymLookup (acronym -> series)

        Args:
            seriesKey(str): the key to be use for lookup
            eventSeriesManager(EventSeriesManager): the manager of the series to link to
        '''
        # get foreign key hashtable
        self.seriesLookup = LOD.getLookup(self.getList(), seriesKey, withDuplicates=True)
        # get "primary" key hashtable
        self.seriesAcronymLookup = LOD.getLookup(eventSeriesManager.getList(), "acronym", withDuplicates=True)
        verbose = hasattr(self, 'verbose') and self.verbose
        linkedCount = 0
        for seriesAcronym, seriesEvents in self.seriesLookup.items():
            if seriesAcronym in self.seriesAcronymLookup:
                linkedCount += 1
                if verbose:
                    print(f"{seriesAcronym}:{len(seriesEvents):4d}")
            elif self.debug:
                print(f"Event Series Acronym {seriesAcronym} lookup failed")
        if self.debug:
            # report only the series keys that were found in the series manager
            print("%d events/%d eventSeries -> %d linked" % (
                len(self.getList()), len(eventSeriesManager.getList()), linkedCount))

    def getEventsInSeries(self,seriesAcronym):
        """
        Return all the events in a given series.

        Args:
            seriesAcronym: the key of the series as used by linkSeriesAndEvent

        Returns:
            the events of the series - None if the series is not known,
            which includes the case that linkSeriesAndEvent has not been called yet
        """
        seriesLookup = getattr(self, 'seriesLookup', None)
        if seriesLookup is None or seriesAcronym not in seriesLookup:
            if self.debug:
                print(f"Event Series Acronym {seriesAcronym} lookup failed - Series not known")
            return None
        seriesEvents = seriesLookup[seriesAcronym]
        if self.debug:
            print(f"{seriesAcronym}:{len(seriesEvents):4d}")
        return seriesEvents

    @staticmethod
    def asWikiSon(eventDicts):
        '''
        convert the given event dicts to WikiSon markup

        Args:
            eventDicts(list): the event dicts to convert

        Return:
            str: the concatenated Event templates
        '''
        return "".join(EventManager.eventDictToWikiSon(eventDict) for eventDict in eventDicts)

    @staticmethod
    def eventDictToWikiSon(eventDict):
        '''
        convert the given event dict to a WikiSon Event template

        Args:
            eventDict(dict): the event to convert

        Return:
            str: the Event template with one line per value - bookkeeping
            keys and None values are left out
        '''
        skippedKeys = ('foundBy','source','creation_date','modification_date')
        lines = [
            "|%s=%s\n" % (key,value)
            for key,value in eventDict.items()
            if key not in skippedKeys and value is not None
        ]
        return "{{Event\n"+"".join(lines)+"}}\n"