'''
Created on 2021-07-29
@author: wf
'''
import unittest
from unittest import TestCase
from corpus.eventcorpus import EventDataSource,EventCorpus
from corpus.datasources.dblp import DblpEventManager
import warnings
from lodstorage.lod import LOD
from geograpy.utils import Profiler
import getpass
import os
import sys
import argparse
from pydevd_file_utils import setup_client_server_paths
from corpus.lookup import CorpusLookup
class DataSourceTest(TestCase):
    '''
    Base test case for EventDataSources.

    Provides command line handling (debug flag, per-test time limit,
    optional remote debugging via pydevd), a common setUp/tearDown with
    profiling, and helpers to check an event data source.
    '''

    @classmethod
    def main(cls):
        '''
        Parse the command line, optionally attach the remote debugger and
        then hand the remaining positional arguments over to unittest.main().

        The parsed arguments are kept in the class attribute ``args`` so that
        setUp can pick up the debug flag and the time limit.
        '''
        # python, unittest: is there a way to pass command line options to the app
        # https://stackoverflow.com/a/8660290/1497139
        description="EventCorpus DataSource Test"
        parser = argparse.ArgumentParser(description=description)
        parser.add_argument("-d", "--debug", dest="debug", action="store_true", help="set debug [default: %(default)s]")
        parser.add_argument("--timeLimitPerTest", type=float, help="set the timeLimitPerTest [default: %(default)s]")
        parser.add_argument('--debugServer',
            help="remote debug Server")
        parser.add_argument('--debugPort',type=int,
            help="remote debug Port",default=5678)
        parser.add_argument('--debugRemotePath',help="remote debug Server path mapping - remotePath")
        parser.add_argument('--debugLocalPath',help="remote debug Server path mapping - localPath")
        parser.add_argument('unittest_args', nargs='*')
        args = parser.parse_args()
        cls.args=args
        cls.optionalDebug(args)
        # Now set the sys.argv to the unittest_args (leaving sys.argv[0] alone)
        sys.argv[1:] = args.unittest_args
        unittest.main()

    @classmethod
    def optionalDebug(cls,args):
        '''
        start the remote debugger if the arguments specify so

        Args:
            args(): The command line arguments; debugServer, debugPort,
                debugRemotePath and debugLocalPath are read
        '''
        if args.debugServer:
            # pydevd is only needed when remote debugging is actually requested
            import pydevd
            print (f"remotePath: {args.debugRemotePath} localPath:{args.debugLocalPath}",flush=True)
            # a path mapping is only installed when both sides are given
            if args.debugRemotePath and args.debugLocalPath:
                MY_PATHS_FROM_ECLIPSE_TO_PYTHON = [
                    (args.debugRemotePath, args.debugLocalPath),
                ]
                setup_client_server_paths(MY_PATHS_FROM_ECLIPSE_TO_PYTHON)
            pydevd.settrace(args.debugServer, port=args.debugPort,stdoutToServer=True, stderrToServer=True)
            print(f"command line args are: {str(sys.argv)}")

    def setUp(self,debug=False,profile=True,timeLimitPerTest=10.0):
        '''
        setUp test environment

        Args:
            debug(bool): debug default, overridden by the command line if main() was used
            profile(bool): passed on to the Profiler
            timeLimitPerTest(float): time limit default, overridden by the command line
                if given; it is only stored on the instance here
        '''
        super().setUp()
        self.debug=debug
        self.timeLimitPerTest=timeLimitPerTest
        # are there command line arguments?
        if hasattr(DataSourceTest,"args"):
            self.debug=DataSourceTest.args.debug
            if DataSourceTest.args.timeLimitPerTest is not None:
                self.timeLimitPerTest=DataSourceTest.args.timeLimitPerTest
        msg=f"test {self._testMethodName} ... with debug={self.debug}"
        # make sure there is an EventCorpus.db to speed up tests
        EventCorpus.download()
        DblpEventManager.cacheOnly=True
        self.profiler=Profiler(msg=msg,profile=profile)
        self.forceUpdate=False
        # make sure unclosed socket warnings are not shown
        warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning)

    def tearDown(self):
        '''
        report the elapsed time of the test via the profiler created in setUp
        '''
        self.profiler.time()
        super().tearDown()

    def inCI(self):
        '''
        are we running in a Continuous Integration Environment?

        Returns:
            bool: True if the current user is "travis" or "runner" or if
                JENKINS_HOME is set in the environment
        '''
        publicCI=getpass.getuser() in ["travis", "runner"]
        jenkins="JENKINS_HOME" in os.environ
        return publicCI or jenkins

    def checkDataSource(self,eventDataSource:"EventDataSource", expectedSeries:int,expectedEvents:int,eventSample:str=None):
        '''
        check the given DataSource

        Loads event series and events from the cache, stores them if they are
        not cached yet (or if an update is forced) and asserts the minimum
        number of records.

        Args:
            eventDataSource(EventDataSource): the event data source to check
            expectedSeries(int): minimum number of event series expected
            expectedEvents(int): minimum number of events expected
            eventSample(str): acronym of an event to print as a sample (optional)

        Returns:
            tuple: the list of event series and the list of events
        '''
        esm=eventDataSource.eventSeriesManager
        esm.configure()
        esm.fromCache(force=self.forceUpdate)
        esl=esm.getList()
        if self.debug:
            print(f"Found {len(esl)} {eventDataSource.name} scientific event Series")
        if not esm.isCached() or self.forceUpdate:
            esm.store()
        em=eventDataSource.eventManager
        em.configure()
        em.fromCache(force=self.forceUpdate)
        el=em.getList()
        if self.debug:
            print(f"Found {len(el)} {eventDataSource.name} scientific events")
        # same condition as for the series: a forced update must be persisted too
        if not em.isCached() or self.forceUpdate:
            em.store()
        self.assertGreaterEqual(len(esl),expectedSeries,f"found {len(esl)} event series")
        self.assertGreaterEqual(len(el),expectedEvents,f"found {len(el)} events")
        if eventSample is not None:
            # the acronym lookup is only needed when a sample is requested
            eventsByAcronym,_dup=LOD.getLookup(el, "acronym")
            self.assertIn(eventSample,eventsByAcronym,f"sample event {eventSample} not found in {eventDataSource.name}")
            event=eventsByAcronym[eventSample]
            print (f"Sample event for {eventDataSource.name}: {len(el)} events {len(esl)} eventseries")
            print (event.toJSON())
        return esl,el

    @staticmethod
    def getEventSeries(seriesAcronym:str):
        '''
        Returns the event series as dict of lod (records are categorized into the different data sources)

        Args:
            seriesAcronym: acronym of the series; events whose acronym starts with it are selected

        Returns:
            dict of lod
        '''
        lookup=CorpusLookup()
        multiQuery = "select * from {event}"
        # NOTE(review): the result of this call was never used; the call is kept
        # in case it validates the query - confirm and remove if it is side effect free
        lookup.getMultiQueryVariable(multiQuery)
        # double quotes are stripped since the acronym is embedded in a quoted SQL literal
        # NOTE(review): '%' and '_' in the acronym still act as LIKE wildcards
        acronym=seriesAcronym.replace('"','')
        idQuery = f"""select source,eventId from event where acronym like "{acronym}%" order by year desc"""
        dictOfLod = lookup.getDictOfLod4MultiQuery(multiQuery, idQuery)
        return dictOfLod