diff options
author | Thanos Apollo <[email protected]> | 2022-08-04 09:50:48 +0300 |
---|---|---|
committer | Thanos Apollo <[email protected]> | 2022-08-04 09:50:48 +0300 |
commit | 4ddb7273098bee179bb77e0937e560fc0100960c (patch) | |
tree | cebc2f9412e45910408a7885ca78d7dedb77de78 /.local/share/Anki2/addons21/Anki_connect/__init__.py | |
parent | e83759ae9d0513024e390810ddcb18ffdd84675e (diff) |
Add anki addons
Diffstat (limited to '.local/share/Anki2/addons21/Anki_connect/__init__.py')
-rw-r--r-- | .local/share/Anki2/addons21/Anki_connect/__init__.py | 1688 |
1 files changed, 1688 insertions, 0 deletions
diff --git a/.local/share/Anki2/addons21/Anki_connect/__init__.py b/.local/share/Anki2/addons21/Anki_connect/__init__.py new file mode 100644 index 0000000..93c92d0 --- /dev/null +++ b/.local/share/Anki2/addons21/Anki_connect/__init__.py @@ -0,0 +1,1688 @@ +# Copyright 2016-2021 Alex Yatskov +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import aqt + +anki_version = tuple(int(segment) for segment in aqt.appVersion.split(".")) + +if anki_version < (2, 1, 45): + raise Exception("Minimum Anki version supported: 2.1.45") + +import base64 +import glob +import hashlib +import inspect +import json +import os +import os.path +import platform +import re +import time +import unicodedata + +import anki +import anki.exporting +import anki.storage +from anki.cards import Card +from anki.consts import MODEL_CLOZE +from anki.exporting import AnkiPackageExporter +from anki.importing import AnkiPackageImporter +from anki.notes import Note +from anki.errors import NotFoundError +from aqt.qt import Qt, QTimer, QMessageBox, QCheckBox + +from .web import format_exception_reply, format_success_reply +from .edit import Edit +from . import web, util + + +# +# AnkiConnect +# + +class AnkiConnect: + def __init__(self): + self.log = None + self.timer = None + self.server = web.WebServer(self.handler) + + def initLogging(self): + logPath = util.setting('apiLogPath') + if logPath is not None: + self.log = open(logPath, 'w') + + def startWebServer(self): + try: + self.server.listen() + + # only keep reference to prevent garbage collection + self.timer = QTimer() + self.timer.timeout.connect(self.advance) + self.timer.start(util.setting('apiPollInterval')) + except: + QMessageBox.critical( + self.window(), + 'AnkiConnect', + 'Failed to listen on port {}.\nMake sure it is available and is not in use.'.format(util.setting('webBindPort')) + ) + + def save_model(self, models, ankiModel): + models.update_dict(ankiModel) + + def logEvent(self, name, data): + if self.log is not None: + self.log.write('[{}]\n'.format(name)) + json.dump(data, self.log, indent=4, sort_keys=True) + self.log.write('\n\n') + self.log.flush() + + + def advance(self): + self.server.advance() + + + def handler(self, request): + self.logEvent('request', request) + + name = request.get('action', '') + version = request.get('version', 4) + params = request.get('params', {}) + key = request.get('key') + + try: + if key != util.setting('apiKey') and name != 'requestPermission': + raise Exception('valid api key must be provided') + + method = None + + for methodName, methodInst in inspect.getmembers(self, predicate=inspect.ismethod): + apiVersionLast = 0 + apiNameLast = None + + if getattr(methodInst, 'api', False): + for apiVersion, apiName in getattr(methodInst, 'versions', []): + if apiVersionLast < apiVersion <= version: + apiVersionLast = apiVersion + apiNameLast = apiName + + if apiNameLast is None and apiVersionLast == 0: + apiNameLast = methodName + + if apiNameLast is not None and apiNameLast == name: + method = methodInst + break + + if method is None: + raise Exception('unsupported action') + + api_return_value = methodInst(**params) + reply = format_success_reply(version, api_return_value) + + except Exception as e: + reply = format_exception_reply(version, e) + + self.logEvent('reply', reply) + return reply + + + def window(self): + return aqt.mw + + + def reviewer(self): + reviewer = self.window().reviewer + if reviewer is None: + raise Exception('reviewer is not available') + + return reviewer + + + def collection(self): + collection = self.window().col + if collection is None: + raise Exception('collection is not available') + + return collection + + + def decks(self): + decks = self.collection().decks + if decks is None: + raise Exception('decks are not available') + + return decks + + + def scheduler(self): + scheduler = self.collection().sched + if scheduler is None: + raise Exception('scheduler is not available') + + return scheduler + + + def database(self): + database = self.collection().db + if database is None: + raise Exception('database is not available') + + return database + + + def media(self): + media = self.collection().media + if media is None: + raise Exception('media is not available') + + return media + + + def startEditing(self): + self.window().requireReset() + + + def stopEditing(self): + if self.collection() is not None: + self.window().maybeReset() + + + def createNote(self, note): + collection = self.collection() + + model = collection.models.byName(note['modelName']) + if model is None: + raise Exception('model was not found: {}'.format(note['modelName'])) + + deck = collection.decks.byName(note['deckName']) + if deck is None: + raise Exception('deck was not found: {}'.format(note['deckName'])) + + ankiNote = anki.notes.Note(collection, model) + ankiNote.model()['did'] = deck['id'] + if 'tags' in note: + ankiNote.tags = note['tags'] + + for name, value in note['fields'].items(): + for ankiName in ankiNote.keys(): + if name.lower() == ankiName.lower(): + ankiNote[ankiName] = value + break + + allowDuplicate = False + duplicateScope = None + duplicateScopeDeckName = None + duplicateScopeCheckChildren = False + duplicateScopeCheckAllModels = False + + if 'options' in note: + options = note['options'] + if 'allowDuplicate' in options: + allowDuplicate = options['allowDuplicate'] + if type(allowDuplicate) is not bool: + raise Exception('option parameter "allowDuplicate" must be boolean') + if 'duplicateScope' in options: + duplicateScope = options['duplicateScope'] + if 'duplicateScopeOptions' in options: + duplicateScopeOptions = options['duplicateScopeOptions'] + if 'deckName' in duplicateScopeOptions: + duplicateScopeDeckName = duplicateScopeOptions['deckName'] + if 'checkChildren' in duplicateScopeOptions: + duplicateScopeCheckChildren = duplicateScopeOptions['checkChildren'] + if type(duplicateScopeCheckChildren) is not bool: + raise Exception('option parameter "duplicateScopeOptions.checkChildren" must be boolean') + if 'checkAllModels' in duplicateScopeOptions: + duplicateScopeCheckAllModels = duplicateScopeOptions['checkAllModels'] + if type(duplicateScopeCheckAllModels) is not bool: + raise Exception('option parameter "duplicateScopeOptions.checkAllModels" must be boolean') + + duplicateOrEmpty = self.isNoteDuplicateOrEmptyInScope( + ankiNote, + deck, + collection, + duplicateScope, + duplicateScopeDeckName, + duplicateScopeCheckChildren, + duplicateScopeCheckAllModels + ) + + if duplicateOrEmpty == 1: + raise Exception('cannot create note because it is empty') + elif duplicateOrEmpty == 2: + if allowDuplicate: + return ankiNote + raise Exception('cannot create note because it is a duplicate') + elif duplicateOrEmpty == 0: + return ankiNote + else: + raise Exception('cannot create note for unknown reason') + + + def isNoteDuplicateOrEmptyInScope( + self, + note, + deck, + collection, + duplicateScope, + duplicateScopeDeckName, + duplicateScopeCheckChildren, + duplicateScopeCheckAllModels + ): + # Returns: 1 if first is empty, 2 if first is a duplicate, 0 otherwise. + + # note.dupeOrEmpty returns if a note is a global duplicate with the specific model. + # This is used as the default check, and the rest of this function is manually + # checking if the note is a duplicate with additional options. + if duplicateScope != 'deck' and not duplicateScopeCheckAllModels: + return note.dupeOrEmpty() or 0 + + # Primary field for uniqueness + val = note.fields[0] + if not val.strip(): + return 1 + csum = anki.utils.fieldChecksum(val) + + # Create dictionary of deck ids + dids = None + if duplicateScope == 'deck': + did = deck['id'] + if duplicateScopeDeckName is not None: + deck2 = collection.decks.byName(duplicateScopeDeckName) + if deck2 is None: + # Invalid deck, so cannot be duplicate + return 0 + did = deck2['id'] + + dids = {did: True} + if duplicateScopeCheckChildren: + for kv in collection.decks.children(did): + dids[kv[1]] = True + + # Build query + query = 'select id from notes where csum=?' + queryArgs = [csum] + if note.id: + query += ' and id!=?' + queryArgs.append(note.id) + if not duplicateScopeCheckAllModels: + query += ' and mid=?' + queryArgs.append(note.mid) + + # Search + for noteId in note.col.db.list(query, *queryArgs): + if dids is None: + # Duplicate note exists in the collection + return 2 + # Validate that a card exists in one of the specified decks + for cardDeckId in note.col.db.list('select did from cards where nid=?', noteId): + if cardDeckId in dids: + return 2 + + # Not a duplicate + return 0 + + def getCard(self, card_id: int) -> Card: + try: + return self.collection().getCard(card_id) + except NotFoundError: + raise NotFoundError('Card was not found: {}'.format(card_id)) + + def getNote(self, note_id: int) -> Note: + try: + return self.collection().getNote(note_id) + except NotFoundError: + raise NotFoundError('Note was not found: {}'.format(note_id)) + + def deckStatsToJson(self, due_tree): + deckStats = {'deck_id': due_tree.deck_id, + 'name': due_tree.name, + 'new_count': due_tree.new_count, + 'learn_count': due_tree.learn_count, + 'review_count': due_tree.review_count} + if anki_version > (2, 1, 46): + # total_in_deck is not supported on lower Anki versions + deckStats['total_in_deck'] = due_tree.total_in_deck + return deckStats + + def collectDeckTreeChildren(self, parent_node): + allNodes = {parent_node.deck_id: parent_node} + for child in parent_node.children: + for deckId, childNode in self.collectDeckTreeChildren(child).items(): + allNodes[deckId] = childNode + return allNodes + + # + # Miscellaneous + # + + @util.api() + def version(self): + return util.setting('apiVersion') + + + @util.api() + def requestPermission(self, origin, allowed): + results = { + "permission": "denied", + } + + if allowed: + results = { + "permission": "granted", + "requireApikey": bool(util.setting('apiKey')), + "version": util.setting('apiVersion') + } + + elif origin in util.setting('ignoreOriginList'): + pass # defaults to denied + + else: # prompt the user + msg = QMessageBox(None) + msg.setWindowTitle("A website wants to access to Anki") + msg.setText('"{}" requests permission to use Anki through AnkiConnect. Do you want to give it access?'.format(origin)) + msg.setInformativeText("By granting permission, you'll allow the website to modify your collection on your behalf, including the execution of destructive actions such as deck deletion.") + msg.setWindowIcon(self.window().windowIcon()) + msg.setIcon(QMessageBox.Question) + msg.setStandardButtons(QMessageBox.Yes|QMessageBox.No) + msg.setDefaultButton(QMessageBox.No) + msg.setCheckBox(QCheckBox(text='Ignore further requests from "{}"'.format(origin), parent=msg)) + msg.setWindowFlags(Qt.WindowStaysOnTopHint) + pressedButton = msg.exec_() + + if pressedButton == QMessageBox.Yes: + config = aqt.mw.addonManager.getConfig(__name__) + config["webCorsOriginList"] = util.setting('webCorsOriginList') + config["webCorsOriginList"].append(origin) + aqt.mw.addonManager.writeConfig(__name__, config) + results = { + "permission": "granted", + "requireApikey": bool(util.setting('apiKey')), + "version": util.setting('apiVersion') + } + + # if the origin isn't an empty string, the user clicks "No", and the ignore box is checked + elif origin and pressedButton == QMessageBox.No and msg.checkBox().isChecked(): + config = aqt.mw.addonManager.getConfig(__name__) + config["ignoreOriginList"] = util.setting('ignoreOriginList') + config["ignoreOriginList"].append(origin) + aqt.mw.addonManager.writeConfig(__name__, config) + + # else defaults to denied + + return results + + + @util.api() + def getProfiles(self): + return self.window().pm.profiles() + + + @util.api() + def loadProfile(self, name): + if name not in self.window().pm.profiles(): + return False + + if self.window().isVisible(): + cur_profile = self.window().pm.name + if cur_profile != name: + self.window().unloadProfileAndShowProfileManager() + + def waiter(): + # This function waits until main window is closed + # It's needed cause sync can take quite some time + # And if we call loadProfile until sync is ended things will go wrong + if self.window().isVisible(): + QTimer.singleShot(1000, waiter) + else: + self.loadProfile(name) + + waiter() + else: + self.window().pm.load(name) + self.window().loadProfile() + self.window().profileDiag.closeWithoutQuitting() + + return True + + + @util.api() + def sync(self): + self.window().onSync() + + + @util.api() + def multi(self, actions): + return list(map(self.handler, actions)) + + + @util.api() + def getNumCardsReviewedToday(self): + return self.database().scalar('select count() from revlog where id > ?', (self.scheduler().dayCutoff - 86400) * 1000) + + @util.api() + def getNumCardsReviewedByDay(self): + return self.database().all('select date(id/1000 - ?, "unixepoch", "localtime") as day, count() from revlog group by day order by day desc', + int(time.strftime("%H", time.localtime(self.scheduler().dayCutoff))) * 3600) + + + @util.api() + def getCollectionStatsHTML(self, wholeCollection=True): + stats = self.collection().stats() + stats.wholeCollection = wholeCollection + return stats.report() + + + # + # Decks + # + + @util.api() + def deckNames(self): + return self.decks().allNames() + + + @util.api() + def deckNamesAndIds(self): + decks = {} + for deck in self.deckNames(): + decks[deck] = self.decks().id(deck) + + return decks + + + @util.api() + def getDecks(self, cards): + decks = {} + for card in cards: + did = self.database().scalar('select did from cards where id=?', card) + deck = self.decks().get(did)['name'] + if deck in decks: + decks[deck].append(card) + else: + decks[deck] = [card] + + return decks + + + @util.api() + def createDeck(self, deck): + try: + self.startEditing() + did = self.decks().id(deck) + finally: + self.stopEditing() + + return did + + + @util.api() + def changeDeck(self, cards, deck): + self.startEditing() + + did = self.collection().decks.id(deck) + mod = anki.utils.intTime() + usn = self.collection().usn() + + # normal cards + scids = anki.utils.ids2str(cards) + # remove any cards from filtered deck first + self.collection().sched.remFromDyn(cards) + + # then move into new deck + self.collection().db.execute('update cards set usn=?, mod=?, did=? where id in ' + scids, usn, mod, did) + self.stopEditing() + + + @util.api() + def deleteDecks(self, decks, cardsToo=False): + if not cardsToo: + # since f592672fa952260655881a75a2e3c921b2e23857 (2.1.28) + # (see anki$ git log "-Gassert cardsToo") + # you can't delete decks without deleting cards as well. + # however, since 62c23c6816adf912776b9378c008a52bb50b2e8d (2.1.45) + # passing cardsToo to `rem` (long deprecated) won't raise an error! + # this is dangerous, so let's raise our own exception + raise Exception("Since Anki 2.1.28 it's not possible " + "to delete decks without deleting cards as well") + try: + self.startEditing() + decks = filter(lambda d: d in self.deckNames(), decks) + for deck in decks: + did = self.decks().id(deck) + self.decks().rem(did, cardsToo=cardsToo) + finally: + self.stopEditing() + + + @util.api() + def getDeckConfig(self, deck): + if deck not in self.deckNames(): + return False + + collection = self.collection() + did = collection.decks.id(deck) + return collection.decks.confForDid(did) + + + @util.api() + def saveDeckConfig(self, config): + collection = self.collection() + + config['id'] = str(config['id']) + config['mod'] = anki.utils.intTime() + config['usn'] = collection.usn() + if int(config['id']) not in [c['id'] for c in collection.decks.all_config()]: + return False + try: + collection.decks.save(config) + collection.decks.updateConf(config) + except: + return False + return True + + + @util.api() + def setDeckConfigId(self, decks, configId): + configId = int(configId) + for deck in decks: + if not deck in self.deckNames(): + return False + + collection = self.collection() + + for deck in decks: + try: + did = str(collection.decks.id(deck)) + deck_dict = aqt.mw.col.decks.decks[did] + deck_dict['conf'] = configId + collection.decks.save(deck_dict) + except: + return False + + return True + + + @util.api() + def cloneDeckConfigId(self, name, cloneFrom='1'): + configId = int(cloneFrom) + collection = self.collection() + if configId not in [c['id'] for c in collection.decks.all_config()]: + return False + + config = collection.decks.getConf(configId) + return collection.decks.confId(name, config) + + + @util.api() + def removeDeckConfigId(self, configId): + collection = self.collection() + if int(configId) not in [c['id'] for c in collection.decks.all_config()]: + return False + + collection.decks.remConf(configId) + return True + + @util.api() + def getDeckStats(self, decks): + collection = self.collection() + scheduler = self.scheduler() + responseDict = {} + deckIds = list(map(lambda d: collection.decks.id(d), decks)) + + allDeckNodes = self.collectDeckTreeChildren(scheduler.deck_due_tree()) + for deckId, deckNode in allDeckNodes.items(): + if deckId in deckIds: + responseDict[deckId] = self.deckStatsToJson(deckNode) + return responseDict + + @util.api() + def storeMediaFile(self, filename, data=None, path=None, url=None, skipHash=None, deleteExisting=True): + if not (data or path or url): + raise Exception('You must provide a "data", "path", or "url" field.') + if data: + mediaData = base64.b64decode(data) + elif path: + with open(path, 'rb') as f: + mediaData = f.read() + elif url: + mediaData = util.download(url) + + if skipHash is None: + skip = False + else: + m = hashlib.md5() + m.update(mediaData) + skip = skipHash == m.hexdigest() + + if skip: + return None + if deleteExisting: + self.deleteMediaFile(filename) + return self.media().writeData(filename, mediaData) + + + @util.api() + def retrieveMediaFile(self, filename): + filename = os.path.basename(filename) + filename = unicodedata.normalize('NFC', filename) + filename = self.media().stripIllegal(filename) + + path = os.path.join(self.media().dir(), filename) + if os.path.exists(path): + with open(path, 'rb') as file: + return base64.b64encode(file.read()).decode('ascii') + + return False + + + @util.api() + def getMediaFilesNames(self, pattern='*'): + path = os.path.join(self.media().dir(), pattern) + return [os.path.basename(p) for p in glob.glob(path)] + + + @util.api() + def deleteMediaFile(self, filename): + try: + self.media().syncDelete(filename) + except AttributeError: + self.media().trash_files([filename]) + + + @util.api() + def addNote(self, note): + ankiNote = self.createNote(note) + + self.addMediaFromNote(ankiNote, note) + + collection = self.collection() + self.startEditing() + nCardsAdded = collection.addNote(ankiNote) + if nCardsAdded < 1: + raise Exception('The field values you have provided would make an empty question on all cards.') + collection.autosave() + self.stopEditing() + + return ankiNote.id + + + def addMediaFromNote(self, ankiNote, note): + audioObjectOrList = note.get('audio') + self.addMedia(ankiNote, audioObjectOrList, util.MediaType.Audio) + + videoObjectOrList = note.get('video') + self.addMedia(ankiNote, videoObjectOrList, util.MediaType.Video) + + pictureObjectOrList = note.get('picture') + self.addMedia(ankiNote, pictureObjectOrList, util.MediaType.Picture) + + + + def addMedia(self, ankiNote, mediaObjectOrList, mediaType): + if mediaObjectOrList is None: + return + + if isinstance(mediaObjectOrList, list): + mediaList = mediaObjectOrList + else: + mediaList = [mediaObjectOrList] + + for media in mediaList: + if media is not None and len(media['fields']) > 0: + try: + mediaFilename = self.storeMediaFile(media['filename'], + data=media.get('data'), + path=media.get('path'), + url=media.get('url'), + skipHash=media.get('skipHash'), + deleteExisting=media.get('deleteExisting')) + + if mediaFilename is not None: + for field in media['fields']: + if field in ankiNote: + if mediaType is util.MediaType.Picture: + ankiNote[field] += u'<img src="{}">'.format(mediaFilename) + elif mediaType is util.MediaType.Audio or mediaType is util.MediaType.Video: + ankiNote[field] += u'[sound:{}]'.format(mediaFilename) + + except Exception as e: + errorMessage = str(e).replace('&', '&').replace('<', '<').replace('>', '>') + for field in media['fields']: + if field in ankiNote: + ankiNote[field] += errorMessage + + + @util.api() + def canAddNote(self, note): + try: + return bool(self.createNote(note)) + except: + return False + + + @util.api() + def updateNoteFields(self, note): + ankiNote = self.getNote(note['id']) + + self.startEditing() + for name, value in note['fields'].items(): + if name in ankiNote: + ankiNote[name] = value + + audioObjectOrList = note.get('audio') + self.addMedia(ankiNote, audioObjectOrList, util.MediaType.Audio) + + videoObjectOrList = note.get('video') + self.addMedia(ankiNote, videoObjectOrList, util.MediaType.Video) + + pictureObjectOrList = note.get('picture') + self.addMedia(ankiNote, pictureObjectOrList, util.MediaType.Picture) + + ankiNote.flush() + + self.collection().autosave() + self.stopEditing() + + + @util.api() + def addTags(self, notes, tags, add=True): + self.startEditing() + self.collection().tags.bulkAdd(notes, tags, add) + self.stopEditing() + + + @util.api() + def removeTags(self, notes, tags): + return self.addTags(notes, tags, False) + + + @util.api() + def getTags(self): + return self.collection().tags.all() + + + @util.api() + def clearUnusedTags(self): + self.collection().tags.registerNotes() + + + @util.api() + def replaceTags(self, notes, tag_to_replace, replace_with_tag): + self.window().progress.start() + + for nid in notes: + try: + note = self.getNote(nid) + except NotFoundError: + continue + + if note.hasTag(tag_to_replace): + note.delTag(tag_to_replace) + note.addTag(replace_with_tag) + note.flush() + + self.window().requireReset() + self.window().progress.finish() + self.window().reset() + + + @util.api() + def replaceTagsInAllNotes(self, tag_to_replace, replace_with_tag): + self.window().progress.start() + + collection = self.collection() + for nid in collection.db.list('select id from notes'): + note = self.getNote(nid) + if note.hasTag(tag_to_replace): + note.delTag(tag_to_replace) + note.addTag(replace_with_tag) + note.flush() + + self.window().requireReset() + self.window().progress.finish() + self.window().reset() + + + @util.api() + def setEaseFactors(self, cards, easeFactors): + couldSetEaseFactors = [] + for i, card in enumerate(cards): + try: + ankiCard = self.getCard(card) + except NotFoundError: + couldSetEaseFactors.append(False) + continue + + couldSetEaseFactors.append(True) + ankiCard.factor = easeFactors[i] + ankiCard.flush() + + return couldSetEaseFactors + + @util.api() + def setSpecificValueOfCard(self, card, keys, + newValues, warning_check=False): + if isinstance(card, list): + print("card has to be int, not list") + return False + + if not isinstance(keys, list) or not isinstance(newValues, list): + print("keys and newValues have to be lists.") + return False + + if len(newValues) != len(keys): + print("Invalid list lengths.") + return False + + for key in keys: + if key in ["did", "id", "ivl", "lapses", "left", "mod", "nid", + "odid", "odue", "ord", "queue", "reps", "type", "usn"]: + if warning_check is False: + return False + + result = [] + try: + ankiCard = self.getCard(card) + for i, key in enumerate(keys): + setattr(ankiCard, key, newValues[i]) + ankiCard.flush() + result.append(True) + except Exception as e: + result.append([False, str(e)]) + return result + + + @util.api() + def getEaseFactors(self, cards): + easeFactors = [] + for card in cards: + try: + ankiCard = self.getCard(card) + except NotFoundError: + easeFactors.append(None) + continue + + easeFactors.append(ankiCard.factor) + + return easeFactors + + + @util.api() + def suspend(self, cards, suspend=True): + for card in cards: + if self.suspended(card) == suspend: + cards.remove(card) + + if len(cards) == 0: + return False + + scheduler = self.scheduler() + self.startEditing() + if suspend: + scheduler.suspendCards(cards) + else: + scheduler.unsuspendCards(cards) + self.stopEditing() + + return True + + + @util.api() + def unsuspend(self, cards): + self.suspend(cards, False) + + + @util.api() + def suspended(self, card): + card = self.getCard(card) + return card.queue == -1 + + + @util.api() + def areSuspended(self, cards): + suspended = [] + for card in cards: + try: + suspended.append(self.suspended(card)) + except NotFoundError: + suspended.append(None) + + return suspended + + + @util.api() + def areDue(self, cards): + due = [] + for card in cards: + if self.findCards('cid:{} is:new'.format(card)): + due.append(True) + else: + date, ivl = self.collection().db.all('select id/1000.0, ivl from revlog where cid = ?', card)[-1] + if ivl >= -1200: + due.append(bool(self.findCards('cid:{} is:due'.format(card)))) + else: + due.append(date - ivl <= time.time()) + + return due + + + @util.api() + def getIntervals(self, cards, complete=False): + intervals = [] + for card in cards: + if self.findCards('cid:{} is:new'.format(card)): + intervals.append(0) + else: + interval = self.collection().db.list('select ivl from revlog where cid = ?', card) + if not complete: + interval = interval[-1] + intervals.append(interval) + + return intervals + + + + @util.api() + def modelNames(self): + return self.collection().models.allNames() + + + @util.api() + def createModel(self, modelName, inOrderFields, cardTemplates, css = None, isCloze = False): + # https://github.com/dae/anki/blob/b06b70f7214fb1f2ce33ba06d2b095384b81f874/anki/stdmodels.py + if len(inOrderFields) == 0: + raise Exception('Must provide at least one field for inOrderFields') + if len(cardTemplates) == 0: + raise Exception('Must provide at least one card for cardTemplates') + if modelName in self.collection().models.allNames(): + raise Exception('Model name already exists') + + collection = self.collection() + mm = collection.models + + # Generate new Note + m = mm.new(modelName) + if isCloze: + m['type'] = MODEL_CLOZE + + # Create fields and add them to Note + for field in inOrderFields: + fm = mm.newField(field) + mm.addField(m, fm) + + # Add shared css to model if exists. Use default otherwise + if (css is not None): + m['css'] = css + + # Generate new card template(s) + cardCount = 1 + for card in cardTemplates: + cardName = 'Card ' + str(cardCount) + if 'Name' in card: + cardName = card['Name'] + + t = mm.newTemplate(cardName) + cardCount += 1 + t['qfmt'] = card['Front'] + t['afmt'] = card['Back'] + mm.addTemplate(m, t) + + mm.add(m) + return m + + + @util.api() + def modelNamesAndIds(self): + models = {} + for model in self.modelNames(): + models[model] = int(self.collection().models.byName(model)['id']) + + return models + + + @util.api() + def modelNameFromId(self, modelId): + model = self.collection().models.get(modelId) + if model is None: + raise Exception('model was not found: {}'.format(modelId)) + else: + return model['name'] + + + @util.api() + def modelFieldNames(self, modelName): + model = self.collection().models.byName(modelName) + if model is None: + raise Exception('model was not found: {}'.format(modelName)) + else: + return [field['name'] for field in model['flds']] + + + @util.api() + def modelFieldsOnTemplates(self, modelName): + model = self.collection().models.byName(modelName) + if model is None: + raise Exception('model was not found: {}'.format(modelName)) + + templates = {} + for template in model['tmpls']: + fields = [] + for side in ['qfmt', 'afmt']: + fieldsForSide = [] + + # based on _fieldsOnTemplate from aqt/clayout.py + matches = re.findall('{{[^#/}]+?}}', template[side]) + for match in matches: + # remove braces and modifiers + match = re.sub(r'[{}]', '', match) + match = match.split(':')[-1] + + # for the answer side, ignore fields present on the question side + the FrontSide field + if match == 'FrontSide' or side == 'afmt' and match in fields[0]: + continue + fieldsForSide.append(match) + + fields.append(fieldsForSide) + + templates[template['name']] = fields + + return templates + + + @util.api() + def modelTemplates(self, modelName): + model = self.collection().models.byName(modelName) + if model is None: + raise Exception('model was not found: {}'.format(modelName)) + + templates = {} + for template in model['tmpls']: + templates[template['name']] = {'Front': template['qfmt'], 'Back': template['afmt']} + + return templates + + + @util.api() + def modelStyling(self, modelName): + model = self.collection().models.byName(modelName) + if model is None: + raise Exception('model was not found: {}'.format(modelName)) + + return {'css': model['css']} + + + @util.api() + def updateModelTemplates(self, model): + models = self.collection().models + ankiModel = models.byName(model['name']) + if ankiModel is None: + raise Exception('model was not found: {}'.format(model['name'])) + + templates = model['templates'] + for ankiTemplate in ankiModel['tmpls']: + template = templates.get(ankiTemplate['name']) + if template: + qfmt = template.get('Front') + if qfmt: + ankiTemplate['qfmt'] = qfmt + + afmt = template.get('Back') + if afmt: + ankiTemplate['afmt'] = afmt + + self.save_model(models, ankiModel) + + + @util.api() + def updateModelStyling(self, model): + models = self.collection().models + ankiModel = models.byName(model['name']) + if ankiModel is None: + raise Exception('model was not found: {}'.format(model['name'])) + + ankiModel['css'] = model['css'] + + self.save_model(models, ankiModel) + + + @util.api() + def findAndReplaceInModels(self, modelName, findText, replaceText, front=True, back=True, css=True): + if not modelName: + ankiModel = self.collection().models.allNames() + else: + model = self.collection().models.byName(modelName) + if model is None: + raise Exception('model was not found: {}'.format(modelName)) + ankiModel = [modelName] + updatedModels = 0 + for model in ankiModel: + model = self.collection().models.byName(model) + checkForText = False + if css and findText in model['css']: + checkForText = True + model['css'] = model['css'].replace(findText, replaceText) + for tmpls in model.get('tmpls'): + if front and findText in tmpls['qfmt']: + checkForText = True + tmpls['qfmt'] = tmpls['qfmt'].replace(findText, replaceText) + if back and findText in tmpls['afmt']: + checkForText = True + tmpls['afmt'] = tmpls['afmt'].replace(findText, replaceText) + self.save_model(self.collection().models, model) + if checkForText: + updatedModels += 1 + return updatedModels + + + @util.api() + def deckNameFromId(self, deckId): + deck = self.collection().decks.get(deckId) + if deck is None: + raise Exception('deck was not found: {}'.format(deckId)) + + return deck['name'] + + + @util.api() + def findNotes(self, query=None): + if query is None: + return [] + + return list(map(int, self.collection().findNotes(query))) + + + @util.api() + def findCards(self, query=None): + if query is None: + return [] + + return list(map(int, self.collection().findCards(query))) + + + @util.api() + def cardsInfo(self, cards): + result = [] + for cid in cards: + try: + card = self.getCard(cid) + model = card.model() + note = card.note() + fields = {} + for info in model['flds']: + order = info['ord'] + name = info['name'] + fields[name] = {'value': note.fields[order], 'order': order} + + result.append({ + 'cardId': card.id, + 'fields': fields, + 'fieldOrder': card.ord, + 'question': util.cardQuestion(card), + 'answer': util.cardAnswer(card), + 'modelName': model['name'], + 'ord': card.ord, + 'deckName': self.deckNameFromId(card.did), + 'css': model['css'], + 'factor': card.factor, + #This factor is 10 times the ease percentage, + # so an ease of 310% would be reported as 3100 + 'interval': card.ivl, + 'note': card.nid, + 'type': card.type, + 'queue': card.queue, + 'due': card.due, + 'reps': card.reps, + 'lapses': card.lapses, + 'left': card.left, + 'mod': card.mod, + }) + except NotFoundError: + # Anki will give a NotFoundError if the card ID does not exist. + # Best behavior is probably to add an 'empty card' to the + # returned result, so that the items of the input and return + # lists correspond. + result.append({}) + + return result + + @util.api() + def cardsModTime(self, cards): + result = [] + for cid in cards: + try: + card = self.getCard(cid) + result.append({ + 'cardId': card.id, + 'mod': card.mod, + }) + except NotFoundError: + # Anki will give a NotFoundError if the card ID does not exist. + # Best behavior is probably to add an 'empty card' to the + # returned result, so that the items of the input and return + # lists correspond. + result.append({}) + return result + + + @util.api() + def forgetCards(self, cards): + self.startEditing() + scids = anki.utils.ids2str(cards) + self.collection().db.execute('update cards set type=0, queue=0, left=0, ivl=0, due=0, odue=0, factor=0 where id in ' + scids) + self.stopEditing() + + + @util.api() + def relearnCards(self, cards): + self.startEditing() + scids = anki.utils.ids2str(cards) + self.collection().db.execute('update cards set type=3, queue=1 where id in ' + scids) + self.stopEditing() + + + @util.api() + def cardReviews(self, deck, startID): + return self.database().all( + 'select id, cid, usn, ease, ivl, lastIvl, factor, time, type from revlog ''where id>? and cid in (select id from cards where did=?)', + startID, + self.decks().id(deck) + ) + + + @util.api() + def reloadCollection(self): + self.collection().reset() + + + @util.api() + def getLatestReviewID(self, deck): + return self.database().scalar( + 'select max(id) from revlog where cid in (select id from cards where did=?)', + self.decks().id(deck) + ) or 0 + + + @util.api() + def insertReviews(self, reviews): + if len(reviews) > 0: + sql = 'insert into revlog(id,cid,usn,ease,ivl,lastIvl,factor,time,type) values ' + for row in reviews: + sql += '(%s),' % ','.join(map(str, row)) + sql = sql[:-1] + self.database().execute(sql) + + + @util.api() + def notesInfo(self, notes): + result = [] + for nid in notes: + try: + note = self.getNote(nid) + model = note.model() + + fields = {} + for info in model['flds']: + order = info['ord'] + name = info['name'] + fields[name] = {'value': note.fields[order], 'order': order} + + result.append({ + 'noteId': note.id, + 'tags' : note.tags, + 'fields': fields, + 'modelName': model['name'], + 'cards': self.collection().db.list('select id from cards where nid = ? order by ord', note.id) + }) + except NotFoundError: + # Anki will give a NotFoundError if the note ID does not exist. + # Best behavior is probably to add an 'empty card' to the + # returned result, so that the items of the input and return + # lists correspond. + result.append({}) + + return result + + + @util.api() + def deleteNotes(self, notes): + try: + self.collection().remNotes(notes) + finally: + self.stopEditing() + + + @util.api() + def removeEmptyNotes(self): + for model in self.collection().models.all(): + if self.collection().models.useCount(model) == 0: + self.collection().models.rem(model) + self.window().requireReset() + + + @util.api() + def cardsToNotes(self, cards): + return self.collection().db.list('select distinct nid from cards where id in ' + anki.utils.ids2str(cards)) + + + @util.api() + def guiBrowse(self, query=None): + browser = aqt.dialogs.open('Browser', self.window()) + browser.activateWindow() + + if query is not None: + browser.form.searchEdit.lineEdit().setText(query) + if hasattr(browser, 'onSearch'): + browser.onSearch() + else: + browser.onSearchActivated() + + return self.findCards(query) + + + @util.api() + def guiEditNote(self, note): + Edit.open_dialog_and_show_note_with_id(note) + + + @util.api() + def guiSelectedNotes(self): + (creator, instance) = aqt.dialogs._dialogs['Browser'] + if instance is None: + return [] + return instance.selectedNotes() + + @util.api() + def guiAddCards(self, note=None): + if note is not None: + collection = self.collection() + + deck = collection.decks.byName(note['deckName']) + if deck is None: + raise Exception('deck was not found: {}'.format(note['deckName'])) + + collection.decks.select(deck['id']) + savedMid = deck.pop('mid', None) + + model = collection.models.byName(note['modelName']) + if model is None: + raise Exception('model was not found: {}'.format(note['modelName'])) + + collection.models.setCurrent(model) + collection.models.update(model) + + ankiNote = anki.notes.Note(collection, model) + + # fill out card beforehand, so we can be sure of the note id + if 'fields' in note: + for name, value in note['fields'].items(): + if name in ankiNote: + ankiNote[name] = value + + self.addMediaFromNote(ankiNote, note) + + if 'tags' in note: + ankiNote.tags = note['tags'] + + def openNewWindow(): + nonlocal ankiNote + + addCards = aqt.dialogs.open('AddCards', self.window()) + + if savedMid: + deck['mid'] = savedMid + + addCards.editor.set_note(ankiNote) + + addCards.activateWindow() + + aqt.dialogs.open('AddCards', self.window()) + addCards.setAndFocusNote(addCards.editor.note) + + currentWindow = aqt.dialogs._dialogs['AddCards'][1] + + if currentWindow is not None: + currentWindow.closeWithCallback(openNewWindow) + else: + openNewWindow() + + return ankiNote.id + + else: + addCards = aqt.dialogs.open('AddCards', self.window()) + addCards.activateWindow() + + return addCards.editor.note.id + + + @util.api() + def guiReviewActive(self): + return self.reviewer().card is not None and self.window().state == 'review' + + + @util.api() + def guiCurrentCard(self): + if not self.guiReviewActive(): + raise Exception('Gui review is not currently active.') + + reviewer = self.reviewer() + card = reviewer.card + model = card.model() + note = card.note() + + fields = {} + for info in model['flds']: + order = info['ord'] + name = info['name'] + fields[name] = {'value': note.fields[order], 'order': order} + + buttonList = reviewer._answerButtonList() + return { + 'cardId': card.id, + 'fields': fields, + 'fieldOrder': card.ord, + 'question': util.cardQuestion(card), + 'answer': util.cardAnswer(card), + 'buttons': [b[0] for b in buttonList], + 'nextReviews': [reviewer.mw.col.sched.nextIvlStr(reviewer.card, b[0], True) for b in buttonList], + 'modelName': model['name'], + 'deckName': self.deckNameFromId(card.did), + 'css': model['css'], + 'template': card.template()['name'] + } + + + @util.api() + def guiStartCardTimer(self): + if not self.guiReviewActive(): + return False + + card = self.reviewer().card + if card is not None: + card.startTimer() + return True + + return False + + + @util.api() + def guiShowQuestion(self): + if self.guiReviewActive(): + self.reviewer()._showQuestion() + return True + + return False + + + @util.api() + def guiShowAnswer(self): + if self.guiReviewActive(): + self.window().reviewer._showAnswer() + return True + + return False + + + @util.api() + def guiAnswerCard(self, ease): + if not self.guiReviewActive(): + return False + + reviewer = self.reviewer() + if reviewer.state != 'answer': + return False + if ease <= 0 or ease > self.scheduler().answerButtons(reviewer.card): + return False + + reviewer._answerCard(ease) + return True + + + @util.api() + def guiDeckOverview(self, name): + collection = self.collection() + if collection is not None: + deck = collection.decks.byName(name) + if deck is not None: + collection.decks.select(deck['id']) + self.window().onOverview() + return True + + return False + + + @util.api() + def guiDeckBrowser(self): + self.window().moveToState('deckBrowser') + + + @util.api() + def guiDeckReview(self, name): + if self.guiDeckOverview(name): + self.window().moveToState('review') + return True + + return False + + + @util.api() + def guiExitAnki(self): + timer = QTimer() + timer.timeout.connect(self.window().close) + timer.start(1000) # 1s should be enough to allow the response to be sent. + + + @util.api() + def guiCheckDatabase(self): + self.window().onCheckDB() + return True + + + @util.api() + def addNotes(self, notes): + results = [] + for note in notes: + try: + results.append(self.addNote(note)) + except: + results.append(None) + + return results + + + @util.api() + def canAddNotes(self, notes): + results = [] + for note in notes: + results.append(self.canAddNote(note)) + + return results + + + @util.api() + def exportPackage(self, deck, path, includeSched=False): + collection = self.collection() + if collection is not None: + deck = collection.decks.byName(deck) + if deck is not None: + exporter = AnkiPackageExporter(collection) + exporter.did = deck['id'] + exporter.includeSched = includeSched + exporter.exportInto(path) + return True + + return False + + + @util.api() + def importPackage(self, path): + collection = self.collection() + if collection is not None: + try: + self.startEditing() + importer = AnkiPackageImporter(collection, path) + importer.run() + except: + self.stopEditing() + raise + else: + self.stopEditing() + return True + + return False + + + @util.api() + def apiReflect(self, scopes=None, actions=None): + if not isinstance(scopes, list): + raise Exception('scopes has invalid value') + if not (actions is None or isinstance(actions, list)): + raise Exception('actions has invalid value') + + cls = type(self) + scopes2 = [] + result = {'scopes': scopes2} + + if 'actions' in scopes: + if actions is None: + actions = dir(cls) + + methodNames = [] + for methodName in actions: + if not isinstance(methodName, str): + pass + method = getattr(cls, methodName, None) + if method is not None and getattr(method, 'api', False): + methodNames.append(methodName) + + scopes2.append('actions') + result['actions'] = methodNames + + return result + + +# +# Entry +# + +# when run inside Anki, `__name__` would be either numeric, +# or, if installed via `link.sh`, `AnkiConnectDev` +if __name__ != "plugin": + if platform.system() == "Windows" and anki_version == (2, 1, 50): + util.patch_anki_2_1_50_having_null_stdout_on_windows() + + Edit.register_with_anki() + + ac = AnkiConnect() + ac.initLogging() + ac.startWebServer() |