Create a Preservation

Learn how to create a Preservation using the API


💡 8 min read

Tip

This guide is written for Linux and macOS; depending on your operating system, you may need to adjust some of the commands.

This guide is based on Python 3.7 and uses the hosted Onna SaaS platform.

# Requirements

Info

Make sure that your local Python version matches the version used by your Onna instance.

The hosted platforms (e.g. https://$COMPANY.onna.io or https://enterprise.onna.com/$COMPANY) run Python 3.7.4.
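
You can confirm your local interpreter version from within Python itself; a minimal check:

import sys

# the hosted platform runs Python 3.7.4; match at least the 3.7 minor version locally
print(sys.version)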

# Prerequisite

Creating preservations requires specific permissions.

Please contact support[at]onna.com for assistance with setting up the required permissions.

Info

To be able to create preservations in your Onna instance, you'll need the following credentials:

USERNAME: your Onna username
PASSWORD: your Onna password
ACCOUNT: the Onna account name
ACCOUNT_URL: the URL of your account, e.g. https://$COMPANY.onna.io or https://enterprise.onna.com/$COMPANY
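
The script combines these values into the base URL used for all API calls. A minimal sketch, assuming the default container name rel0 used by the script below and illustrative credential values:

ACCOUNT = "example"  # illustrative values; substitute your own
ACCOUNT_URL = "https://enterprise.onna.com"
CONTAINER = "rel0"  # default container name used by the script

api_base = f"{ACCOUNT_URL}/api/{CONTAINER}/{ACCOUNT}"
print(api_base)  # -> https://enterprise.onna.com/api/rel0/example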

# Create a Preservation in Onna with the API

# Overview

Endpoints:

  • workspaces - creates a workspace
  • @smartactionCheck - ensures that your smart action query is scheduled to run
  • @frontsearch - Onna's search endpoint
  • @identitiesEmails - checks whether email addresses match source accounts
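
All of these endpoints expect a JWT bearer token in the Authorization header. A minimal sketch of the shared request pattern, assuming you already hold a token and your account's API base URL (the full token exchange is shown in the script below):

import requests

token = "..."  # JWT obtained via the auth helpers in the script below
api_base = "https://enterprise.onna.com/api/rel0/example"  # hypothetical account

headers = {
    "authorization": f"Bearer {token}",
    "content-type": "application/json",
}
# e.g. check whether a batch of email addresses matches source accounts
resp = requests.post(f"{api_base}/@identitiesEmails", headers=headers,
                     json=["name@example.com"])
print(resp.status_code)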

This script programmatically creates a Preservation on an existing Slack Enterprise datasource. The Preservation lives in a workspace that is created when the script runs.

After the identities involved in the matter have been verified against the source, the Preservation is created, optionally bounded by start and end dates.

If you don't know the IDs of the datasource(s) you want to include in the Preservation, the script's utility function get_slack_enterprise_sources can generate a file called Slack Enterprise Sources.csv that lists them.
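
Each row of the generated file contains the datasource title, its UUID, and its creation date. The rows below are purely illustrative:

Slack Enterprise Grid,05d175216e544a85bc4a0a582537fb5f,2020-01-15T10:23:00+00:00
Slack Enterprise Grid EU,91db0f443c9243d99960d4bbf1f53d81,2020-02-03T09:10:00+00:00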

Once the datasource IDs are known, pass them to the script with the --datasources argument. An example command is shown at the bottom of this page.

You will also need a file that lists matter names and the email addresses of the identities associated with each matter:

Matter 1, name@example.com
Matter 1, name2@example.com
Matter 2, name3@example.com
Matter 2, name@example.com

# Setup

Create a directory for the script and its dependencies

mkdir $DIR #choose a name for the directory

Change into the newly created directory

cd $DIR

Create a file with the name requirements.txt with the following content

python-dateutil==2.8.1
requests==2.22.0

In the same directory create a file with the name create_preservation.py with the following content

# -*- encoding: utf-8 -*-
import argparse
import csv
import json
import sys

import requests
from dateutil.parser import parse


argparser = argparse.ArgumentParser(
    description="Create a Preservation in a workspace from Slack Enterprise datasource",
    epilog="You can also get a list of existing Slack Enterprise datasources in your account",
)
argparser.add_argument(
    "--username", required=True, type=str, help="Onna Account username"
)
argparser.add_argument("--password", required=True, type=str, help="password")
argparser.add_argument(
    "--account", required=True, type=str, help="The Onna account name"
)
argparser.add_argument(
    "--account_url",
    required=True,
    type=str,
    help="the URL of your account, e.g https://company.onna.io or https://enterprise.onna.com",
)
argparser.add_argument(
    "--from_date",
    type=str,
    help="start date. most date formats are accepted",
)
argparser.add_argument(
    "--to_date",
    type=str,
    help="end date. most date formats are accepted",
)
argparser.add_argument(
    "--container", required=False, default="rel0", help="name of the account container"
)
argparser.add_argument("--datasources", nargs="+", help="list of datasource ids")

argparser.add_argument(
    "--list_datasources",
    required=False,
    action="store_true",
    help="fetch datasource ids to include in the preservation",
)


def auth_code(url=None):
    # request a one-time auth code for the account scope
    if not url:
        raise ValueError("auth code URL is required")
    resp = requests.get(url)
    if resp.status_code != 200:
        raise RuntimeError(f"auth code request failed: {resp.status_code}")
    return resp.json()["auth_code"]


def auth_token(auth_code, username, password, scope, base_url):
    payload = {
        "grant_type": "user",
        "code": auth_code,
        "username": username,
        "password": password,
        "scopes": [scope],
        "client_id": "canonical",
    }
    headers = {"Accept": "application/json"}
    resp = requests.post(
        f"{base_url}/auth/oauth/get_auth_token",
        headers=headers,
        data=json.dumps(payload),
    )
    if resp.status_code != 200:
        raise RuntimeError(f"auth token request failed: {resp.status_code}")
    # the response body is the JWT itself
    return resp.text


def open_file_and_parse(file_path):
    with open(file_path, "r") as f:
        text = f.read()
    json_text = json.loads(text)
    return json_text


def open_file_and_get_lines(file_path):
    with open(file_path, "r", encoding="utf8") as f:
        content = f.readlines()
    content = [x.strip() for x in content]
    return content


def write_array_of_arrays_to_csv(array_info, output_file):
    with open(output_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(array_info)


def write_array_to_file(array_info, output_file):
    with open(output_file, "w", newline="\n", encoding="utf8") as f:
        for x in array_info:
            f.write(x)
            f.write("\r\n")


def verification_request(emails, token, account_url):
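    # POST one batch of email addresses to @identitiesEmails; returns the parsed JSON on success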

    url = f"{account_url}/@identitiesEmails"

    payload = json.dumps(emails)

    headers = {
        "authorization": f"Bearer {token}",
        "content-type": "application/json",
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    if response.status_code == 200:
        return response.json()
    else:
        print(response.status_code)
        return None


def parse_response(response):

    found_emails = []
    not_found_emails = []
    for account in response:
        if account.get("found") is True:
            found_emails.append(account.get("email"))
        else:
            not_found_emails.append(account.get("email"))

    return found_emails, not_found_emails


# Step 1
# Verify emails have identities


def verify_emails(token, account_url):
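    # Read matters_emails.csv, build matter -> emails and email -> matters maps,
    # verify the addresses in batches of 1000, and write any address without a
    # matching identity to "Users Not Found.csv".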

    matters_and_emails = open_file_and_get_lines("matters_emails.csv")

    matters_dict = dict()
    emails_dict = dict()

    for line in matters_and_emails:
        info = line.split(",")
        # strip surrounding whitespace so "Matter 1, name@example.com" parses cleanly
        matter_name = info[0].strip()
        email_address = info[1].strip()

        if matter_name in matters_dict:
            matters_dict[matter_name]["emails"].append(email_address)
        else:
            matters_dict[matter_name] = {"emails": [email_address]}

        if email_address in emails_dict:
            emails_dict[email_address].append(matter_name)
        else:
            emails_dict[email_address] = [matter_name]

    # Batch email requests into groups of 1000
    email_batches = chunks(list(emails_dict.keys()), 1000)
    all_found_users = []
    all_not_found_users = []

    for email_batch in email_batches:
        verification_response = verification_request(email_batch, token, account_url)
        if verification_response is not None:
            found_users, not_found_users = parse_response(verification_response)
        else:
            print("invalid verification response")
            found_users = []
            not_found_users = []
        all_found_users.extend(found_users)
        all_not_found_users.extend(not_found_users)

    # Write file with not found users and matters
    not_found_array = [["matter", "email"]]
    for user in all_not_found_users:
        user_matters = emails_dict[user]
        for um in user_matters:
            not_found_array.append([um, user])

    write_array_of_arrays_to_csv(not_found_array, "Users Not Found.csv")

    return all_found_users, emails_dict, matters_dict


def get_email_identities(email_addresses, account_url, token):
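    # query @frontsearch for RealIdentity objects whose from_mail matches the given addresses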

    url = f"{account_url}/@frontsearch"

    payload = json.dumps(
        {
            "advanced": {
                "and": [
                    {"in": [{"var": "type_name"}, ["RealIdentity"]]},
                    {"in": [{"var": "from_mail.keyword"}, email_addresses]},
                ]
            },
            "from": 0,
            "sort": {"field": "title.keyword", "direction": "asc"},
            "includes": ["title", "from_mail"],
            "size": len(email_addresses),
        }
    )
    headers = {"authorization": f"Bearer {token}", "content-type": "application/json"}

    response = requests.request("POST", url, headers=headers, data=payload)

    if response.status_code == 200:
        return response.json()
    else:
        return None


def parse_email_identities(email_identities_resp):
    email_identities_dict = dict()
    for member in email_identities_resp["member"]:
        if member["from_mail"] not in email_identities_dict:
            email_identities_dict[member["from_mail"]] = member["@uid"]

    return email_identities_dict


def create_preservation(
    preservation_name, identities, sources, from_date, to_date, token, account_url
):
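    # build a Workspace whose legal_hold query is scoped to the given datasources
    # and identities, optionally bounded by date_modified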

    preservation_id = preservation_name.lower().replace(" ", "-")
    preservation_id = preservation_id.replace(".", "")

    url = f"{account_url}/workspaces"

    raw_payload = {
        "id": preservation_id,
        "@type": "Workspace",
        "title": preservation_name,
        "legal_hold": {
            "query": {
                "advanced": {
                    "and": [
                        {"in": [{"var": "parent_datasource.uuid"}, sources]},
                        {"in": [{"var": "identity-member"}, identities]},
                    ]
                }
            }
        },
    }
    if from_date is not None:
        raw_payload["legal_hold"]["query"]["advanced"]["and"].append(
            {">": [{"var": "date_modified"}, from_date]}
        )

    if to_date is not None:
        raw_payload["legal_hold"]["query"]["advanced"]["and"].append(
            {"<": [{"var": "date_modified"}, to_date]}
        )

    payload = json.dumps(raw_payload)

    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {token}",
        "content-type": "application/json",
    }

    response = requests.request("POST", url, headers=headers, data=payload)

    if response.status_code == 201:
        trigger_smart_action(response.json(), token)
    else:
        print(f"failed to create '{preservation_name}': {response.status_code}")


def trigger_smart_action(response, token):
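    # call @smartactionCheck on the newly created workspace so its preservation query is scheduled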

    workspace_url = response.get("@id", None)
    if workspace_url is not None:
        url = f"{workspace_url}/@smartactionCheck"

        payload = "{}"
        headers = {"authorization": f"Bearer {token}"}

        response = requests.request("POST", url, headers=headers, data=payload)

        if response.status_code == 200:
            print("scheduled")


def chunks(items, n):
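    # split items into consecutive batches of at most n elements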
    final = [items[i * n : (i + 1) * n] for i in range((len(items) + n - 1) // n)]
    return final


def get_slack_enterprise_sources(token, account_url):
    """parse only the Enterprise Slack sources to a csv with the Title, creation date, and UUID"""
    url = f"{account_url}/@data?types=SlackEDatasource"

    payload = {}
    headers = {
        "authorization": f"Bearer {token}",
    }

    response = requests.request("GET", url, headers=headers, data=payload)

    if response.status_code == 200:
        sources_array = []
        sources = response.json()
        for source in sources["updates"]:
            sources_array.append(
                [source["title"], source["@uid"], source["creation_date"]]
            )

        write_array_of_arrays_to_csv(sources_array, "Slack Enterprise Sources.csv")


def main():
    try:
        from_date = int(parse(args.from_date).timestamp())
    except (TypeError, ValueError):
        from_date = None
    try:
        to_date = int(parse(args.to_date).timestamp())
    except (TypeError, ValueError):
        to_date = None

    username = args.username
    password = args.password
    account = args.account
    base_url = args.account_url
    container = args.container

    account_url = f"{base_url}/api/{container}/{account}"

    auth_code_url = f"{account_url}/@oauthgetcode?client_id=canonical&scope={account}"
    code = auth_code(auth_code_url)
    token = auth_token(code, username, password, account, base_url)

    if args.list_datasources:
        get_slack_enterprise_sources(token, account_url)
        sys.exit(0)

    datasource_ids = args.datasources

    print("starting to verify emails")
    emails, emails_to_matters, matters_info = verify_emails(token, account_url)
    print("starting to get identities")
    email_identities_response = get_email_identities(emails, account_url, token)

    if email_identities_response is not None:
        email_identities_dictionary = parse_email_identities(email_identities_response)
    else:
        return

    for matter in matters_info.keys():
        matter_emails = matters_info[matter]["emails"]

        matter_identities = []
        for email in matter_emails:
            if email in email_identities_dictionary:
                matter_identities.append(email_identities_dictionary[email])

        matters_info[matter]["identities"] = matter_identities

    for matter in matters_info.keys():
        matters_info[matter]["sources"] = datasource_ids

    matters_to_create = [["matter", "sources", "identities", "emails"]]  # header row
    for matter in matters_info.keys():
        matters_to_create.append(
            [
                matter,
                len(matters_info[matter]["sources"]),
                len(matters_info[matter]["identities"]),
                len(matters_info[matter]["emails"]),
            ]
        )
    write_array_of_arrays_to_csv(matters_to_create, "matters_to_create.csv")

    print("creating preservations")
    count = 0
    for matter in matters_info.keys():
        count += 1
        print(f"creating {str(count)} of {str(len(matters_info))}")
        sources = matters_info[matter]["sources"]
        identities = matters_info[matter]["identities"]

        create_preservation(
            matter,
            identities,
            sources,
            from_date,
            to_date,
            token,
            account_url,
        )


if __name__ == "__main__":
    args = argparser.parse_args()
    main()

In the same directory, create a CSV file named matters_emails.csv and populate it with matter names and email addresses, following the format shown above. The first column is the title of the matter; the second is an identity, in this case an email address, associated with that matter.
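
Before running the script, it can help to sanity-check the CSV. A minimal sketch that counts the unique addresses per matter, assuming the two-column format shown above:

import csv
from collections import defaultdict

matters = defaultdict(set)
with open("matters_emails.csv", newline="") as f:
    for row in csv.reader(f):
        matters[row[0].strip()].add(row[1].strip())

for matter, emails in sorted(matters.items()):
    print(f"{matter}: {len(emails)} email address(es)")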

Still in the same directory, create a virtual environment

python3 -m venv env

Activate the virtualenv

source env/bin/activate

Install all dependencies with pip

pip install -r requirements.txt

# Script

# Usage

Tip

Since you are running the script inside your activated Python 3 virtual environment, the python command points to Python 3.

# Example

python create_preservation.py --username example1 --password 1234 --account example \
--account_url https://enterprise.onna.com \
--from_date "01/01/2020" --to_date "01/20/2020" \
--container example --datasources 05d175216e544a85bc4a0a582537fb5f 91db0f443c9243d99960d4bbf1f53d81

To generate a file with the title, ID, and creation date of the Slack Enterprise datasources in your Onna account:

python create_preservation.py --username example1 --password 1234 --account example \
--account_url https://enterprise.onna.com \
--container example --list_datasources