rev: tip chemfp_examples/chembl_search_cli.py -rw-r--r-- 13.8 KiB View raw Log this file
871d0037866bAndrew Dalke added description of how the cli output SDF is generated 2 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
#!/usr/bin/env python

# Demo program to generate SDF output based on fingerprint similarity
# search and database lookup.
#
# This program requires RDKit, chemfp, and ChEMBL 28 data. 
# See 'python chembl_search_cli.py --help' for more details.

# Copyright 2021, Andrew Dalke <dalke@dalkescientific.com>
# Andrew Dalke Scientific AB (Sweden)
#
# Distributed under the terms of the MIT license.
# ------------------------------------------------
# 
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import time

# Create a context manager to help measure timings
class Timer:
    total_time = None
    def __enter__(self):
        self.start_time = time.time()
        return self
    def __exit__(self, *args):
        self.total_time = time.time() - self.start_time
    def __format__(self, format_spec):
        return format(self.total_time, format_spec)
    
with Timer() as import_time:
    import argparse
    import os
    import sqlite3
    import sys
    
    import chemfp # requires the 'chemfp' package from https://chemfp.com/

    # RDKit has a high import overhead, in part due to importing NumPy.
    # I'll import the Morgan fingerprinting module now so its import time
    # isn't included as part of the fingerprint generation time.
    from rdkit.Chem import rdMolDescriptors # requires RDKit, from http://rdkit.org/

# Helper function to report an error message and exit.
def die(msg):
    raise SystemExit("ERROR: " + msg)
    
# Set up the command-line parser
def positive_int(value):
    try:
        v = int(value)
        if v < 1:
            raise ValueError
    except ValueError:
        raise argparse.ArgumentTypeError("must be a positive integer")
    return v

epilog = """\

Given a query, which may be a ChEMBL id, ChEMBL molecule name, or a
SMILES string, do a fingerprint similarity search using the RDKit
Morgan fingerprints from the ChEMBL distribution, the use the contents
of the ChEMBL SQLite to generate the results in SDF format.

Use --sqlite to specify the location the SQLite file instead of using
the default location 'chembl_28.db' in the current directory. The file
can be extracted from
ftp://ftp.ebi.ac.uk/pub/databases/chembl/ChEMBLdb/releases/chembl_28/chembl_28_sqlite.tar.gz

Use --fingerprints to specify the location of the ChEMBL
fingerprints. By default the program looks for 'chembl_28.fpb',
'chembl_28.fpb.gz', or 'chembl_28.fpb.zst' in the current directory.
The file can be downloaded from
https://chemfp.com/datasets/chembl_28.fpb.gz . Additional details
available at https://chemfp.com/datasets/ .

The default search, if neither -k nor --threshold are specified, finds
the k=10 nearest neighbors. Use --threshold to find all records at or
above a given Tanimoto similarity (between 0.0 and 1.0). Use -k to
find the k-nearest neighbors, optionally also limited to a given
threshold.

The output is in SDF format, written to stdout. Each record includes
the corresponding molfile entry from SQLite file, with the ChEMBL id
in the title line, and with two data items: 'smiles' and 'similarity'.
based on the canonical SMILES from the SQLite file and from the
similarity search results, respectively.

The output records are sorted from most similar to least.

Status and timing information is written to stderr. The program was
carefully written to make it easier to understand where time is being
spent. By default, if the input is an uncompressed FPB file then
fingerprint search uses a memory-mapped file, causing the measured
search time to also include data load time. The --no-mmap option loads
the entire data set into memory first, so the search time is only the
search time.

Examples:

1) The default finds the k=10 nearest neighbors to
"Cn1cnc2c1c(=O)[nH]c(=O)n2C", which is a SMILES string for
theobromine.

  % python chembl_search_cli.py

2) Find the k=1 nearest neighbors to "caffeine". (The SMILES is found
via a compound name lookup in the SQLite file.)

  % python chembl_search_cli.py caffeine -k 1

3) Find the 544 records which are at least 0.4 similar to CHEMBL25:

  % python chembl_search_cli.py CHEMBL25 --threshold 0.4

4) Find up to 10 records which are at least 0.6 similar to the SMILES
for phenol (there are only two), and disable the use of memory-mapped
files:

  % python chembl_search_cli.py 'c1ccccc1O' -k 10 --threshold 0.6 --no-mmap

.
"""

