From aeda06de5916f106c801fcdb665f6c2f769cdac4 Mon Sep 17 00:00:00 2001 From: redxef Date: Tue, 6 Dec 2022 23:42:15 +0100 Subject: [PATCH] Fix sync. --- radicale_sql/__init__.py | 86 +++++++++++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 19 deletions(-) diff --git a/radicale_sql/__init__.py b/radicale_sql/__init__.py index b7fc7a4..86eb630 100644 --- a/radicale_sql/__init__.py +++ b/radicale_sql/__init__.py @@ -137,8 +137,10 @@ class Collection(BaseCollection): ) if connection.execute(select_stmt).one_or_none() is None: connection.execute(insert_stmt) + self._storage._collection_updated(self._id, connection=connection) else: connection.execute(update_stmt) + self._storage._item_updated(self._id, href, connection=connection) self._update_history_etag(href, item, connection=connection) res = list(self._get_multi([href], connection=connection))[0][1] assert res is not None @@ -166,6 +168,7 @@ class Collection(BaseCollection): item_table.c.name == href, ), ) + self._storage._item_updated(self._id, href, connection=connection) connection.execute(delete_stmt) def delete(self, href: Optional[str] = None) -> None: @@ -209,6 +212,7 @@ class Collection(BaseCollection): ).values([dict(collection_id=self._id, key=k, value=v) for k, v in props.items()]) connection.execute(delete_stmt) connection.execute(insert_stmt) + self._storage._collection_updated(self._id, connection=connection) def set_meta(self, props: Mapping[str, str]) -> None: with self._storage._engine.begin() as c: @@ -255,6 +259,7 @@ class Collection(BaseCollection): history_etag = binascii.hexlify(os.urandom(16)).decode('ascii') etag = item.etag if item else '' if etag != cache_etag: + history_etag = radicale_item.get_etag(history_etag + '/' + etag).strip('\"') if exists: upsert = sa.update( item_history_table, @@ -297,7 +302,7 @@ class Collection(BaseCollection): item_table.c.id == None, ) for row in connection.execute(select_stmt): - yield row.href + yield row.name def _delete_history_refs(self, *, connection): item_history_table = self._storage._meta.tables['item_history'] @@ -310,8 +315,7 @@ class Collection(BaseCollection): item_history_table.c.modified < (datetime.datetime.now() - datetime.timedelta(seconds=self._storage.configuration.get('storage', 'max_sync_token_age'))) ), ) - with self._storage._engine.begin() as connection: - connection.execute(delete_stmt) + connection.execute(delete_stmt) def _sync(self, *, connection, old_token: str = '') -> Tuple[str, Iterable[str]]: # Parts of this method have been taken from @@ -333,15 +337,21 @@ class Collection(BaseCollection): old_token_name = old_token[len(_prefix):] if not check_token_name(old_token_name): raise ValueError(f'Malformed token: {old_token}') - state = {} - token_name_hash = sha256() # compute new state + state = {} + token_name_hash = sha256() for href, item in itertools.chain( ((item.href, item) for item in self._get_all(connection=connection)), ((href, None) for href in self._get_deleted_history_refs(connection=connection)) ): assert isinstance(href, str) + if href in state: + # we don't want to overwrite states + # this could happen with another storage collection + # which doesn't store the items itself, but + # derives them from another one + continue history_etag = self._update_history_etag(href, item, connection=connection) state[href] = history_etag token_name_hash.update((href + '/' + history_etag).encode()) @@ -365,19 +375,27 @@ class Collection(BaseCollection): collection_state_table.c.name == old_token_name, ), ) - state_row = connection.execute(select_stmt).one() - state = json.loads(state_row.state.decode()) + state_row = connection.execute(select_stmt).one_or_none() + state = json.loads(state_row.state.decode()) if state_row is not None else {} # store new state - ## should never be a duplicate - insert_stmt = sa.insert( + select_new_state = sa.select( + collection_state_table.c, + ).select_from( collection_state_table, - ).values( - collection_id=self._id, - name=token_name, - state=json.dumps(state).encode(), + ).where( + collection_state_table.c.collection_id == self._id, + collection_state_table.c.name == token_name, ) - connection.execute(insert_stmt) + if connection.execute(select_new_state).one_or_none() is None: + insert_stmt = sa.insert( + collection_state_table, + ).values( + collection_id=self._id, + name=token_name, + state=json.dumps(state).encode(), + ) + connection.execute(insert_stmt) changes = [] for href, history_etag in state.items(): @@ -409,9 +427,6 @@ class BdayCollection(Collection): def __repr__(self) -> str: return f'BdayCollection(id={self._id}, path={self._path}, birthday_source={self._birthday_source})' - def _sync(self, *, connection, old_token: str = '') -> Tuple[str, Iterable[str]]: - return self._birthday_source_collection._sync(connection=connection, old_token=old_token) - def _to_calendar_entry(self, o: vobject.base.Component) -> Optional[vobject.base.Component]: def vobj_str2date(content_line): v = content_line.value @@ -440,9 +455,11 @@ class BdayCollection(Collection): if new_vobject is None: return None new_vobject.add('uid').value = item.uid + assert item.href is not None return radicale_item.Item( collection=self, href=item.href, + #href=item.href + '.ics', last_modified=item.last_modified, text=new_vobject.serialize().strip(), ) @@ -515,6 +532,30 @@ class Storage(BaseStorage): # TODO: path return create_collection(self, id, '', birthday_source=row.birthday_source) + def _collection_updated(self, collection_id, *, connection): + collection_table = self._meta.tables['collection'] + connection.execute(sa.update( + collection_table, + ).values( + modified=datetime.datetime.now(), + ).where( + collection_table.c.id == collection_id, + )) + + def _item_updated(self, collection_id: uuid.UUID, href: str, *, connection): + item_table = self._meta.tables['item'] + item_row = connection.execute(sa.update( + item_table, + ).values( + modified=datetime.datetime.now(), + ).where( + sa.and_( + item_table.c.collection_id == collection_id, + item_table.c.name == href, + ), + ).returning(item_table.c)).one() + self._collection_updated(item_row.collection_id, connection=connection) + def _discover(self, path: str, *, connection, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]: if path == '/': return [create_collection(self, self._root_collection.id, '', birthday_source=None)] @@ -590,25 +631,30 @@ class Storage(BaseStorage): l = [] self_collection = connection.execute(select_stmt).one_or_none() + if self_collection is None: + # None found return [] if self_collection.type_ != 'collection': + # Item found return [radicale_item.Item( collection=self._get_collection(self_collection.parent_id, connection=connection), href=self_collection.name, last_modified=self_collection.modified, text=self_collection.data.decode(), )] + + # collection found self_collection = create_collection(self, self_collection.id, '/'.join(path_parts), birthday_source=self_collection.birthday_source) l += [self_collection] - if select_sub_stmt is not None: + # collection should list contents + if depth != "0": for row in connection.execute(select_sub_stmt): path = '/'.join(path_parts) path += '/' path += row.name l += [create_collection(self, row.id, path, birthday_source=row.birthday_source)] l += list(self_collection._get_all(connection=connection)) - print(';;;; discovered items') return l def discover(self, path: str, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]: @@ -643,6 +689,8 @@ class Storage(BaseStorage): ) connection.execute(delete_stmt) connection.execute(update_stmt) + self._collection_updated(src_collection_id, connection=connection) + self._collection_updated(dst_collection_id, connection=connection) to_collection._update_history_etag(to_href, item, connection=connection) assert item.href is not None item.collection._update_history_etag(item.href, None, connection=connection)