|
| 1 | +# SPDX-FileCopyrightText: 2024 Sebastian Andersson <sebastian@bittr.nu> |
| 2 | +# SPDX-License-Identifier: GPL-3.0-or-later |
| 3 | + |
| 4 | +""" NFC tag handling """ |
| 5 | + |
| 6 | +import time |
| 7 | +from threading import Lock, Event |
| 8 | + |
| 9 | +import ndef |
| 10 | +import nfc |
| 11 | + |
| 12 | + |
| 13 | +SPOOL = "SPOOL" |
| 14 | +FILAMENT = "FILAMENT" |
| 15 | +NDEF_TEXT_TYPE = "urn:nfc:wkt:T" |
| 16 | + |
| 17 | + |
| 18 | +# pylint: disable=R0902 |
| 19 | +class NfcHandler: |
| 20 | + """NFC Tag handling""" |
| 21 | + |
| 22 | + def __init__(self, nfc_device: str): |
| 23 | + self.status = "" |
| 24 | + self.nfc_device = nfc_device |
| 25 | + self.on_nfc_no_tag_present = None |
| 26 | + self.on_nfc_tag_present = None |
| 27 | + self.should_stop_event = Event() |
| 28 | + self.write_lock = Lock() |
| 29 | + self.write_event = Event() |
| 30 | + self.write_spool = None |
| 31 | + self.write_filament = None |
| 32 | + |
| 33 | + def set_no_tag_present_callback(self, on_nfc_no_tag_present): |
| 34 | + """Sets a callback that will be called when no tag is present""" |
| 35 | + self.on_nfc_no_tag_present = on_nfc_no_tag_present |
| 36 | + |
| 37 | + def set_tag_present_callback(self, on_nfc_tag_present): |
| 38 | + """Sets a callback that will be called when a tag has been read""" |
| 39 | + self.on_nfc_tag_present = on_nfc_tag_present |
| 40 | + |
| 41 | + @classmethod |
| 42 | + def get_data_from_ndef_records(cls, records: ndef.TextRecord): |
| 43 | + """Find wanted data from the NDEF records. |
| 44 | +
|
| 45 | + >>> import ndef |
| 46 | + >>> record0 = ndef.TextRecord("") |
| 47 | + >>> record1 = ndef.TextRecord("SPOOL:23\\n") |
| 48 | + >>> record2 = ndef.TextRecord("FILAMENT:14\\n") |
| 49 | + >>> record3 = ndef.TextRecord("SPOOL:23\\nFILAMENT:14\\n") |
| 50 | + >>> NfcHandler.get_data_from_ndef_records([record0]) |
| 51 | + (None, None) |
| 52 | + >>> NfcHandler.get_data_from_ndef_records([record3]) |
| 53 | + ('23', '14') |
| 54 | + >>> NfcHandler.get_data_from_ndef_records([record1]) |
| 55 | + ('23', None) |
| 56 | + >>> NfcHandler.get_data_from_ndef_records([record2]) |
| 57 | + (None, '14') |
| 58 | + >>> NfcHandler.get_data_from_ndef_records([record0, record3]) |
| 59 | + ('23', '14') |
| 60 | + >>> NfcHandler.get_data_from_ndef_records([record3, record0]) |
| 61 | + ('23', '14') |
| 62 | + >>> NfcHandler.get_data_from_ndef_records([record1, record2]) |
| 63 | + ('23', '14') |
| 64 | + >>> NfcHandler.get_data_from_ndef_records([record2, record1]) |
| 65 | + ('23', '14') |
| 66 | + """ |
| 67 | + |
| 68 | + spool = None |
| 69 | + filament = None |
| 70 | + |
| 71 | + for record in records: |
| 72 | + if record.type == NDEF_TEXT_TYPE: |
| 73 | + for line in record.text.splitlines(): |
| 74 | + line = line.split(":") |
| 75 | + if len(line) == 2: |
| 76 | + if line[0] == SPOOL: |
| 77 | + spool = line[1] |
| 78 | + if line[0] == FILAMENT: |
| 79 | + filament = line[1] |
| 80 | + else: |
| 81 | + print(f"Read other record: {record}", flush=True) |
| 82 | + |
| 83 | + return spool, filament |
| 84 | + |
| 85 | + def write_to_tag(self, spool: int, filament: int) -> bool: |
| 86 | + """Writes spool & filament info to tag. Returns true if worked.""" |
| 87 | + |
| 88 | + self._set_write_info(spool, filament) |
| 89 | + |
| 90 | + if self.write_event.wait(timeout=30): |
| 91 | + return True |
| 92 | + |
| 93 | + self._set_write_info(None, None) |
| 94 | + |
| 95 | + return False |
| 96 | + |
| 97 | + def run(self): |
| 98 | + """Run the NFC handler, won't return""" |
| 99 | + # Open NFC reader. Will throw an exception if it fails. |
| 100 | + with nfc.ContactlessFrontend(self.nfc_device) as clf: |
| 101 | + while not self.should_stop_event.is_set(): |
| 102 | + tag = clf.connect(rdwr={"on-connect": lambda tag: False}) |
| 103 | + if tag: |
| 104 | + self._check_for_write_to_tag(tag) |
| 105 | + if tag.ndef is None: |
| 106 | + if self.on_nfc_no_tag_present: |
| 107 | + self.on_nfc_no_tag_present() |
| 108 | + else: |
| 109 | + self._read_from_tag(tag) |
| 110 | + while clf.connect(rdwr={"on-connect": lambda tag: False}): |
| 111 | + if self._check_for_write_to_tag(tag): |
| 112 | + self._read_from_tag(tag) |
| 113 | + time.sleep(0.1) |
| 114 | + else: |
| 115 | + time.sleep(0.1) |
| 116 | + |
| 117 | + def stop(self): |
| 118 | + """Call to stop the handler""" |
| 119 | + self.should_stop_event.set() |
| 120 | + |
| 121 | + def _write_to_nfc_tag(self, tag, spool: int, filament: int) -> bool: |
| 122 | + """Write given spool/filament ids to the tag""" |
| 123 | + try: |
| 124 | + if tag.ndef and tag.ndef.is_writeable: |
| 125 | + tag.ndef.records = [ |
| 126 | + ndef.TextRecord(f"{SPOOL}:{spool}\n{FILAMENT}:{filament}\n") |
| 127 | + ] |
| 128 | + return True |
| 129 | + self.status = "Tag is write protected" |
| 130 | + except Exception as ex: # pylint: disable=W0718 |
| 131 | + print(ex) |
| 132 | + self.status = "Got error while writing" |
| 133 | + return False |
| 134 | + |
| 135 | + def _set_write_info(self, spool, filament): |
| 136 | + if self.write_lock.acquire(): # pylint: disable=R1732 |
| 137 | + self.write_spool = spool |
| 138 | + self.write_filament = filament |
| 139 | + self.write_event.clear() |
| 140 | + self.write_lock.release() |
| 141 | + |
| 142 | + def _check_for_write_to_tag(self, tag) -> bool: |
| 143 | + """Check if the tag should be written to and do it""" |
| 144 | + did_write = False |
| 145 | + if self.write_lock.acquire(): # pylint: disable=R1732 |
| 146 | + if self.write_spool: |
| 147 | + if self._write_to_nfc_tag(tag, self.write_spool, self.write_filament): |
| 148 | + self.write_event.set() |
| 149 | + did_write = True |
| 150 | + self.write_spool = None |
| 151 | + self.write_filament = None |
| 152 | + self.write_lock.release() |
| 153 | + return did_write |
| 154 | + |
| 155 | + def _read_from_tag(self, tag): |
| 156 | + """Read data from tag and call callback""" |
| 157 | + if self.on_nfc_tag_present: |
| 158 | + spool, filament = NfcHandler.get_data_from_ndef_records(tag.ndef.records) |
| 159 | + self.on_nfc_tag_present(spool, filament) |
0 commit comments