parser = argparse.ArgumentParser(
    description = "ChEMBL similarity search, output as an SDF with matching records",
    formatter_class = argparse.RawDescriptionHelpFormatter,
    epilog=epilog,
    )
parser.add_argument(
    "-k", "--k", metavar = "INT", type = positive_int,
    help = "Number of structures to find.")
parser.add_argument(
    "-t", "--threshold", metavar = "FLOAT", type = float,
    help = "Find all structures with at least the given similarity.")
parser.add_argument(
    "-f", "--fingerprints", metavar = "FILENAME",
    dest = "fp_filename",
    help = "Location of the FPB file containing ChEMBL fingerprint.",
    )
parser.add_argument(
    "-s", "--sqlite", metavar = "FILENAME",
    dest = "sqlite_filename",
    help = "Location of 'chembl_28.db' SQLite3 file from the ChEMBL distribution.",
    )
parser.add_argument(
    "--no-mmap", action = "store_true",
    help = "Read uncompressed FPB files into memory instead of using a memory-mapped file",
    )
parser.add_argument(
    "-q", "--quiet", action = "store_true",
    help = "Do not write progress and timing information to stderr.",
    )
parser.add_argument(
    "query", nargs = "?",
    help = "Query structure, as a ChEMBL id, ChEMBL title, or SMILES. Defaults to the theobromine SMILES.",
    )

def quiet_info(msg):
    pass

def stderr_info(msg):
    sys.stderr.write(msg + "\n")

def main(argv = None):
    args = parser.parse_args(argv)

    if args.quiet:
        info = quiet_info
    else:
        info = stderr_info
    
    # Use the --fingerprints filename or search for the expected name
    # in the current directory.
    fp_filename = get_filename(
        args.fp_filename,
        ["chembl_28.fpb", "chembl_28.fpb.gz", "chembl_28.fpb.zst"],
        "Could not find the ChEMBL fingerprints.\n"
        "--fingerprints not specified and FPB file not found in the current directory.\n"
        "Download it from https://chemfp.com/datasets/chembl_28.fpb.gz .\n"
        "See https://hg.sr.ht/~dalke/chemfp_examples/ for setup details.")
    
    # Use the --sqlite filename or search for the expected name
    # in the current directory.
    sqlite_filename = get_filename(
        args.sqlite_filename,
        ["chembl_28.db"],
        "Could not find the ChEMBL SQLite distribution.\n"
        "--sqlite not specified and 'chembl_28.db' not found in the currrent directory.\n"
        "See https://hg.sr.ht/~dalke/chemfp_examples/ for setup details.")

    # Set the default search parameters if nothing was specified
    k, threshold = args.k, args.threshold
    if k is None and threshold is None:
        # No parameters specified. Default to a k=10 nearest search.
        k = 10
        threshold = 0.0
    elif threshold is None:
        # -k is specified, so set the minimum threshold to 0.0
        threshold = 0.0
    

    with Timer() as sqlite_connect_time:
        # Connect to ChEMBL's SQLite database.
        # Test if the file exist first, because of it doesn't then SQLite will create one.
        if not os.path.exists(sqlite_filename):
            die(f"No ChEMBL SQLite file found at {sqlite_filename!r}. Use --sqlite to specify an alternate location.")
        db = sqlite3.connect(sqlite_filename)
        c = db.cursor()
        version_name, version_comments = c.execute("SELECT name, comments FROM version").fetchone()

    with Timer() as id_lookup_time:
        # Figure out if the query is a ChEMBL id or name, otherwise assume it's a SMILES
        query_type, query_smiles = get_query_smiles(c, args.query)

    with Timer() as fingerprint_load_time:
        # Load the ChEMBL fingerprints.
        # If memory-mapped files are allowed then uncompressed FPB files
        # aren't actually loaded here. Instead, the load occurs with search.
        # Use --no-mmap to disable memory-mapping and read the file into memory.
        # This increases load time and decreases search time.
        arena = chemfp.load_fingerprints(fp_filename, allow_mmap=not args.no_mmap)
    
    # show some progress information
    info(f"Searching the {len(arena):,} fingerprints of {version_name} ({version_comments}).")

    # Convert the SMILES into a fingerprint
    with Timer() as fingerprint_generation_time:
        # Get the fingerprint type string from the metadata section of the
        # fingerprint file and return the a fingerprint type object.
        # Use it to parse the SMILES and return the corresponding fingerprint.
        fptype = arena.get_fingerprint_type()
        query_fp = fptype.parse_molecule_fingerprint(query_smiles, "smi", errors="ignore")
        if query_fp is None:
            die(f"Cannot generate a fingerprint for {query_type} {query_smiles!r}")

    with Timer() as search_time:
        # Do the appropriate similarity search depending on the query parameters.
        if k is None:
            info(f"Find all fingerprints at least {threshold} similar to the query.")
            hits = arena.threshold_tanimoto_search_fp(query_fp, threshold=threshold)
            # Threshold search might not be sorted. Reorder from most to least similar.
            hits.reorder("decreasing-score")
        else:
            info(f"Find the k={k} nearest fingerprints at least {threshold} similar to the query.")
            hits = arena.knearest_tanimoto_search_fp(query_fp, k=k, threshold=threshold)
            # The k-nearest search is sorted from most to least similar.

    info(f"Number of hits found: {len(hits):,}")

    with Timer() as output_time:
        # For each ChEMBL id, search for the corresponding molfile in the
        # database and add a title, 'smiles' tag, and 'similarity' tag to
        # give an output SDF.
        print_hits(c, hits)

    # Print timing information
    info(f"Import time: {import_time:.2f} seconds")
    info(f"SQLite connect time: {sqlite_connect_time:.2f} seconds")
    info(f"Id lookup time: {id_lookup_time:.2f} seconds")
    info(f"Fingerprint load time: {fingerprint_load_time:.2f} seconds")
    info(f"Fingerprint generation time: {fingerprint_generation_time:.2f} seconds")
    info(f"Similarity search time: {search_time:.2f} seconds")
    info(f"Output time: {output_time:.2f} seconds")

    ## Done!

    
