summaryrefslogtreecommitdiff
path: root/.local/share/Anki2/addons21/Anki_connect
diff options
context:
space:
mode:
authorThanosApollo <[email protected]>2022-09-01 07:41:39 +0300
committerThanosApollo <[email protected]>2022-09-01 07:41:39 +0300
commit0c89df9be987008e55c1b336489c41290653434b (patch)
tree73425d852d363acdd6c9297a97616b23f9311bd4 /.local/share/Anki2/addons21/Anki_connect
parenta6d1ec012952c5e955676af22732cd177d1f3abf (diff)
Remove anki addons
Diffstat (limited to '.local/share/Anki2/addons21/Anki_connect')
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/__init__.py1688
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/__pycache__/__init__.cpython-39.opt-2.pycbin41249 -> 0 bytes
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/__pycache__/edit.cpython-39.opt-2.pycbin16181 -> 0 bytes
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/__pycache__/util.cpython-39.opt-2.pycbin2261 -> 0 bytes
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/__pycache__/web.cpython-39.opt-2.pycbin6969 -> 0 bytes
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/config.json8
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/config.md1
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/edit.py458
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/meta.json1
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/util.py93
-rw-r--r--.local/share/Anki2/addons21/Anki_connect/web.py301
11 files changed, 0 insertions, 2550 deletions
diff --git a/.local/share/Anki2/addons21/Anki_connect/__init__.py b/.local/share/Anki2/addons21/Anki_connect/__init__.py
deleted file mode 100644
index 93c92d0..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/__init__.py
+++ /dev/null
@@ -1,1688 +0,0 @@
-# Copyright 2016-2021 Alex Yatskov
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import aqt
-
-anki_version = tuple(int(segment) for segment in aqt.appVersion.split("."))
-
-if anki_version < (2, 1, 45):
- raise Exception("Minimum Anki version supported: 2.1.45")
-
-import base64
-import glob
-import hashlib
-import inspect
-import json
-import os
-import os.path
-import platform
-import re
-import time
-import unicodedata
-
-import anki
-import anki.exporting
-import anki.storage
-from anki.cards import Card
-from anki.consts import MODEL_CLOZE
-from anki.exporting import AnkiPackageExporter
-from anki.importing import AnkiPackageImporter
-from anki.notes import Note
-from anki.errors import NotFoundError
-from aqt.qt import Qt, QTimer, QMessageBox, QCheckBox
-
-from .web import format_exception_reply, format_success_reply
-from .edit import Edit
-from . import web, util
-
-
-#
-# AnkiConnect
-#
-
-class AnkiConnect:
- def __init__(self):
- self.log = None
- self.timer = None
- self.server = web.WebServer(self.handler)
-
- def initLogging(self):
- logPath = util.setting('apiLogPath')
- if logPath is not None:
- self.log = open(logPath, 'w')
-
- def startWebServer(self):
- try:
- self.server.listen()
-
- # only keep reference to prevent garbage collection
- self.timer = QTimer()
- self.timer.timeout.connect(self.advance)
- self.timer.start(util.setting('apiPollInterval'))
- except:
- QMessageBox.critical(
- self.window(),
- 'AnkiConnect',
- 'Failed to listen on port {}.\nMake sure it is available and is not in use.'.format(util.setting('webBindPort'))
- )
-
- def save_model(self, models, ankiModel):
- models.update_dict(ankiModel)
-
- def logEvent(self, name, data):
- if self.log is not None:
- self.log.write('[{}]\n'.format(name))
- json.dump(data, self.log, indent=4, sort_keys=True)
- self.log.write('\n\n')
- self.log.flush()
-
-
- def advance(self):
- self.server.advance()
-
-
- def handler(self, request):
- self.logEvent('request', request)
-
- name = request.get('action', '')
- version = request.get('version', 4)
- params = request.get('params', {})
- key = request.get('key')
-
- try:
- if key != util.setting('apiKey') and name != 'requestPermission':
- raise Exception('valid api key must be provided')
-
- method = None
-
- for methodName, methodInst in inspect.getmembers(self, predicate=inspect.ismethod):
- apiVersionLast = 0
- apiNameLast = None
-
- if getattr(methodInst, 'api', False):
- for apiVersion, apiName in getattr(methodInst, 'versions', []):
- if apiVersionLast < apiVersion <= version:
- apiVersionLast = apiVersion
- apiNameLast = apiName
-
- if apiNameLast is None and apiVersionLast == 0:
- apiNameLast = methodName
-
- if apiNameLast is not None and apiNameLast == name:
- method = methodInst
- break
-
- if method is None:
- raise Exception('unsupported action')
-
- api_return_value = methodInst(**params)
- reply = format_success_reply(version, api_return_value)
-
- except Exception as e:
- reply = format_exception_reply(version, e)
-
- self.logEvent('reply', reply)
- return reply
-
-
- def window(self):
- return aqt.mw
-
-
- def reviewer(self):
- reviewer = self.window().reviewer
- if reviewer is None:
- raise Exception('reviewer is not available')
-
- return reviewer
-
-
- def collection(self):
- collection = self.window().col
- if collection is None:
- raise Exception('collection is not available')
-
- return collection
-
-
- def decks(self):
- decks = self.collection().decks
- if decks is None:
- raise Exception('decks are not available')
-
- return decks
-
-
- def scheduler(self):
- scheduler = self.collection().sched
- if scheduler is None:
- raise Exception('scheduler is not available')
-
- return scheduler
-
-
- def database(self):
- database = self.collection().db
- if database is None:
- raise Exception('database is not available')
-
- return database
-
-
- def media(self):
- media = self.collection().media
- if media is None:
- raise Exception('media is not available')
-
- return media
-
-
- def startEditing(self):
- self.window().requireReset()
-
-
- def stopEditing(self):
- if self.collection() is not None:
- self.window().maybeReset()
-
-
- def createNote(self, note):
- collection = self.collection()
-
- model = collection.models.byName(note['modelName'])
- if model is None:
- raise Exception('model was not found: {}'.format(note['modelName']))
-
- deck = collection.decks.byName(note['deckName'])
- if deck is None:
- raise Exception('deck was not found: {}'.format(note['deckName']))
-
- ankiNote = anki.notes.Note(collection, model)
- ankiNote.model()['did'] = deck['id']
- if 'tags' in note:
- ankiNote.tags = note['tags']
-
- for name, value in note['fields'].items():
- for ankiName in ankiNote.keys():
- if name.lower() == ankiName.lower():
- ankiNote[ankiName] = value
- break
-
- allowDuplicate = False
- duplicateScope = None
- duplicateScopeDeckName = None
- duplicateScopeCheckChildren = False
- duplicateScopeCheckAllModels = False
-
- if 'options' in note:
- options = note['options']
- if 'allowDuplicate' in options:
- allowDuplicate = options['allowDuplicate']
- if type(allowDuplicate) is not bool:
- raise Exception('option parameter "allowDuplicate" must be boolean')
- if 'duplicateScope' in options:
- duplicateScope = options['duplicateScope']
- if 'duplicateScopeOptions' in options:
- duplicateScopeOptions = options['duplicateScopeOptions']
- if 'deckName' in duplicateScopeOptions:
- duplicateScopeDeckName = duplicateScopeOptions['deckName']
- if 'checkChildren' in duplicateScopeOptions:
- duplicateScopeCheckChildren = duplicateScopeOptions['checkChildren']
- if type(duplicateScopeCheckChildren) is not bool:
- raise Exception('option parameter "duplicateScopeOptions.checkChildren" must be boolean')
- if 'checkAllModels' in duplicateScopeOptions:
- duplicateScopeCheckAllModels = duplicateScopeOptions['checkAllModels']
- if type(duplicateScopeCheckAllModels) is not bool:
- raise Exception('option parameter "duplicateScopeOptions.checkAllModels" must be boolean')
-
- duplicateOrEmpty = self.isNoteDuplicateOrEmptyInScope(
- ankiNote,
- deck,
- collection,
- duplicateScope,
- duplicateScopeDeckName,
- duplicateScopeCheckChildren,
- duplicateScopeCheckAllModels
- )
-
- if duplicateOrEmpty == 1:
- raise Exception('cannot create note because it is empty')
- elif duplicateOrEmpty == 2:
- if allowDuplicate:
- return ankiNote
- raise Exception('cannot create note because it is a duplicate')
- elif duplicateOrEmpty == 0:
- return ankiNote
- else:
- raise Exception('cannot create note for unknown reason')
-
-
- def isNoteDuplicateOrEmptyInScope(
- self,
- note,
- deck,
- collection,
- duplicateScope,
- duplicateScopeDeckName,
- duplicateScopeCheckChildren,
- duplicateScopeCheckAllModels
- ):
- # Returns: 1 if first is empty, 2 if first is a duplicate, 0 otherwise.
-
- # note.dupeOrEmpty returns if a note is a global duplicate with the specific model.
- # This is used as the default check, and the rest of this function is manually
- # checking if the note is a duplicate with additional options.
- if duplicateScope != 'deck' and not duplicateScopeCheckAllModels:
- return note.dupeOrEmpty() or 0
-
- # Primary field for uniqueness
- val = note.fields[0]
- if not val.strip():
- return 1
- csum = anki.utils.fieldChecksum(val)
-
- # Create dictionary of deck ids
- dids = None
- if duplicateScope == 'deck':
- did = deck['id']
- if duplicateScopeDeckName is not None:
- deck2 = collection.decks.byName(duplicateScopeDeckName)
- if deck2 is None:
- # Invalid deck, so cannot be duplicate
- return 0
- did = deck2['id']
-
- dids = {did: True}
- if duplicateScopeCheckChildren:
- for kv in collection.decks.children(did):
- dids[kv[1]] = True
-
- # Build query
- query = 'select id from notes where csum=?'
- queryArgs = [csum]
- if note.id:
- query += ' and id!=?'
- queryArgs.append(note.id)
- if not duplicateScopeCheckAllModels:
- query += ' and mid=?'
- queryArgs.append(note.mid)
-
- # Search
- for noteId in note.col.db.list(query, *queryArgs):
- if dids is None:
- # Duplicate note exists in the collection
- return 2
- # Validate that a card exists in one of the specified decks
- for cardDeckId in note.col.db.list('select did from cards where nid=?', noteId):
- if cardDeckId in dids:
- return 2
-
- # Not a duplicate
- return 0
-
- def getCard(self, card_id: int) -> Card:
- try:
- return self.collection().getCard(card_id)
- except NotFoundError:
- raise NotFoundError('Card was not found: {}'.format(card_id))
-
- def getNote(self, note_id: int) -> Note:
- try:
- return self.collection().getNote(note_id)
- except NotFoundError:
- raise NotFoundError('Note was not found: {}'.format(note_id))
-
- def deckStatsToJson(self, due_tree):
- deckStats = {'deck_id': due_tree.deck_id,
- 'name': due_tree.name,
- 'new_count': due_tree.new_count,
- 'learn_count': due_tree.learn_count,
- 'review_count': due_tree.review_count}
- if anki_version > (2, 1, 46):
- # total_in_deck is not supported on lower Anki versions
- deckStats['total_in_deck'] = due_tree.total_in_deck
- return deckStats
-
- def collectDeckTreeChildren(self, parent_node):
- allNodes = {parent_node.deck_id: parent_node}
- for child in parent_node.children:
- for deckId, childNode in self.collectDeckTreeChildren(child).items():
- allNodes[deckId] = childNode
- return allNodes
-
- #
- # Miscellaneous
- #
-
- @util.api()
- def version(self):
- return util.setting('apiVersion')
-
-
- @util.api()
- def requestPermission(self, origin, allowed):
- results = {
- "permission": "denied",
- }
-
- if allowed:
- results = {
- "permission": "granted",
- "requireApikey": bool(util.setting('apiKey')),
- "version": util.setting('apiVersion')
- }
-
- elif origin in util.setting('ignoreOriginList'):
- pass # defaults to denied
-
- else: # prompt the user
- msg = QMessageBox(None)
- msg.setWindowTitle("A website wants to access to Anki")
- msg.setText('"{}" requests permission to use Anki through AnkiConnect. Do you want to give it access?'.format(origin))
- msg.setInformativeText("By granting permission, you'll allow the website to modify your collection on your behalf, including the execution of destructive actions such as deck deletion.")
- msg.setWindowIcon(self.window().windowIcon())
- msg.setIcon(QMessageBox.Question)
- msg.setStandardButtons(QMessageBox.Yes|QMessageBox.No)
- msg.setDefaultButton(QMessageBox.No)
- msg.setCheckBox(QCheckBox(text='Ignore further requests from "{}"'.format(origin), parent=msg))
- msg.setWindowFlags(Qt.WindowStaysOnTopHint)
- pressedButton = msg.exec_()
-
- if pressedButton == QMessageBox.Yes:
- config = aqt.mw.addonManager.getConfig(__name__)
- config["webCorsOriginList"] = util.setting('webCorsOriginList')
- config["webCorsOriginList"].append(origin)
- aqt.mw.addonManager.writeConfig(__name__, config)
- results = {
- "permission": "granted",
- "requireApikey": bool(util.setting('apiKey')),
- "version": util.setting('apiVersion')
- }
-
- # if the origin isn't an empty string, the user clicks "No", and the ignore box is checked
- elif origin and pressedButton == QMessageBox.No and msg.checkBox().isChecked():
- config = aqt.mw.addonManager.getConfig(__name__)
- config["ignoreOriginList"] = util.setting('ignoreOriginList')
- config["ignoreOriginList"].append(origin)
- aqt.mw.addonManager.writeConfig(__name__, config)
-
- # else defaults to denied
-
- return results
-
-
- @util.api()
- def getProfiles(self):
- return self.window().pm.profiles()
-
-
- @util.api()
- def loadProfile(self, name):
- if name not in self.window().pm.profiles():
- return False
-
- if self.window().isVisible():
- cur_profile = self.window().pm.name
- if cur_profile != name:
- self.window().unloadProfileAndShowProfileManager()
-
- def waiter():
- # This function waits until main window is closed
- # It's needed cause sync can take quite some time
- # And if we call loadProfile until sync is ended things will go wrong
- if self.window().isVisible():
- QTimer.singleShot(1000, waiter)
- else:
- self.loadProfile(name)
-
- waiter()
- else:
- self.window().pm.load(name)
- self.window().loadProfile()
- self.window().profileDiag.closeWithoutQuitting()
-
- return True
-
-
- @util.api()
- def sync(self):
- self.window().onSync()
-
-
- @util.api()
- def multi(self, actions):
- return list(map(self.handler, actions))
-
-
- @util.api()
- def getNumCardsReviewedToday(self):
- return self.database().scalar('select count() from revlog where id > ?', (self.scheduler().dayCutoff - 86400) * 1000)
-
- @util.api()
- def getNumCardsReviewedByDay(self):
- return self.database().all('select date(id/1000 - ?, "unixepoch", "localtime") as day, count() from revlog group by day order by day desc',
- int(time.strftime("%H", time.localtime(self.scheduler().dayCutoff))) * 3600)
-
-
- @util.api()
- def getCollectionStatsHTML(self, wholeCollection=True):
- stats = self.collection().stats()
- stats.wholeCollection = wholeCollection
- return stats.report()
-
-
- #
- # Decks
- #
-
- @util.api()
- def deckNames(self):
- return self.decks().allNames()
-
-
- @util.api()
- def deckNamesAndIds(self):
- decks = {}
- for deck in self.deckNames():
- decks[deck] = self.decks().id(deck)
-
- return decks
-
-
- @util.api()
- def getDecks(self, cards):
- decks = {}
- for card in cards:
- did = self.database().scalar('select did from cards where id=?', card)
- deck = self.decks().get(did)['name']
- if deck in decks:
- decks[deck].append(card)
- else:
- decks[deck] = [card]
-
- return decks
-
-
- @util.api()
- def createDeck(self, deck):
- try:
- self.startEditing()
- did = self.decks().id(deck)
- finally:
- self.stopEditing()
-
- return did
-
-
- @util.api()
- def changeDeck(self, cards, deck):
- self.startEditing()
-
- did = self.collection().decks.id(deck)
- mod = anki.utils.intTime()
- usn = self.collection().usn()
-
- # normal cards
- scids = anki.utils.ids2str(cards)
- # remove any cards from filtered deck first
- self.collection().sched.remFromDyn(cards)
-
- # then move into new deck
- self.collection().db.execute('update cards set usn=?, mod=?, did=? where id in ' + scids, usn, mod, did)
- self.stopEditing()
-
-
- @util.api()
- def deleteDecks(self, decks, cardsToo=False):
- if not cardsToo:
- # since f592672fa952260655881a75a2e3c921b2e23857 (2.1.28)
- # (see anki$ git log "-Gassert cardsToo")
- # you can't delete decks without deleting cards as well.
- # however, since 62c23c6816adf912776b9378c008a52bb50b2e8d (2.1.45)
- # passing cardsToo to `rem` (long deprecated) won't raise an error!
- # this is dangerous, so let's raise our own exception
- raise Exception("Since Anki 2.1.28 it's not possible "
- "to delete decks without deleting cards as well")
- try:
- self.startEditing()
- decks = filter(lambda d: d in self.deckNames(), decks)
- for deck in decks:
- did = self.decks().id(deck)
- self.decks().rem(did, cardsToo=cardsToo)
- finally:
- self.stopEditing()
-
-
- @util.api()
- def getDeckConfig(self, deck):
- if deck not in self.deckNames():
- return False
-
- collection = self.collection()
- did = collection.decks.id(deck)
- return collection.decks.confForDid(did)
-
-
- @util.api()
- def saveDeckConfig(self, config):
- collection = self.collection()
-
- config['id'] = str(config['id'])
- config['mod'] = anki.utils.intTime()
- config['usn'] = collection.usn()
- if int(config['id']) not in [c['id'] for c in collection.decks.all_config()]:
- return False
- try:
- collection.decks.save(config)
- collection.decks.updateConf(config)
- except:
- return False
- return True
-
-
- @util.api()
- def setDeckConfigId(self, decks, configId):
- configId = int(configId)
- for deck in decks:
- if not deck in self.deckNames():
- return False
-
- collection = self.collection()
-
- for deck in decks:
- try:
- did = str(collection.decks.id(deck))
- deck_dict = aqt.mw.col.decks.decks[did]
- deck_dict['conf'] = configId
- collection.decks.save(deck_dict)
- except:
- return False
-
- return True
-
-
- @util.api()
- def cloneDeckConfigId(self, name, cloneFrom='1'):
- configId = int(cloneFrom)
- collection = self.collection()
- if configId not in [c['id'] for c in collection.decks.all_config()]:
- return False
-
- config = collection.decks.getConf(configId)
- return collection.decks.confId(name, config)
-
-
- @util.api()
- def removeDeckConfigId(self, configId):
- collection = self.collection()
- if int(configId) not in [c['id'] for c in collection.decks.all_config()]:
- return False
-
- collection.decks.remConf(configId)
- return True
-
- @util.api()
- def getDeckStats(self, decks):
- collection = self.collection()
- scheduler = self.scheduler()
- responseDict = {}
- deckIds = list(map(lambda d: collection.decks.id(d), decks))
-
- allDeckNodes = self.collectDeckTreeChildren(scheduler.deck_due_tree())
- for deckId, deckNode in allDeckNodes.items():
- if deckId in deckIds:
- responseDict[deckId] = self.deckStatsToJson(deckNode)
- return responseDict
-
- @util.api()
- def storeMediaFile(self, filename, data=None, path=None, url=None, skipHash=None, deleteExisting=True):
- if not (data or path or url):
- raise Exception('You must provide a "data", "path", or "url" field.')
- if data:
- mediaData = base64.b64decode(data)
- elif path:
- with open(path, 'rb') as f:
- mediaData = f.read()
- elif url:
- mediaData = util.download(url)
-
- if skipHash is None:
- skip = False
- else:
- m = hashlib.md5()
- m.update(mediaData)
- skip = skipHash == m.hexdigest()
-
- if skip:
- return None
- if deleteExisting:
- self.deleteMediaFile(filename)
- return self.media().writeData(filename, mediaData)
-
-
- @util.api()
- def retrieveMediaFile(self, filename):
- filename = os.path.basename(filename)
- filename = unicodedata.normalize('NFC', filename)
- filename = self.media().stripIllegal(filename)
-
- path = os.path.join(self.media().dir(), filename)
- if os.path.exists(path):
- with open(path, 'rb') as file:
- return base64.b64encode(file.read()).decode('ascii')
-
- return False
-
-
- @util.api()
- def getMediaFilesNames(self, pattern='*'):
- path = os.path.join(self.media().dir(), pattern)
- return [os.path.basename(p) for p in glob.glob(path)]
-
-
- @util.api()
- def deleteMediaFile(self, filename):
- try:
- self.media().syncDelete(filename)
- except AttributeError:
- self.media().trash_files([filename])
-
-
- @util.api()
- def addNote(self, note):
- ankiNote = self.createNote(note)
-
- self.addMediaFromNote(ankiNote, note)
-
- collection = self.collection()
- self.startEditing()
- nCardsAdded = collection.addNote(ankiNote)
- if nCardsAdded < 1:
- raise Exception('The field values you have provided would make an empty question on all cards.')
- collection.autosave()
- self.stopEditing()
-
- return ankiNote.id
-
-
- def addMediaFromNote(self, ankiNote, note):
- audioObjectOrList = note.get('audio')
- self.addMedia(ankiNote, audioObjectOrList, util.MediaType.Audio)
-
- videoObjectOrList = note.get('video')
- self.addMedia(ankiNote, videoObjectOrList, util.MediaType.Video)
-
- pictureObjectOrList = note.get('picture')
- self.addMedia(ankiNote, pictureObjectOrList, util.MediaType.Picture)
-
-
-
- def addMedia(self, ankiNote, mediaObjectOrList, mediaType):
- if mediaObjectOrList is None:
- return
-
- if isinstance(mediaObjectOrList, list):
- mediaList = mediaObjectOrList
- else:
- mediaList = [mediaObjectOrList]
-
- for media in mediaList:
- if media is not None and len(media['fields']) > 0:
- try:
- mediaFilename = self.storeMediaFile(media['filename'],
- data=media.get('data'),
- path=media.get('path'),
- url=media.get('url'),
- skipHash=media.get('skipHash'),
- deleteExisting=media.get('deleteExisting'))
-
- if mediaFilename is not None:
- for field in media['fields']:
- if field in ankiNote:
- if mediaType is util.MediaType.Picture:
- ankiNote[field] += u'<img src="{}">'.format(mediaFilename)
- elif mediaType is util.MediaType.Audio or mediaType is util.MediaType.Video:
- ankiNote[field] += u'[sound:{}]'.format(mediaFilename)
-
- except Exception as e:
- errorMessage = str(e).replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
- for field in media['fields']:
- if field in ankiNote:
- ankiNote[field] += errorMessage
-
-
- @util.api()
- def canAddNote(self, note):
- try:
- return bool(self.createNote(note))
- except:
- return False
-
-
- @util.api()
- def updateNoteFields(self, note):
- ankiNote = self.getNote(note['id'])
-
- self.startEditing()
- for name, value in note['fields'].items():
- if name in ankiNote:
- ankiNote[name] = value
-
- audioObjectOrList = note.get('audio')
- self.addMedia(ankiNote, audioObjectOrList, util.MediaType.Audio)
-
- videoObjectOrList = note.get('video')
- self.addMedia(ankiNote, videoObjectOrList, util.MediaType.Video)
-
- pictureObjectOrList = note.get('picture')
- self.addMedia(ankiNote, pictureObjectOrList, util.MediaType.Picture)
-
- ankiNote.flush()
-
- self.collection().autosave()
- self.stopEditing()
-
-
- @util.api()
- def addTags(self, notes, tags, add=True):
- self.startEditing()
- self.collection().tags.bulkAdd(notes, tags, add)
- self.stopEditing()
-
-
- @util.api()
- def removeTags(self, notes, tags):
- return self.addTags(notes, tags, False)
-
-
- @util.api()
- def getTags(self):
- return self.collection().tags.all()
-
-
- @util.api()
- def clearUnusedTags(self):
- self.collection().tags.registerNotes()
-
-
- @util.api()
- def replaceTags(self, notes, tag_to_replace, replace_with_tag):
- self.window().progress.start()
-
- for nid in notes:
- try:
- note = self.getNote(nid)
- except NotFoundError:
- continue
-
- if note.hasTag(tag_to_replace):
- note.delTag(tag_to_replace)
- note.addTag(replace_with_tag)
- note.flush()
-
- self.window().requireReset()
- self.window().progress.finish()
- self.window().reset()
-
-
- @util.api()
- def replaceTagsInAllNotes(self, tag_to_replace, replace_with_tag):
- self.window().progress.start()
-
- collection = self.collection()
- for nid in collection.db.list('select id from notes'):
- note = self.getNote(nid)
- if note.hasTag(tag_to_replace):
- note.delTag(tag_to_replace)
- note.addTag(replace_with_tag)
- note.flush()
-
- self.window().requireReset()
- self.window().progress.finish()
- self.window().reset()
-
-
- @util.api()
- def setEaseFactors(self, cards, easeFactors):
- couldSetEaseFactors = []
- for i, card in enumerate(cards):
- try:
- ankiCard = self.getCard(card)
- except NotFoundError:
- couldSetEaseFactors.append(False)
- continue
-
- couldSetEaseFactors.append(True)
- ankiCard.factor = easeFactors[i]
- ankiCard.flush()
-
- return couldSetEaseFactors
-
- @util.api()
- def setSpecificValueOfCard(self, card, keys,
- newValues, warning_check=False):
- if isinstance(card, list):
- print("card has to be int, not list")
- return False
-
- if not isinstance(keys, list) or not isinstance(newValues, list):
- print("keys and newValues have to be lists.")
- return False
-
- if len(newValues) != len(keys):
- print("Invalid list lengths.")
- return False
-
- for key in keys:
- if key in ["did", "id", "ivl", "lapses", "left", "mod", "nid",
- "odid", "odue", "ord", "queue", "reps", "type", "usn"]:
- if warning_check is False:
- return False
-
- result = []
- try:
- ankiCard = self.getCard(card)
- for i, key in enumerate(keys):
- setattr(ankiCard, key, newValues[i])
- ankiCard.flush()
- result.append(True)
- except Exception as e:
- result.append([False, str(e)])
- return result
-
-
- @util.api()
- def getEaseFactors(self, cards):
- easeFactors = []
- for card in cards:
- try:
- ankiCard = self.getCard(card)
- except NotFoundError:
- easeFactors.append(None)
- continue
-
- easeFactors.append(ankiCard.factor)
-
- return easeFactors
-
-
- @util.api()
- def suspend(self, cards, suspend=True):
- for card in cards:
- if self.suspended(card) == suspend:
- cards.remove(card)
-
- if len(cards) == 0:
- return False
-
- scheduler = self.scheduler()
- self.startEditing()
- if suspend:
- scheduler.suspendCards(cards)
- else:
- scheduler.unsuspendCards(cards)
- self.stopEditing()
-
- return True
-
-
- @util.api()
- def unsuspend(self, cards):
- self.suspend(cards, False)
-
-
- @util.api()
- def suspended(self, card):
- card = self.getCard(card)
- return card.queue == -1
-
-
- @util.api()
- def areSuspended(self, cards):
- suspended = []
- for card in cards:
- try:
- suspended.append(self.suspended(card))
- except NotFoundError:
- suspended.append(None)
-
- return suspended
-
-
- @util.api()
- def areDue(self, cards):
- due = []
- for card in cards:
- if self.findCards('cid:{} is:new'.format(card)):
- due.append(True)
- else:
- date, ivl = self.collection().db.all('select id/1000.0, ivl from revlog where cid = ?', card)[-1]
- if ivl >= -1200:
- due.append(bool(self.findCards('cid:{} is:due'.format(card))))
- else:
- due.append(date - ivl <= time.time())
-
- return due
-
-
- @util.api()
- def getIntervals(self, cards, complete=False):
- intervals = []
- for card in cards:
- if self.findCards('cid:{} is:new'.format(card)):
- intervals.append(0)
- else:
- interval = self.collection().db.list('select ivl from revlog where cid = ?', card)
- if not complete:
- interval = interval[-1]
- intervals.append(interval)
-
- return intervals
-
-
-
- @util.api()
- def modelNames(self):
- return self.collection().models.allNames()
-
-
- @util.api()
- def createModel(self, modelName, inOrderFields, cardTemplates, css = None, isCloze = False):
- # https://github.com/dae/anki/blob/b06b70f7214fb1f2ce33ba06d2b095384b81f874/anki/stdmodels.py
- if len(inOrderFields) == 0:
- raise Exception('Must provide at least one field for inOrderFields')
- if len(cardTemplates) == 0:
- raise Exception('Must provide at least one card for cardTemplates')
- if modelName in self.collection().models.allNames():
- raise Exception('Model name already exists')
-
- collection = self.collection()
- mm = collection.models
-
- # Generate new Note
- m = mm.new(modelName)
- if isCloze:
- m['type'] = MODEL_CLOZE
-
- # Create fields and add them to Note
- for field in inOrderFields:
- fm = mm.newField(field)
- mm.addField(m, fm)
-
- # Add shared css to model if exists. Use default otherwise
- if (css is not None):
- m['css'] = css
-
- # Generate new card template(s)
- cardCount = 1
- for card in cardTemplates:
- cardName = 'Card ' + str(cardCount)
- if 'Name' in card:
- cardName = card['Name']
-
- t = mm.newTemplate(cardName)
- cardCount += 1
- t['qfmt'] = card['Front']
- t['afmt'] = card['Back']
- mm.addTemplate(m, t)
-
- mm.add(m)
- return m
-
-
- @util.api()
- def modelNamesAndIds(self):
- models = {}
- for model in self.modelNames():
- models[model] = int(self.collection().models.byName(model)['id'])
-
- return models
-
-
- @util.api()
- def modelNameFromId(self, modelId):
- model = self.collection().models.get(modelId)
- if model is None:
- raise Exception('model was not found: {}'.format(modelId))
- else:
- return model['name']
-
-
- @util.api()
- def modelFieldNames(self, modelName):
- model = self.collection().models.byName(modelName)
- if model is None:
- raise Exception('model was not found: {}'.format(modelName))
- else:
- return [field['name'] for field in model['flds']]
-
-
- @util.api()
- def modelFieldsOnTemplates(self, modelName):
- model = self.collection().models.byName(modelName)
- if model is None:
- raise Exception('model was not found: {}'.format(modelName))
-
- templates = {}
- for template in model['tmpls']:
- fields = []
- for side in ['qfmt', 'afmt']:
- fieldsForSide = []
-
- # based on _fieldsOnTemplate from aqt/clayout.py
- matches = re.findall('{{[^#/}]+?}}', template[side])
- for match in matches:
- # remove braces and modifiers
- match = re.sub(r'[{}]', '', match)
- match = match.split(':')[-1]
-
- # for the answer side, ignore fields present on the question side + the FrontSide field
- if match == 'FrontSide' or side == 'afmt' and match in fields[0]:
- continue
- fieldsForSide.append(match)
-
- fields.append(fieldsForSide)
-
- templates[template['name']] = fields
-
- return templates
-
-
- @util.api()
- def modelTemplates(self, modelName):
- model = self.collection().models.byName(modelName)
- if model is None:
- raise Exception('model was not found: {}'.format(modelName))
-
- templates = {}
- for template in model['tmpls']:
- templates[template['name']] = {'Front': template['qfmt'], 'Back': template['afmt']}
-
- return templates
-
-
- @util.api()
- def modelStyling(self, modelName):
- model = self.collection().models.byName(modelName)
- if model is None:
- raise Exception('model was not found: {}'.format(modelName))
-
- return {'css': model['css']}
-
-
- @util.api()
- def updateModelTemplates(self, model):
- models = self.collection().models
- ankiModel = models.byName(model['name'])
- if ankiModel is None:
- raise Exception('model was not found: {}'.format(model['name']))
-
- templates = model['templates']
- for ankiTemplate in ankiModel['tmpls']:
- template = templates.get(ankiTemplate['name'])
- if template:
- qfmt = template.get('Front')
- if qfmt:
- ankiTemplate['qfmt'] = qfmt
-
- afmt = template.get('Back')
- if afmt:
- ankiTemplate['afmt'] = afmt
-
- self.save_model(models, ankiModel)
-
-
- @util.api()
- def updateModelStyling(self, model):
- models = self.collection().models
- ankiModel = models.byName(model['name'])
- if ankiModel is None:
- raise Exception('model was not found: {}'.format(model['name']))
-
- ankiModel['css'] = model['css']
-
- self.save_model(models, ankiModel)
-
-
- @util.api()
- def findAndReplaceInModels(self, modelName, findText, replaceText, front=True, back=True, css=True):
- if not modelName:
- ankiModel = self.collection().models.allNames()
- else:
- model = self.collection().models.byName(modelName)
- if model is None:
- raise Exception('model was not found: {}'.format(modelName))
- ankiModel = [modelName]
- updatedModels = 0
- for model in ankiModel:
- model = self.collection().models.byName(model)
- checkForText = False
- if css and findText in model['css']:
- checkForText = True
- model['css'] = model['css'].replace(findText, replaceText)
- for tmpls in model.get('tmpls'):
- if front and findText in tmpls['qfmt']:
- checkForText = True
- tmpls['qfmt'] = tmpls['qfmt'].replace(findText, replaceText)
- if back and findText in tmpls['afmt']:
- checkForText = True
- tmpls['afmt'] = tmpls['afmt'].replace(findText, replaceText)
- self.save_model(self.collection().models, model)
- if checkForText:
- updatedModels += 1
- return updatedModels
-
-
- @util.api()
- def deckNameFromId(self, deckId):
- deck = self.collection().decks.get(deckId)
- if deck is None:
- raise Exception('deck was not found: {}'.format(deckId))
-
- return deck['name']
-
-
- @util.api()
- def findNotes(self, query=None):
- if query is None:
- return []
-
- return list(map(int, self.collection().findNotes(query)))
-
-
- @util.api()
- def findCards(self, query=None):
- if query is None:
- return []
-
- return list(map(int, self.collection().findCards(query)))
-
-
- @util.api()
- def cardsInfo(self, cards):
- result = []
- for cid in cards:
- try:
- card = self.getCard(cid)
- model = card.model()
- note = card.note()
- fields = {}
- for info in model['flds']:
- order = info['ord']
- name = info['name']
- fields[name] = {'value': note.fields[order], 'order': order}
-
- result.append({
- 'cardId': card.id,
- 'fields': fields,
- 'fieldOrder': card.ord,
- 'question': util.cardQuestion(card),
- 'answer': util.cardAnswer(card),
- 'modelName': model['name'],
- 'ord': card.ord,
- 'deckName': self.deckNameFromId(card.did),
- 'css': model['css'],
- 'factor': card.factor,
- #This factor is 10 times the ease percentage,
- # so an ease of 310% would be reported as 3100
- 'interval': card.ivl,
- 'note': card.nid,
- 'type': card.type,
- 'queue': card.queue,
- 'due': card.due,
- 'reps': card.reps,
- 'lapses': card.lapses,
- 'left': card.left,
- 'mod': card.mod,
- })
- except NotFoundError:
- # Anki will give a NotFoundError if the card ID does not exist.
- # Best behavior is probably to add an 'empty card' to the
- # returned result, so that the items of the input and return
- # lists correspond.
- result.append({})
-
- return result
-
- @util.api()
- def cardsModTime(self, cards):
- result = []
- for cid in cards:
- try:
- card = self.getCard(cid)
- result.append({
- 'cardId': card.id,
- 'mod': card.mod,
- })
- except NotFoundError:
- # Anki will give a NotFoundError if the card ID does not exist.
- # Best behavior is probably to add an 'empty card' to the
- # returned result, so that the items of the input and return
- # lists correspond.
- result.append({})
- return result
-
-
- @util.api()
- def forgetCards(self, cards):
- self.startEditing()
- scids = anki.utils.ids2str(cards)
- self.collection().db.execute('update cards set type=0, queue=0, left=0, ivl=0, due=0, odue=0, factor=0 where id in ' + scids)
- self.stopEditing()
-
-
- @util.api()
- def relearnCards(self, cards):
- self.startEditing()
- scids = anki.utils.ids2str(cards)
- self.collection().db.execute('update cards set type=3, queue=1 where id in ' + scids)
- self.stopEditing()
-
-
- @util.api()
- def cardReviews(self, deck, startID):
- return self.database().all(
- 'select id, cid, usn, ease, ivl, lastIvl, factor, time, type from revlog ''where id>? and cid in (select id from cards where did=?)',
- startID,
- self.decks().id(deck)
- )
-
-
- @util.api()
- def reloadCollection(self):
- self.collection().reset()
-
-
- @util.api()
- def getLatestReviewID(self, deck):
- return self.database().scalar(
- 'select max(id) from revlog where cid in (select id from cards where did=?)',
- self.decks().id(deck)
- ) or 0
-
-
- @util.api()
- def insertReviews(self, reviews):
- if len(reviews) > 0:
- sql = 'insert into revlog(id,cid,usn,ease,ivl,lastIvl,factor,time,type) values '
- for row in reviews:
- sql += '(%s),' % ','.join(map(str, row))
- sql = sql[:-1]
- self.database().execute(sql)
-
-
- @util.api()
- def notesInfo(self, notes):
- result = []
- for nid in notes:
- try:
- note = self.getNote(nid)
- model = note.model()
-
- fields = {}
- for info in model['flds']:
- order = info['ord']
- name = info['name']
- fields[name] = {'value': note.fields[order], 'order': order}
-
- result.append({
- 'noteId': note.id,
- 'tags' : note.tags,
- 'fields': fields,
- 'modelName': model['name'],
- 'cards': self.collection().db.list('select id from cards where nid = ? order by ord', note.id)
- })
- except NotFoundError:
- # Anki will give a NotFoundError if the note ID does not exist.
- # Best behavior is probably to add an 'empty card' to the
- # returned result, so that the items of the input and return
- # lists correspond.
- result.append({})
-
- return result
-
-
- @util.api()
- def deleteNotes(self, notes):
- try:
- self.collection().remNotes(notes)
- finally:
- self.stopEditing()
-
-
- @util.api()
- def removeEmptyNotes(self):
- for model in self.collection().models.all():
- if self.collection().models.useCount(model) == 0:
- self.collection().models.rem(model)
- self.window().requireReset()
-
-
- @util.api()
- def cardsToNotes(self, cards):
- return self.collection().db.list('select distinct nid from cards where id in ' + anki.utils.ids2str(cards))
-
-
- @util.api()
- def guiBrowse(self, query=None):
- browser = aqt.dialogs.open('Browser', self.window())
- browser.activateWindow()
-
- if query is not None:
- browser.form.searchEdit.lineEdit().setText(query)
- if hasattr(browser, 'onSearch'):
- browser.onSearch()
- else:
- browser.onSearchActivated()
-
- return self.findCards(query)
-
-
- @util.api()
- def guiEditNote(self, note):
- Edit.open_dialog_and_show_note_with_id(note)
-
-
- @util.api()
- def guiSelectedNotes(self):
- (creator, instance) = aqt.dialogs._dialogs['Browser']
- if instance is None:
- return []
- return instance.selectedNotes()
-
- @util.api()
- def guiAddCards(self, note=None):
- if note is not None:
- collection = self.collection()
-
- deck = collection.decks.byName(note['deckName'])
- if deck is None:
- raise Exception('deck was not found: {}'.format(note['deckName']))
-
- collection.decks.select(deck['id'])
- savedMid = deck.pop('mid', None)
-
- model = collection.models.byName(note['modelName'])
- if model is None:
- raise Exception('model was not found: {}'.format(note['modelName']))
-
- collection.models.setCurrent(model)
- collection.models.update(model)
-
- ankiNote = anki.notes.Note(collection, model)
-
- # fill out card beforehand, so we can be sure of the note id
- if 'fields' in note:
- for name, value in note['fields'].items():
- if name in ankiNote:
- ankiNote[name] = value
-
- self.addMediaFromNote(ankiNote, note)
-
- if 'tags' in note:
- ankiNote.tags = note['tags']
-
- def openNewWindow():
- nonlocal ankiNote
-
- addCards = aqt.dialogs.open('AddCards', self.window())
-
- if savedMid:
- deck['mid'] = savedMid
-
- addCards.editor.set_note(ankiNote)
-
- addCards.activateWindow()
-
- aqt.dialogs.open('AddCards', self.window())
- addCards.setAndFocusNote(addCards.editor.note)
-
- currentWindow = aqt.dialogs._dialogs['AddCards'][1]
-
- if currentWindow is not None:
- currentWindow.closeWithCallback(openNewWindow)
- else:
- openNewWindow()
-
- return ankiNote.id
-
- else:
- addCards = aqt.dialogs.open('AddCards', self.window())
- addCards.activateWindow()
-
- return addCards.editor.note.id
-
-
- @util.api()
- def guiReviewActive(self):
- return self.reviewer().card is not None and self.window().state == 'review'
-
-
- @util.api()
- def guiCurrentCard(self):
- if not self.guiReviewActive():
- raise Exception('Gui review is not currently active.')
-
- reviewer = self.reviewer()
- card = reviewer.card
- model = card.model()
- note = card.note()
-
- fields = {}
- for info in model['flds']:
- order = info['ord']
- name = info['name']
- fields[name] = {'value': note.fields[order], 'order': order}
-
- buttonList = reviewer._answerButtonList()
- return {
- 'cardId': card.id,
- 'fields': fields,
- 'fieldOrder': card.ord,
- 'question': util.cardQuestion(card),
- 'answer': util.cardAnswer(card),
- 'buttons': [b[0] for b in buttonList],
- 'nextReviews': [reviewer.mw.col.sched.nextIvlStr(reviewer.card, b[0], True) for b in buttonList],
- 'modelName': model['name'],
- 'deckName': self.deckNameFromId(card.did),
- 'css': model['css'],
- 'template': card.template()['name']
- }
-
-
- @util.api()
- def guiStartCardTimer(self):
- if not self.guiReviewActive():
- return False
-
- card = self.reviewer().card
- if card is not None:
- card.startTimer()
- return True
-
- return False
-
-
- @util.api()
- def guiShowQuestion(self):
- if self.guiReviewActive():
- self.reviewer()._showQuestion()
- return True
-
- return False
-
-
- @util.api()
- def guiShowAnswer(self):
- if self.guiReviewActive():
- self.window().reviewer._showAnswer()
- return True
-
- return False
-
-
- @util.api()
- def guiAnswerCard(self, ease):
- if not self.guiReviewActive():
- return False
-
- reviewer = self.reviewer()
- if reviewer.state != 'answer':
- return False
- if ease <= 0 or ease > self.scheduler().answerButtons(reviewer.card):
- return False
-
- reviewer._answerCard(ease)
- return True
-
-
- @util.api()
- def guiDeckOverview(self, name):
- collection = self.collection()
- if collection is not None:
- deck = collection.decks.byName(name)
- if deck is not None:
- collection.decks.select(deck['id'])
- self.window().onOverview()
- return True
-
- return False
-
-
- @util.api()
- def guiDeckBrowser(self):
- self.window().moveToState('deckBrowser')
-
-
- @util.api()
- def guiDeckReview(self, name):
- if self.guiDeckOverview(name):
- self.window().moveToState('review')
- return True
-
- return False
-
-
- @util.api()
- def guiExitAnki(self):
- timer = QTimer()
- timer.timeout.connect(self.window().close)
- timer.start(1000) # 1s should be enough to allow the response to be sent.
-
-
- @util.api()
- def guiCheckDatabase(self):
- self.window().onCheckDB()
- return True
-
-
- @util.api()
- def addNotes(self, notes):
- results = []
- for note in notes:
- try:
- results.append(self.addNote(note))
- except:
- results.append(None)
-
- return results
-
-
- @util.api()
- def canAddNotes(self, notes):
- results = []
- for note in notes:
- results.append(self.canAddNote(note))
-
- return results
-
-
- @util.api()
- def exportPackage(self, deck, path, includeSched=False):
- collection = self.collection()
- if collection is not None:
- deck = collection.decks.byName(deck)
- if deck is not None:
- exporter = AnkiPackageExporter(collection)
- exporter.did = deck['id']
- exporter.includeSched = includeSched
- exporter.exportInto(path)
- return True
-
- return False
-
-
- @util.api()
- def importPackage(self, path):
- collection = self.collection()
- if collection is not None:
- try:
- self.startEditing()
- importer = AnkiPackageImporter(collection, path)
- importer.run()
- except:
- self.stopEditing()
- raise
- else:
- self.stopEditing()
- return True
-
- return False
-
-
- @util.api()
- def apiReflect(self, scopes=None, actions=None):
- if not isinstance(scopes, list):
- raise Exception('scopes has invalid value')
- if not (actions is None or isinstance(actions, list)):
- raise Exception('actions has invalid value')
-
- cls = type(self)
- scopes2 = []
- result = {'scopes': scopes2}
-
- if 'actions' in scopes:
- if actions is None:
- actions = dir(cls)
-
- methodNames = []
- for methodName in actions:
- if not isinstance(methodName, str):
- pass
- method = getattr(cls, methodName, None)
- if method is not None and getattr(method, 'api', False):
- methodNames.append(methodName)
-
- scopes2.append('actions')
- result['actions'] = methodNames
-
- return result
-
-
-#
-# Entry
-#
-
-# when run inside Anki, `__name__` would be either numeric,
-# or, if installed via `link.sh`, `AnkiConnectDev`
-if __name__ != "plugin":
- if platform.system() == "Windows" and anki_version == (2, 1, 50):
- util.patch_anki_2_1_50_having_null_stdout_on_windows()
-
- Edit.register_with_anki()
-
- ac = AnkiConnect()
- ac.initLogging()
- ac.startWebServer()
diff --git a/.local/share/Anki2/addons21/Anki_connect/__pycache__/__init__.cpython-39.opt-2.pyc b/.local/share/Anki2/addons21/Anki_connect/__pycache__/__init__.cpython-39.opt-2.pyc
deleted file mode 100644
index cd46468..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/__pycache__/__init__.cpython-39.opt-2.pyc
+++ /dev/null
Binary files differ
diff --git a/.local/share/Anki2/addons21/Anki_connect/__pycache__/edit.cpython-39.opt-2.pyc b/.local/share/Anki2/addons21/Anki_connect/__pycache__/edit.cpython-39.opt-2.pyc
deleted file mode 100644
index 8290761..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/__pycache__/edit.cpython-39.opt-2.pyc
+++ /dev/null
Binary files differ
diff --git a/.local/share/Anki2/addons21/Anki_connect/__pycache__/util.cpython-39.opt-2.pyc b/.local/share/Anki2/addons21/Anki_connect/__pycache__/util.cpython-39.opt-2.pyc
deleted file mode 100644
index d9529a9..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/__pycache__/util.cpython-39.opt-2.pyc
+++ /dev/null
Binary files differ
diff --git a/.local/share/Anki2/addons21/Anki_connect/__pycache__/web.cpython-39.opt-2.pyc b/.local/share/Anki2/addons21/Anki_connect/__pycache__/web.cpython-39.opt-2.pyc
deleted file mode 100644
index 44f3e41..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/__pycache__/web.cpython-39.opt-2.pyc
+++ /dev/null
Binary files differ
diff --git a/.local/share/Anki2/addons21/Anki_connect/config.json b/.local/share/Anki2/addons21/Anki_connect/config.json
deleted file mode 100644
index d3b51ea..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/config.json
+++ /dev/null
@@ -1,8 +0,0 @@
-{
- "apiKey": null,
- "apiLogPath": null,
- "webBindAddress": "127.0.0.1",
- "webBindPort": 8765,
- "webCorsOriginList": ["http://localhost"],
- "ignoreOriginList": []
-}
diff --git a/.local/share/Anki2/addons21/Anki_connect/config.md b/.local/share/Anki2/addons21/Anki_connect/config.md
deleted file mode 100644
index 6bacba2..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/config.md
+++ /dev/null
@@ -1 +0,0 @@
-Read the documentation on the [AnkiConnect](https://foosoft.net/projects/anki-connect/) project page for details.
diff --git a/.local/share/Anki2/addons21/Anki_connect/edit.py b/.local/share/Anki2/addons21/Anki_connect/edit.py
deleted file mode 100644
index d414575..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/edit.py
+++ /dev/null
@@ -1,458 +0,0 @@
-import aqt
-import aqt.editor
-import aqt.browser.previewer
-from aqt import gui_hooks
-from aqt.qt import QDialog, Qt, QKeySequence, QShortcut
-from aqt.utils import disable_help_button, restoreGeom, saveGeom, tooltip
-from anki.errors import NotFoundError
-from anki.consts import QUEUE_TYPE_SUSPENDED
-from anki.utils import ids2str
-
-from . import anki_version
-
-
-# Edit dialog. Like Edit Current, but:
-# * has a Preview button to preview the cards for the note
-# * has Previous/Back buttons to navigate the history of the dialog
-# * has a Browse button to open the history in the Browser
-# * has no bar with the Close button
-#
-# To register in Anki's dialog system:
-# > from .edit import Edit
-# > Edit.register_with_anki()
-#
-# To (re)open (note_id is an integer):
-# > Edit.open_dialog_and_show_note_with_id(note_id)
-
-
-DOMAIN_PREFIX = "foosoft.ankiconnect."
-
-
-def get_note_by_note_id(note_id):
- return aqt.mw.col.get_note(note_id)
-
-def is_card_suspended(card):
- return card.queue == QUEUE_TYPE_SUSPENDED
-
-def filter_valid_note_ids(note_ids):
- return aqt.mw.col.db.list(
- "select id from notes where id in " + ids2str(note_ids)
- )
-
-
-##############################################################################
-
-
-class DecentPreviewer(aqt.browser.previewer.MultiCardPreviewer):
- class Adapter:
- def get_current_card(self): raise NotImplementedError
- def can_select_previous_card(self): raise NotImplementedError
- def can_select_next_card(self): raise NotImplementedError
- def select_previous_card(self): raise NotImplementedError
- def select_next_card(self): raise NotImplementedError
-
- def __init__(self, adapter: Adapter):
- super().__init__(parent=None, mw=aqt.mw, on_close=lambda: None) # noqa
- self.adapter = adapter
- self.last_card_id = 0
-
- def card(self):
- return self.adapter.get_current_card()
-
- def card_changed(self):
- current_card_id = self.adapter.get_current_card().id
- changed = self.last_card_id != current_card_id
- self.last_card_id = current_card_id
- return changed
-
- # the check if we can select next/previous card is needed because
- # the buttons sometimes get disabled a tad too late
- # and can still be pressed by user.
- # this is likely due to Anki sometimes delaying rendering of cards
- # in order to avoid rendering them too fast?
- def _on_prev_card(self):
- if self.adapter.can_select_previous_card():
- self.adapter.select_previous_card()
- self.render_card()
-
- def _on_next_card(self):
- if self.adapter.can_select_next_card():
- self.adapter.select_next_card()
- self.render_card()
-
- def _should_enable_prev(self):
- return self.showing_answer_and_can_show_question() or \
- self.adapter.can_select_previous_card()
-
- def _should_enable_next(self):
- return self.showing_question_and_can_show_answer() or \
- self.adapter.can_select_next_card()
-
- def _render_scheduled(self):
- super()._render_scheduled() # noqa
- self._updateButtons()
-
- def showing_answer_and_can_show_question(self):
- return self._state == "answer" and not self._show_both_sides
-
- def showing_question_and_can_show_answer(self):
- return self._state == "question"
-
-
-class ReadyCardsAdapter(DecentPreviewer.Adapter):
- def __init__(self, cards):
- self.cards = cards
- self.current = 0
-
- def get_current_card(self):
- return self.cards[self.current]
-
- def can_select_previous_card(self):
- return self.current > 0
-
- def can_select_next_card(self):
- return self.current < len(self.cards) - 1
-
- def select_previous_card(self):
- self.current -= 1
-
- def select_next_card(self):
- self.current += 1
-
-
-##############################################################################
-
-
-# store note ids instead of notes, as note objects don't implement __eq__ etc
-class History:
- number_of_notes_to_keep_in_history = 25
-
- def __init__(self):
- self.note_ids = []
-
- def append(self, note):
- if note.id in self.note_ids:
- self.note_ids.remove(note.id)
- self.note_ids.append(note.id)
- self.note_ids = self.note_ids[-self.number_of_notes_to_keep_in_history:]
-
- def has_note_to_left_of(self, note):
- return note.id in self.note_ids and note.id != self.note_ids[0]
-
- def has_note_to_right_of(self, note):
- return note.id in self.note_ids and note.id != self.note_ids[-1]
-
- def get_note_to_left_of(self, note):
- note_id = self.note_ids[self.note_ids.index(note.id) - 1]
- return get_note_by_note_id(note_id)
-
- def get_note_to_right_of(self, note):
- note_id = self.note_ids[self.note_ids.index(note.id) + 1]
- return get_note_by_note_id(note_id)
-
- def get_last_note(self): # throws IndexError if history empty
- return get_note_by_note_id(self.note_ids[-1])
-
- def remove_invalid_notes(self):
- self.note_ids = filter_valid_note_ids(self.note_ids)
-
-history = History()
-
-
-# see method `find_cards` of `collection.py`
-def trigger_search_for_dialog_history_notes(search_context, use_history_order):
- search_context.search = " or ".join(
- f"nid:{note_id}" for note_id in history.note_ids
- )
-
- if use_history_order:
- search_context.order = f"""case c.nid {
- " ".join(
- f"when {note_id} then {n}"
- for (n, note_id) in enumerate(reversed(history.note_ids))
- )
- } end asc"""
-
-
-##############################################################################
-
-
-# noinspection PyAttributeOutsideInit
-class Edit(aqt.editcurrent.EditCurrent):
- dialog_geometry_tag = DOMAIN_PREFIX + "edit"
- dialog_registry_tag = DOMAIN_PREFIX + "Edit"
- dialog_search_tag = DOMAIN_PREFIX + "edit.history"
-
- # depending on whether the dialog already exists,
- # upon a request to open the dialog via `aqt.dialogs.open()`,
- # the manager will call either the constructor or the `reopen` method
- def __init__(self, note):
- QDialog.__init__(self, None, Qt.WindowType.Window)
- aqt.mw.garbage_collect_on_dialog_finish(self)
- self.form = aqt.forms.editcurrent.Ui_Dialog()
- self.form.setupUi(self)
- self.setWindowTitle("Edit")
- self.setMinimumWidth(250)
- self.setMinimumHeight(400)
- restoreGeom(self, self.dialog_geometry_tag)
- disable_help_button(self)
-
- self.form.buttonBox.setVisible(False) # hides the Close button bar
- self.setup_editor_buttons()
-
- self.show()
- self.bring_to_foreground()
-
- history.remove_invalid_notes()
- history.append(note)
- self.show_note(note)
-
- gui_hooks.operation_did_execute.append(self.on_operation_did_execute)
- gui_hooks.editor_did_load_note.append(self.editor_did_load_note)
-
- def reopen(self, note):
- history.append(note)
- self.show_note(note)
- self.bring_to_foreground()
-
- def cleanup_and_close(self):
- gui_hooks.editor_did_load_note.remove(self.editor_did_load_note)
- gui_hooks.operation_did_execute.remove(self.on_operation_did_execute)
-
- self.editor.cleanup()
- saveGeom(self, self.dialog_geometry_tag)
- aqt.dialogs.markClosed(self.dialog_registry_tag)
- QDialog.reject(self)
-
- # This method (mostly) solves (at least on my Windows 10 machine) three issues
- # with window activation. Without this not even too hacky a fix,
- # * When dialog is opened from Yomichan *for the first time* since app start,
- # the dialog opens in background (just like Browser does),
- # but does not flash in taskbar (unlike Browser);
- # * When dialog is opened, closed, *then main window is focused by clicking in it*,
- # then dialog is opened from Yomichan again, same issue as above arises;
- # * When dialog is restored from minimized state *and main window isn't minimized*,
- # opening the dialog from Yomichan does not reliably focus it;
- # sometimes it opens in foreground, sometimes in background.
- # With this fix, windows nearly always appear in foreground in all three cases.
- # In the case of the first two issues, strictly speaking, the fix is not ideal:
- # the window appears in background first, and then quickly pops into foreground.
- # It is not *too* unsightly, probably, no-one will notice this;
- # still, a better solution must be possible. TODO find one!
- #
- # Note that operation systems, notably Windows, and desktop managers, may restrict
- # background applications from raising windows to prevent them from interrupting
- # what the user is currently doing. For details, see:
- # https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-setforegroundwindow#remarks
- # https://doc.qt.io/qt-5/qwidget.html#activateWindow
- # https://wiki.qt.io/Technical_FAQ#QWidget_::activateWindow.28.29_-_behavior_under_windows
- def bring_to_foreground(self):
- aqt.mw.app.processEvents()
- self.activateWindow()
- self.raise_()
-
- #################################### hooks enabled during dialog lifecycle
-
- def on_operation_did_execute(self, changes, handler):
- if changes.note_text and handler is not self.editor:
- self.reload_notes_after_user_action_elsewhere()
-
- def editor_did_load_note(self, _editor):
- self.enable_disable_next_and_previous_buttons()
-
- ###################################################### load & reload notes
-
- # setting editor.card is required for the "Cards…" button to work properly
- def show_note(self, note):
- self.note = note
- cards = note.cards()
-
- self.editor.set_note(note)
- self.editor.card = cards[0] if cards else None
-
- if any(is_card_suspended(card) for card in cards):
- tooltip("Some of the cards associated with this note "
- "have been suspended", parent=self)
-
- def reload_notes_after_user_action_elsewhere(self):
- history.remove_invalid_notes()
-
- try:
- self.note.load() # this also updates the fields
- except NotFoundError:
- try:
- self.note = history.get_last_note()
- except IndexError:
- self.cleanup_and_close()
- return
-
- self.show_note(self.note)
-
- ################################################################## actions
-
- # search two times, one is to select the current note or its cards,
- # and another to show the whole history, while keeping the above selection
- # set sort column to our search tag, which:
- # * prevents the column sort indicator from being shown
- # * serves as a hint for us to show notes or cards in history order
- # (user can then click on any of the column names
- # to show history cards in the order of their choosing)
- def show_browser(self, *_):
- def search_input_select_all(hook_browser, *_):
- hook_browser.form.searchEdit.lineEdit().selectAll()
- gui_hooks.browser_did_change_row.remove(search_input_select_all)
- gui_hooks.browser_did_change_row.append(search_input_select_all)
-
- browser = aqt.dialogs.open("Browser", aqt.mw)
- browser.table._state.sort_column = self.dialog_search_tag # noqa
- browser.table._set_sort_indicator() # noqa
-
- browser.search_for(f"nid:{self.note.id}")
- browser.table.select_all()
- browser.search_for(self.dialog_search_tag)
-
- def show_preview(self, *_):
- if cards := self.note.cards():
- previewer = DecentPreviewer(ReadyCardsAdapter(cards))
- previewer.open()
- return previewer
- else:
- tooltip("No cards found", parent=self)
- return None
-
- def show_previous(self, *_):
- if history.has_note_to_left_of(self.note):
- self.show_note(history.get_note_to_left_of(self.note))
-
- def show_next(self, *_):
- if history.has_note_to_right_of(self.note):
- self.show_note(history.get_note_to_right_of(self.note))
-
- ################################################## button and hotkey setup
-
- def setup_editor_buttons(self):
- gui_hooks.editor_did_init.append(self.add_preview_button)
- gui_hooks.editor_did_init_buttons.append(self.add_right_hand_side_buttons)
-
- # on Anki 2.1.50, browser mode makes the Preview button visible
- extra_kwargs = {} if anki_version < (2, 1, 50) else {
- "editor_mode": aqt.editor.EditorMode.BROWSER
- }
-
- self.editor = aqt.editor.Editor(aqt.mw, self.form.fieldsArea, self,
- **extra_kwargs)
-
- gui_hooks.editor_did_init_buttons.remove(self.add_right_hand_side_buttons)
- gui_hooks.editor_did_init.remove(self.add_preview_button)
-
- # * on Anki < 2.1.50, make the button via js (`setupEditor` of browser.py);
- # also, make a copy of _links so that opening Anki's browser does not
- # screw them up as they are apparently shared between instances?!
- # the last part seems to have been fixed in Anki 2.1.50
- # * on Anki 2.1.50, the button is created by setting editor mode,
- # see above; so we only need to add the link.
- def add_preview_button(self, editor):
- QShortcut(QKeySequence("Ctrl+Shift+P"), self, self.show_preview)
-
- if anki_version < (2, 1, 50):
- editor._links = editor._links.copy()
- editor.web.eval("""
- $editorToolbar.then(({notetypeButtons}) =>
- notetypeButtons.appendButton(
- {component: editorToolbar.PreviewButton, id: 'preview'}
- )
- );
- """)
-
- editor._links["preview"] = lambda _editor: self.show_preview() and None
-
- # * on Anki < 2.1.50, button style is okay-ish from get-go,
- # except when disabled; adding class `btn` fixes that;
- # * on Anki 2.1.50, buttons have weird font size and are square';
- # the style below makes them in line with left-hand side buttons
- def add_right_hand_side_buttons(self, buttons, editor):
- if anki_version < (2, 1, 50):
- extra_button_class = "btn"
- else:
- extra_button_class = "anki-connect-button"
- editor.web.eval("""
- (function(){
- const style = document.createElement("style");
- style.innerHTML = `
- .anki-connect-button {
- white-space: nowrap;
- width: auto;
- padding: 0 2px;
- font-size: var(--base-font-size);
- }
- .anki-connect-button:disabled {
- pointer-events: none;
- opacity: .4;
- }
- `;
- document.head.appendChild(style);
- })();
- """)
-
- def add(cmd, function, label, tip, keys):
- button_html = editor.addButton(
- icon=None,
- cmd=DOMAIN_PREFIX + cmd,
- id=DOMAIN_PREFIX + cmd,
- func=function,
- label=f"&nbsp;&nbsp;{label}&nbsp;&nbsp;",
- tip=f"{tip} ({keys})",
- keys=keys,
- )
-
- button_html = button_html.replace('class="',
- f'class="{extra_button_class} ')
- buttons.append(button_html)
-
- add("browse", self.show_browser, "Browse", "Browse", "Ctrl+F")
- add("previous", self.show_previous, "&lt;", "Previous", "Alt+Left")
- add("next", self.show_next, "&gt;", "Next", "Alt+Right")
-
- def run_javascript_after_toolbar_ready(self, js):
- js = f"setTimeout(function() {{ {js} }}, 1)"
- if anki_version < (2, 1, 50):
- js = f'$editorToolbar.then(({{ toolbar }}) => {js})'
- else:
- js = f'require("anki/ui").loaded.then(() => {js})'
- self.editor.web.eval(js)
-
- def enable_disable_next_and_previous_buttons(self):
- def to_js(boolean):
- return "true" if boolean else "false"
-
- disable_previous = not(history.has_note_to_left_of(self.note))
- disable_next = not(history.has_note_to_right_of(self.note))
-
- self.run_javascript_after_toolbar_ready(f"""
- document.getElementById("{DOMAIN_PREFIX}previous")
- .disabled = {to_js(disable_previous)};
- document.getElementById("{DOMAIN_PREFIX}next")
- .disabled = {to_js(disable_next)};
- """)
-
- ##########################################################################
-
- @classmethod
- def browser_will_search(cls, search_context):
- if search_context.search == cls.dialog_search_tag:
- trigger_search_for_dialog_history_notes(
- search_context=search_context,
- use_history_order=cls.dialog_search_tag ==
- search_context.browser.table._state.sort_column # noqa
- )
-
- @classmethod
- def register_with_anki(cls):
- if cls.dialog_registry_tag not in aqt.dialogs._dialogs: # noqa
- aqt.dialogs.register_dialog(cls.dialog_registry_tag, cls)
- gui_hooks.browser_will_search.append(cls.browser_will_search)
-
- @classmethod
- def open_dialog_and_show_note_with_id(cls, note_id): # raises NotFoundError
- note = get_note_by_note_id(note_id)
- return aqt.dialogs.open(cls.dialog_registry_tag, note)
diff --git a/.local/share/Anki2/addons21/Anki_connect/meta.json b/.local/share/Anki2/addons21/Anki_connect/meta.json
deleted file mode 100644
index c27a474..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/meta.json
+++ /dev/null
@@ -1 +0,0 @@
-{"name": "AnkiConnect", "mod": 1657829726, "min_point_version": 45, "max_point_version": 45, "branch_index": 1, "disabled": false, "conflicts": [], "update_enabled": true} \ No newline at end of file
diff --git a/.local/share/Anki2/addons21/Anki_connect/util.py b/.local/share/Anki2/addons21/Anki_connect/util.py
deleted file mode 100644
index 3ae5eb1..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/util.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Copyright 2016-2021 Alex Yatskov
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import sys
-
-import anki
-import anki.sync
-import aqt
-import enum
-
-#
-# Utilities
-#
-
-class MediaType(enum.Enum):
- Audio = 1
- Video = 2
- Picture = 3
-
-
-def download(url):
- client = anki.sync.AnkiRequestsClient()
- client.timeout = setting('webTimeout') / 1000
-
- resp = client.get(url)
- if resp.status_code != 200:
- raise Exception('{} download failed with return code {}'.format(url, resp.status_code))
-
- return client.streamContent(resp)
-
-
-def api(*versions):
- def decorator(func):
- setattr(func, 'versions', versions)
- setattr(func, 'api', True)
- return func
-
- return decorator
-
-
-def cardQuestion(card):
- if getattr(card, 'question', None) is None:
- return card._getQA()['q']
-
- return card.question()
-
-
-def cardAnswer(card):
- if getattr(card, 'answer', None) is None:
- return card._getQA()['a']
-
- return card.answer()
-
-
-DEFAULT_CONFIG = {
- 'apiKey': None,
- 'apiLogPath': None,
- 'apiPollInterval': 25,
- 'apiVersion': 6,
- 'webBacklog': 5,
- 'webBindAddress': os.getenv('ANKICONNECT_BIND_ADDRESS', '127.0.0.1'),
- 'webBindPort': 8765,
- 'webCorsOrigin': os.getenv('ANKICONNECT_CORS_ORIGIN', None),
- 'webCorsOriginList': ['http://localhost'],
- 'ignoreOriginList': [],
- 'webTimeout': 10000,
-}
-
-def setting(key):
- try:
- return aqt.mw.addonManager.getConfig(__name__).get(key, DEFAULT_CONFIG[key])
- except:
- raise Exception('setting {} not found'.format(key))
-
-
-# see https://github.com/FooSoft/anki-connect/issues/308
-# fixed in https://github.com/ankitects/anki/commit/0b2a226d
-def patch_anki_2_1_50_having_null_stdout_on_windows():
- if sys.stdout is None:
- sys.stdout = open(os.devnull, "w", encoding="utf8")
diff --git a/.local/share/Anki2/addons21/Anki_connect/web.py b/.local/share/Anki2/addons21/Anki_connect/web.py
deleted file mode 100644
index 1f325b4..0000000
--- a/.local/share/Anki2/addons21/Anki_connect/web.py
+++ /dev/null
@@ -1,301 +0,0 @@
-# Copyright 2016-2021 Alex Yatskov
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import json
-import jsonschema
-import select
-import socket
-
-from . import util
-
-#
-# WebRequest
-#
-
-class WebRequest:
- def __init__(self, method, headers, body):
- self.method = method
- self.headers = headers
- self.body = body
-
-
-#
-# WebClient
-#
-
-class WebClient:
- def __init__(self, sock, handler):
- self.sock = sock
- self.handler = handler
- self.readBuff = bytes()
- self.writeBuff = bytes()
-
-
- def advance(self, recvSize=1024):
- if self.sock is None:
- return False
-
- rlist, wlist = select.select([self.sock], [self.sock], [], 0)[:2]
- self.sock.settimeout(5.0)
-
- if rlist:
- while True:
- try:
- msg = self.sock.recv(recvSize)
- except (ConnectionResetError, socket.timeout):
- self.close()
- return False
- if not msg:
- self.close()
- return False
- self.readBuff += msg
-
- req, length = self.parseRequest(self.readBuff)
- if req is not None:
- self.readBuff = self.readBuff[length:]
- self.writeBuff += self.handler(req)
- break
-
-
-
- if wlist and self.writeBuff:
- try:
- length = self.sock.send(self.writeBuff)
- self.writeBuff = self.writeBuff[length:]
- if not self.writeBuff:
- self.close()
- return False
- except:
- self.close()
- return False
- return True
-
-
- def close(self):
- if self.sock is not None:
- self.sock.close()
- self.sock = None
-
- self.readBuff = bytes()
- self.writeBuff = bytes()
-
-
- def parseRequest(self, data):
- parts = data.split('\r\n\r\n'.encode('utf-8'), 1)
- if len(parts) == 1:
- return None, 0
-
- lines = parts[0].split('\r\n'.encode('utf-8'))
- method = None
-
- if len(lines) > 0:
- request_line_parts = lines[0].split(' '.encode('utf-8'))
- method = request_line_parts[0].upper() if len(request_line_parts) > 0 else None
-
- headers = {}
- for line in lines[1:]:
- pair = line.split(': '.encode('utf-8'))
- headers[pair[0].lower()] = pair[1] if len(pair) > 1 else None
-
- headerLength = len(parts[0]) + 4
- bodyLength = int(headers.get('content-length'.encode('utf-8'), 0))
- totalLength = headerLength + bodyLength
-
- if totalLength > len(data):
- return None, 0
-
- body = data[headerLength : totalLength]
- return WebRequest(method, headers, body), totalLength
-
-#
-# WebServer
-#
-
-class WebServer:
- def __init__(self, handler):
- self.handler = handler
- self.clients = []
- self.sock = None
-
-
- def advance(self):
- if self.sock is not None:
- self.acceptClients()
- self.advanceClients()
-
-
- def acceptClients(self):
- rlist = select.select([self.sock], [], [], 0)[0]
- if not rlist:
- return
-
- clientSock = self.sock.accept()[0]
- if clientSock is not None:
- clientSock.setblocking(False)
- self.clients.append(WebClient(clientSock, self.handlerWrapper))
-
-
- def advanceClients(self):
- self.clients = list(filter(lambda c: c.advance(), self.clients))
-
-
- def listen(self):
- self.close()
-
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.sock.setblocking(False)
- self.sock.bind((util.setting('webBindAddress'), util.setting('webBindPort')))
- self.sock.listen(util.setting('webBacklog'))
-
-
- def handlerWrapper(self, req):
- allowed, corsOrigin = self.allowOrigin(req)
-
- if req.method == b'OPTIONS':
- body = ''.encode('utf-8')
- headers = self.buildHeaders(corsOrigin, body)
-
- if b'access-control-request-private-network' in req.headers and (
- req.headers[b'access-control-request-private-network'] == b'true'):
- # include this header so that if a public origin is included in the whitelist,
- # then browsers won't fail requests due to the private network access check
- headers.append(['Access-Control-Allow-Private-Network', 'true'])
-
- return self.buildResponse(headers, body)
-
- try:
- params = json.loads(req.body.decode('utf-8'))
- jsonschema.validate(params, request_schema)
- except (ValueError, jsonschema.ValidationError) as e:
- if allowed:
- if len(req.body) == 0:
- body = f"AnkiConnect v.{util.setting('apiVersion')}".encode()
- else:
- reply = format_exception_reply(util.setting('apiVersion'), e)
- body = json.dumps(reply).encode('utf-8')
- headers = self.buildHeaders(corsOrigin, body)
- return self.buildResponse(headers, body)
- else:
- params = {} # trigger the 403 response below
-
- if allowed or params.get('action', '') == 'requestPermission':
- if params.get('action', '') == 'requestPermission':
- params['params'] = params.get('params', {})
- params['params']['allowed'] = allowed
- params['params']['origin'] = b'origin' in req.headers and req.headers[b'origin'].decode() or ''
- if not allowed :
- corsOrigin = params['params']['origin']
-
- body = json.dumps(self.handler(params)).encode('utf-8')
- headers = self.buildHeaders(corsOrigin, body)
- else :
- headers = [
- ['HTTP/1.1 403 Forbidden', None],
- ['Access-Control-Allow-Origin', corsOrigin],
- ['Access-Control-Allow-Headers', '*']
- ]
- body = ''.encode('utf-8')
-
- return self.buildResponse(headers, body)
-
-
- def allowOrigin(self, req):
- # handle multiple cors origins by checking the 'origin'-header against the allowed origin list from the config
- webCorsOriginList = util.setting('webCorsOriginList')
-
- # keep support for deprecated 'webCorsOrigin' field, as long it is not removed
- webCorsOrigin = util.setting('webCorsOrigin')
- if webCorsOrigin:
- webCorsOriginList.append(webCorsOrigin)
-
- allowed = False
- corsOrigin = 'http://localhost'
- allowAllCors = '*' in webCorsOriginList # allow CORS for all domains
-
- if allowAllCors:
- corsOrigin = '*'
- allowed = True
- elif b'origin' in req.headers:
- originStr = req.headers[b'origin'].decode()
- if originStr in webCorsOriginList :
- corsOrigin = originStr
- allowed = True
- elif 'http://localhost' in webCorsOriginList and (
- originStr == 'http://127.0.0.1' or originStr == 'https://127.0.0.1' or # allow 127.0.0.1 if localhost allowed
- originStr.startswith('http://127.0.0.1:') or originStr.startswith('http://127.0.0.1:') or
- originStr.startswith('chrome-extension://') or originStr.startswith('moz-extension://') or originStr.startswith('safari-web-extension://') ) : # allow chrome, firefox and safari extension if localhost allowed
- corsOrigin = originStr
- allowed = True
- else:
- allowed = True
-
- return allowed, corsOrigin
-
-
- def buildHeaders(self, corsOrigin, body):
- return [
- ['HTTP/1.1 200 OK', None],
- ['Content-Type', 'text/json'],
- ['Access-Control-Allow-Origin', corsOrigin],
- ['Access-Control-Allow-Headers', '*'],
- ['Content-Length', str(len(body))]
- ]
-
-
- def buildResponse(self, headers, body):
- resp = bytes()
- for key, value in headers:
- if value is None:
- resp += '{}\r\n'.format(key).encode('utf-8')
- else:
- resp += '{}: {}\r\n'.format(key, value).encode('utf-8')
-
- resp += '\r\n'.encode('utf-8')
- resp += body
- return resp
-
-
- def close(self):
- if self.sock is not None:
- self.sock.close()
- self.sock = None
-
- for client in self.clients:
- client.close()
-
- self.clients = []
-
-
-def format_success_reply(api_version, result):
- if api_version <= 4:
- return result
- else:
- return {"result": result, "error": None}
-
-
-def format_exception_reply(_api_version, exception):
- return {"result": None, "error": str(exception)}
-
-
-request_schema = {
- "type": "object",
- "properties": {
- "action": {"type": "string", "minLength": 1},
- "version": {"type": "integer"},
- "params": {"type": "object"},
- },
- "required": ["action"],
-} \ No newline at end of file