import hashlib
import json
import os
import re
import time
import uuid
from typing import List, Union, Optional
from urllib.parse import urljoin
from zipfile import ZipFile
from requests import Session, get, Response
from requests.adapters import HTTPAdapter
from requests.cookies import cookiejar_from_dict
from urllib3.util.retry import Retry
from notion.block.basic import Block
from notion.block.collection.basic import (
CollectionBlock,
TemplateBlock,
CollectionRowBlock,
)
from notion.block.collection.view import CollectionView
from notion.block.types import get_block_type, get_collection_view_type
from notion.logger import logger
from notion.monitor import Monitor
from notion.operations import operation_update_last_edited, build_operations
from notion.settings import API_BASE_URL
from notion.space import NotionSpace
from notion.store import RecordStore
from notion.user import NotionUser
from notion.utils import extract_id, now, to_list
[docs]class NotionApiError(Exception):
def __init__(self, message: str, **extra):
dumped_data = json.dumps(extra, indent=2)
logger.error(f"Exception: {dumped_data}")
super().__init__(message)
[docs]class InvalidCollectionViewUrl(NotionApiError):
pass
[docs]class NotionValidationError(NotionApiError):
pass
[docs]class NotionUnauthorizedError(NotionApiError):
pass
[docs]class Transaction:
"""
Transaction object.
"""
_is_nested = False
[docs] def __init__(self, client):
"""
Create Transaction object.
Arguments
---------
client : NotionClient
Client object to use for transaction.
"""
self.client = client
def __enter__(self):
if hasattr(self.client, "_transaction_operations"):
# client is already in a transaction, so we'll just
# make this one a no-op and let the outer one handle it
self._is_nested = True
return
self.client._transaction_operations = []
self.client._pages_to_refresh = []
self.client._blocks_to_refresh = []
def __exit__(self, exc_type, exc_value, traceback):
if self._is_nested:
return
operations = getattr(self.client, "_transaction_operations")
delattr(self.client, "_transaction_operations")
if not exc_type:
# submit the transaction if there was no exception
self.client.submit_transaction(operations=operations)
self.client._store.handle_post_transaction_refreshing()
[docs]class NotionClient:
"""
This is the entry point to using the API.
Create an instance of this class, passing it
the value of the "token_v2" cookie from a logged-in
browser session on Notion.so.
"""
[docs] def __init__(
self,
token_v2: str = "",
enable_monitoring: bool = False,
start_monitoring: bool = False,
enable_caching: bool = False,
cache_key: str = "",
):
"""
Create NotionClient object and fill its fields.
Arguments
---------
token_v2 : str, optional
The cookie from logged-in browser session on notion.so.
If not provided then all operations will be ran as if user
was not logged in.
Defaults to empty string.
enable_monitoring : bool, optional
Whether or not to monitor the records managed by NotionClient.
Defaults to False.
start_monitoring : bool, optional
Whether or not to start monitoring immediately upon logging in.
This option takes effect only when `monitor` is True.
Defaults to False.
enable_caching : bool, optional
Whether or not to enable caching of fetched data to file.
Defaults to False.
cache_key : str, optional
The key string used for storing all cached data in file.
This option takes effect only when `enable_caching` is True.
Defaults to SHA256 of token_v2.
"""
self.session = self._create_session(token_v2)
# noinspection InsecureHash
cache_key = cache_key or hashlib.sha256(token_v2.encode()).hexdigest()
cache_key = cache_key if enable_caching else None
self._store = RecordStore(self, cache_key=cache_key)
self._monitor = None
if enable_monitoring:
self._monitor = Monitor(self)
if start_monitoring:
self.start_monitoring()
if token_v2:
self._update_user_info()
@staticmethod
def _create_session(token_v2: str = "") -> Session:
"""
Helper method for creating a session object for API requests.
Arguments
---------
token_v2 : str, optional
Token to use for creating User session.
Defaults to empty string.
Returns
-------
Session
initialised Session object.
"""
retry = Retry(
total=5,
backoff_factor=0.3,
status_forcelist=(502, 503),
method_whitelist=(
"POST",
"HEAD",
"TRACE",
"GET",
"PUT",
"OPTIONS",
"DELETE",
),
)
session = Session()
session.mount("https://", HTTPAdapter(max_retries=retry))
session.cookies = cookiejar_from_dict({"token_v2": token_v2})
return session
@staticmethod
def _download_url(url: str, save_path: str, chunk_size: int = 128):
"""
Download the zip file and save it to a file.
Arguments
---------
url : str
URL from which to download.
save_path : str
File name to output the zip file into.
chunk_size : int, optional
Size of the downloaded chunk.
If set to 0 then the data will be read as it arrives
in whatever the size the chunks are received.
Defaults to 128.
https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content
"""
r = get(url, stream=True)
with open(save_path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size or None):
fd.write(chunk)
@staticmethod
def _unzip_file(file_name: str, delete: bool = True):
"""
Helper method to unzip the zipped file.
Arguments
---------
file_name : str
File name of the ZIP to unpack.
delete : bool, optional
Whether or not to remove the file after unpacking.
Defaults to True.
"""
with ZipFile(file_name) as f:
f.extractall()
if delete:
os.remove(file_name)
@staticmethod
def _maybe_prefix_url(endpoint: str) -> str:
if endpoint.startswith("http"):
return endpoint
return urljoin(API_BASE_URL, endpoint)
def _update_user_info(self) -> dict:
"""
Reload information about a Notion User.
Returns
-------
dict
User data.
"""
data = self.post("loadUserContent").json()
data = self._store.store_record_map(data)
first_user = list(data["notion_user"].keys())[0]
first_space = list(data["space"].keys())[0]
self.current_user = self.get_user(first_user)
self.current_space = self.get_space(first_space)
return data
[docs] def get_top_level_pages(self) -> list:
"""
Get list of top level pages defined in Notion Workspace.
Returns
-------
list of Block
Top level pages.
"""
blocks = self._update_user_info()["block"].keys()
return [self.get_block(bid) for bid in blocks]
[docs] def get_record_data(
self, table: str, url_or_id: str, force_refresh: bool = False
) -> dict:
"""
Get record data.
Arguments
---------
table : str
A "block type" in notion.so terminology.
url_or_id : str
Path or ID to block.
force_refresh : bool, optional
Whether or not to force a refresh of data.
Defaults to False.
Returns
-------
dict
Record data.
"""
return self._store.get(
table=table, url_or_id=url_or_id, force_refresh=force_refresh
)
[docs] def get_block(self, url_or_id: str, force_refresh: bool = False) -> Optional[Block]:
"""
Retrieve an instance of a subclass of Block that maps to
the block/page identified by the URL or ID passed in.
Arguments
---------
url_or_id : str
Path or ID to block.
force_refresh : bool, optional
Whether or not to force a refresh of data.
Defaults to False.
Returns
-------
Block or None
Found block or None.
"""
block_id = extract_id(url_or_id)
block = self.get_record_data("block", block_id, force_refresh)
if not block:
return None
if block.get("parent_table") == "collection":
if block.get("is_template"):
klass = TemplateBlock
else:
klass = CollectionRowBlock
else:
klass = get_block_type(block.get("type"))
return klass(client=self, block_id=block_id)
[docs] def get_collection(
self, collection_id: str, force_refresh: bool = False
) -> Optional[CollectionBlock]:
"""
Retrieve an instance of Collection that maps to
the collection identified by the ID passed in.
Arguments
---------
collection_id : str
ID of searched collection.
force_refresh : bool, optional
Whether or not to force a refresh of data.
Defaults to False.
Returns
-------
CollectionBlock
Found collection or None.
"""
record_data = self.get_record_data(
"collection", collection_id, force_refresh=force_refresh
)
if record_data:
return CollectionBlock(self, collection_id)
[docs] def get_collection_view(
self,
url_or_id: str,
collection: CollectionBlock = None,
force_refresh: bool = False,
) -> Optional[CollectionView]:
"""
Retrieve an instance of a subclass of CollectionView
that maps to the appropriate type.
The `url_or_id` argument can either be the URL
for a database page, or the ID of a collection_view
(in which case you must pass the collection)
Arguments
---------
url_or_id : str
ID of searched collection view.
collection : Collection
object representing ID of searched collection view.
force_refresh : bool, optional
Whether or not to force a refresh of data.
Defaults to False.
Raises
------
InvalidCollectionViewUrl
When passed in URL is invalid.
Returns
-------
CollectionView
Found collectionView or None.
"""
if url_or_id.startswith("http"):
# if it's a URL for a database page,
# try extracting the collection and view IDs
match = re.search(r"([a-f0-9]{32})\?v=([a-f0-9]{32})", url_or_id)
if not match:
raise InvalidCollectionViewUrl(
f"Could not find valid ID in URL '{url_or_id}'"
)
collection_id, view_id = match.groups()
collection_id = self.get_record_data(
table="block",
url_or_id=collection_id,
force_refresh=force_refresh,
)["collection_id"]
collection = self.get_collection(collection_id, force_refresh)
else:
view_id = url_or_id
if collection is None:
raise ValueError(
"If 'url_or_id' is an ID (not a URL), "
"you must also pass the 'collection'"
)
view = self.get_record_data(
table="collection_view",
url_or_id=view_id,
force_refresh=force_refresh,
)
if view:
klass = get_collection_view_type(view.get("type", ""))
return klass(self, view_id, collection=collection)
[docs] def get_user(
self, user_id: str, force_refresh: bool = False
) -> Optional[NotionUser]:
"""
Retrieve an instance of User that maps to
the notion_user identified by the ID passed in.
Arguments
---------
user_id : str
ID of searched user.
force_refresh : bool, optional
Whether or not to force a refresh of data.
Defaults to False.
Returns
-------
NotionUser
Found user or None.
"""
user = self.get_record_data("notion_user", user_id, force_refresh)
if user:
return NotionUser(self, user_id)
[docs] def get_space(
self, space_id: str, force_refresh: bool = False
) -> Optional[NotionSpace]:
"""
Retrieve an instance of Space that maps to
the space identified by the ID passed in.
Arguments
---------
space_id : str
ID of searched user.
force_refresh : bool, optional
Whether or not to force a refresh of data.
Defaults to False.
Returns
-------
NotionSpace
Found space or None.
"""
space = self.get_record_data("space", space_id, force_refresh)
if space:
return NotionSpace(self, space_id)
[docs] def start_monitoring(self):
"""
Start monitoring the tracked blocks.
This function will create new Thread.
"""
self._monitor.poll_async()
[docs] def refresh_records(self, **kwargs):
"""
The keyword arguments map table names into
lists of (or singular) record IDs to load for that table.
Use `True` instead of a list to refresh
all known records for that table.
"""
self._store.call_get_record_values(**kwargs)
[docs] def refresh_collection_rows(self, collection_id: str):
"""
Refresh collection rows.
Arguments
---------
collection_id : str
ID of the collection.
"""
collection = self.get_collection(collection_id)
row_ids = [row.id for row in collection.get_rows()]
self._store.set_collection_rows(collection_id, row_ids)
[docs] def download_block(
self,
block_id: str,
recursive: bool = False,
export_type: str = "markdown",
time_zone: str = "America/Chicago",
locale: str = "en",
):
"""
Download block.
TODO: Add support for downloading a list of blocks.
Arguments
---------
block_id : str
ID of the block.
recursive : bool, optional
Whether or not to include sub pages.
Defaults to False.
export_type : str
Type of the output file.
The options are "markdown", "pdf", "html".
Defaults to "markdown".
time_zone : str, optional
I don't know what values go here. I'm in the Chicago
timezone (central) and this is what I saw in the request.
Defaults to "America/Chicago".
TODO: test? hard code?
locale : str, optional
Locale for the export.
Defaults to "en".
"""
data = {
"task": {
"eventName": "exportBlock",
"request": {
"blockId": block_id,
"recursive": recursive,
"exportOptions": {
"exportType": export_type,
"timeZone": time_zone,
"locale": locale,
},
},
}
}
if export_type in ["pdf", "html"]:
data["task"]["request"]["exportOptions"]["pdfFormat"] = "Letter"
def fetch():
time.sleep(0.1)
return self.post("getTasks", {"taskIds": task_ids}).json()
task_ids = [self.post("enqueueTask", data).json()["taskId"]]
task = fetch()
# Ensure that we're getting the data when it's ready.
while "status" not in task["results"][0]:
task = fetch()
while "exportURL" not in task["results"][0]["status"]:
task = fetch()
url = task["results"][0]["status"]["exportURL"]
if export_type == "pdf":
self._download_url(url, f"{block_id}.pdf")
else:
tmp_zip = f"{block_id}.zip"
self._download_url(url, tmp_zip)
self._unzip_file(tmp_zip)
[docs] def get(self, endpoint: str) -> Response:
"""
Send HTTP GET request to given endpoint or URL.
Arguments
---------
endpoint : str
Notion's endpoint to aim at.
Returns
-------
Response
Whatever API sent back.
"""
url = self._maybe_prefix_url(endpoint)
return self.session.get(url=url)
[docs] def put(self, endpoint: str, data: dict = None, **kwargs) -> Response:
"""
Send HTTP PUT request to given endpoint or URL.
Arguments
---------
endpoint : str
Notion's endpoint to aim at.
data : dict
Data to send.
Defaults to None.
kwargs : dict
Additional params for put().
Defaults to empty dict.
Returns
-------
Response
Whatever API sent back.
"""
url = self._maybe_prefix_url(endpoint)
return self.session.put(url=url, data=data, **kwargs)
[docs] def post(self, endpoint: str, data: dict = None, **kwargs) -> Response:
"""
Send HTTP POST request to given endpoint or URL.
All API requests on Notion.so are done as POSTs,
except the websocket communications.
Arguments
---------
endpoint : str
Notion's endpoint to aim at.
data : dict
Data to send.
Defaults to empty dict.
kwargs : dict
Additional params to post().
Defaults to empty dict.
Raises
------
NotionValidationError
When POST fails with HTTP 400.
NotionUnauthorizedError
When POST fails with HTTP 401.
NotionApiError
When POST fails in a different way.
Returns
-------
Response
Whatever API sent back.
"""
url = self._maybe_prefix_url(endpoint)
resp = self.session.post(url, json=data or {}, **kwargs)
code = resp.status_code
res_data = resp.json()
if code < 400:
return resp
msg = res_data.get("message") or "<message was not provided>"
if code == 400:
raise NotionValidationError(msg, extra=res_data)
if code == 401:
raise NotionUnauthorizedError(msg, extra=res_data)
raise NotionApiError(msg, extra=res_data)
[docs] def submit_transaction(
self, operations: Union[list, dict], update_last_edited: bool = True
):
"""
Submit list of operations in atomic transaction block.
Arguments
---------
operations : list or dict
List of operations to submit.
update_last_edited : bool, optional
Whether or not to automatically update last edited records.
Defaults to True.
"""
if not operations:
return
operations = to_list(operations)
if update_last_edited:
updated_blocks = set(
[op["id"] for op in operations if op["table"] == "block"]
)
operations += [
operation_update_last_edited(self.current_user.id, block_id)
for block_id in updated_blocks
]
if self.in_transaction():
# TODO: fix that stuff, shouldn't look like that
ops = getattr(self, "_transaction_operations") + operations
setattr(self, "_transaction_operations", ops)
else:
self.post("submitTransaction", data={"operations": operations})
for operation in operations:
operation["record_id"] = operation.pop("id")
self._store.run_local_operation(**operation)
[docs] def build_and_submit_transaction(self, *args, **kwargs):
self.submit_transaction(build_operations(*args, **kwargs))
[docs] def as_atomic_transaction(self) -> Transaction:
"""
Returns a context manager that buffers up all calls
to `submit_transaction` and sends them as one
big transaction when the context manager exits.
Returns
-------
Transaction
Initialised transaction object.
"""
return Transaction(client=self)
[docs] def in_transaction(self) -> bool:
"""
Returns True if we're currently in a transaction, otherwise False.
"""
return hasattr(self, "_transaction_operations")
[docs] def search_pages_with_parent(
self, parent_id: str, search: str = "", limit: int = 10000
) -> list:
"""
Search for pages with parent.
Arguments
---------
parent_id : str
ID of parent block.
search : str, optional
Text to search by.
Defaults to empty string.
limit : int, optional
Max number of pages to return.
Defaults to 10_000.
Returns
-------
list
List of results.
"""
data = {
"query": search,
"parentId": parent_id,
"spaceId": self.current_space.id,
"limit": limit,
}
data = self.post("searchPagesWithParent", data).json()
self._store.store_record_map(data)
return data["results"]
[docs] def search_blocks(self, search: str, limit: int = 20) -> List[Block]:
"""
Search for blocks.
Arguments
---------
search : str
Text to search by.
limit : int, optional
Max number of blocks to return.
Defaults to 20.
Returns
-------
list
List of blocks.
"""
# TODO: convert `filters` to some kind of built-in type
# and make it passable / configurable
data = {
"type": "BlocksInSpace",
"query": search,
"limit": limit,
"sort": "Relevance",
"source": "quick_find",
"spaceId": self.current_space.id,
"filters": {
"isDeletedOnly": False,
"excludeTemplates": False,
"isNavigableOnly": False,
"requireEditPermissions": False,
"ancestors": [],
"createdBy": [],
"editedBy": [],
"lastEditedTime": {},
"createdTime": {},
},
}
data = self.post("search", data).json()
self._store.store_record_map(data)
return [self.get_block(bid) for bid in data["results"]]
[docs] def create_record(self, table: str, parent: Block, **kwargs) -> str:
"""
Create new record.
Arguments
---------
table : str
Table value.
parent : Block
Parent for the newly created record.
Returns
-------
str
ID of newly created record.
"""
# make up a new UUID; apparently we get to choose our own!
record_id = str(uuid.uuid4())
child_list_key = kwargs.get("child_list_key") or parent._child_list_key
args = {
"id": record_id,
"version": 1,
"alive": True,
"created_by": self.current_user.id,
"created_time": now(),
"parent_id": parent.id,
"parent_table": parent._table,
**kwargs,
}
with self.as_atomic_transaction():
self.build_and_submit_transaction(
record_id=record_id, path="", args=args, command="set", table=table
)
# add the record to the content list of the parent, if needed
if child_list_key:
self.build_and_submit_transaction(
record_id=parent.id,
path=child_list_key,
args={"id": record_id},
command="listAfter",
table=parent._table,
)
return record_id