def get_filename(filename, default_filenames, errmsg):
    if filename is not None:
        return filename
    for filename in default_filenames:
        if os.path.exists(filename):
            return filename
    die(errmsg)

# Helper function to figure out what the query was and turn it into a SMILES.
def get_query_smiles(c, query):
    if query is None:
        return ("default query", "Cn1cnc2c1c(=O)[nH]c(=O)n2C")
    
    if query.startswith("CHEMBL"):
        c.execute("SELECT entity_type, entity_id FROM chembl_id_lookup WHERE chembl_id = ?",
                      (query,))
        row = c.fetchone()
        if row is None:
            die(f"{query} not present in the database")
        
        entity_type, entity_id = row
        if entity_type != "COMPOUND":
            die(f"{query} type is '{entity_type}', expecting 'COMPOUND'")

        molregno = entity_id
        c.execute("SELECT canonical_smiles FROM compound_structures WHERE molregno = ?",
                      (molregno,))
        row = c.fetchone()
        if row is None:
            die(f"Unable to get canonical SMILES for {query} ({molregno})")
        return ("ChEMBL id", row[0])

    # See if it's a compound name
    c.execute("""
SELECT canonical_smiles FROM compound_structures, compound_records
                       WHERE compound_structures.molregno = compound_records.molregno
                         AND compound_name = ?
                     LIMIT 1""", (query,))
    row = c.fetchone()
    if row is not None:
        return ("ChEMBL title", row[0])

    # Treat it as a SMILES
    return ("query SMILES", query)



def print_hits(c, hits):
    for hit_id, score in hits.get_ids_and_scores():
        c.execute("""
SELECT molfile, canonical_smiles FROM compound_structures, chembl_id_lookup
              WHERE compound_structures.molregno = chembl_id_lookup.entity_id
                AND chembl_id = ?""", (hit_id,))
        row = c.fetchone()
        if row is None:
            sys.stderr.write(f"ERROR: Could not fetch molfile for {hit_id}. Skipping.\n")
            continue

        # Convert the molfile into an SDF record
        molfile, smiles = row
        # Add the hit_id to the title
        print(hit_id + molfile)
        # Write "smiles" and "similarity" tags as the tag data
        for (tag, value) in (("smiles", smiles), ("similarity", score)):
            print(f"> <{tag}>\n{value}\n")
        # End the tag data
        print("$$$$")

    
if __name__ == "__main__":
    main()