BitMagic-C++
xsample07a.cpp
Go to the documentation of this file.
1/*
2Copyright(c) 2002-2020 Anatoliy Kuznetsov(anatoliy_kuznetsov at yahoo.com)
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15
16For more information please visit: http://bitmagic.io
17*/
18
19/** \example xsample07a.cpp
20 Use of bvector<> for k-mer fingerprint K should be short,
21 no minimizers here
22*/
23
24/*! \file xsample07a.cpp
25 \brief Example: Use of bvector<> for k-mer fingerprint
26 K should be short, no minimizers here (k < 24)
27
28 Example loads FASTA file (large multi-molecule file is expected,
29 builds a collection of k-mers for each molecule and runs
30 clusterization algorithm on the input collection using
31 set intersect (logical AND) as a similarity measure.
32
33 Example uses std::async for running parallel jobs.
34*/
35
36#include <assert.h>
37#include <stdlib.h>
38#include <math.h>
39
40#include <iostream>
41#include <vector>
42#include <list>
43#include <map>
44#include <algorithm>
45#include <utility>
46#include <memory>
47
48#include <future>
49#include <thread>
50#include <mutex>
51#include <atomic>
52
53#include "bm64.h" // use 48-bit vectors
54#include "bmalgo.h"
55#include "bmserial.h"
56#include "bmaggregator.h"
57#include "bmsparsevec_compr.h"
58#include "bmsparsevec_algo.h"
59#include "bmrandom.h"
60
61
62// BitMagic utilities for debug and timings
63#include "bmdbg.h"
64#include "bmtimer.h"
65#include "bmundef.h" /* clear the pre-proc defines from BM */
66
67#include "dna_finger.h"
68
69using namespace std;
70
71
72
73// Arguments
74//
75std::string ifa_name;
76std::string ikd_name;
77std::string ikd_counts_name;
78std::string kh_name;
79std::string ikd_rep_name;
80std::string ikd_freq_name;
81bool is_diag = false;
82bool is_timing = false;
83bool is_bench = false;
84unsigned ik_size = 8;
85unsigned parallel_jobs = 4;
86unsigned f_percent = 5; // percent of k-mers we try to clear as over-represented
87
88#include "cmd_args.h"
89
90
91
92// Global types
93//
95typedef std::vector<char> vector_char_type;
96typedef bm::dynamic_heap_matrix<unsigned, bm::bvector<>::allocator_type> distance_matrix_type;
97typedef std::vector<std::unique_ptr<bvector_type> > bvector_ptr_vector_type;
99typedef std::vector<std::pair<bv_size_type, bv_size_type> > bv_ranges_vector;
100
101
102// Global vars
103//
105
106
107/// wait for any opening in a list of futures
108/// used to schedule parallel tasks with CPU overbooking control
109///
110template<typename FV>
111void wait_for_slot(FV& futures, unsigned* parallel_cnt, unsigned concurrency)
112{
113 do
114 {
115 for (auto e = futures.begin(); e != futures.end(); ++e)
116 {
117 std::future_status status = e->wait_for(std::chrono::milliseconds(100));
118 if (status == std::future_status::ready)
119 {
120 (*parallel_cnt) -= 1;
121 futures.erase(e);
122 break;
123 }
124 } // for e
125 } while (*parallel_cnt >= concurrency);
126}
127
128/// Collection of sequences and k-mer fingerprint vectors
129///
131{
132public:
133 typedef std::vector<unsigned char> buffer_type;
134public:
135
137 {}
138 CSequenceColl(const CSequenceColl&) = delete;
139
140 void add_sequence(const string& acc, vector_char_type* seq_ptr)
141 {
142 m_acc.push_back(acc);
143 m_seqs.emplace_back(seq_ptr);
144 }
145
146 void set_buffer(size_t i, const buffer_type& buf)
147 {
148 unique_ptr<buffer_type> buf_ptr(new buffer_type(buf));
149 {
150 static std::mutex mtx_counts_lock;
151 std::lock_guard<std::mutex> guard(mtx_counts_lock);
152
153 if (m_kmer_bufs.size() <= i)
154 m_kmer_bufs.resize(i+1);
155 m_kmer_bufs[i].reset(buf_ptr.release());
156 }
157 }
159 {
160 m_kmer_bufs.resize(this->size());
161 }
162
163 size_t size() const
164 { assert(m_seqs.size() == m_acc.size()); return m_seqs.size(); }
165
166 const string& get_acc(size_t i) const { return m_acc[i]; }
167 const vector_char_type& get_sequence(size_t i) const { return *(m_seqs[i]); }
168
169 size_t seq_size(size_t i) const { return m_seqs[i]->size(); }
170
171 size_t total_seq_size() const
172 {
173 size_t sum = 0;
174 for (size_t i = 0; i < m_seqs.size(); ++i)
175 sum += seq_size(i);
176 return sum;
177 }
178
179 ///
180 size_t buf_size() const { return m_kmer_bufs.size(); }
181
182 /// Get k-mer vector BLOB size
183 size_t get_buf_size(size_t i) const { return m_kmer_bufs[i]->size(); }
184
185 /// Get k-mer BLOB pointer
186 const unsigned char* get_buf(size_t i) const
187 {
188 const buffer_type* p = m_kmer_bufs[i].get();
189 if (!p)
190 return 0;
191 return p->data();
192 }
193
194 /// Deserialize group of k-mer fingerprint vectors
195 ///
197 const bm::bvector<>& bv_req,
198 bm::bvector<>::size_type bv_req_count) const;
199
200
201
202private:
203 vector<unique_ptr<vector_char_type> > m_seqs;
204 vector<string> m_acc;
205 vector<unique_ptr<buffer_type> > m_kmer_bufs;
206};
207
209 const bm::bvector<>& bv_req,
210 bm::bvector<>::size_type bv_req_count) const
211{
212 std::list<std::future<void> > futures;
213
214 k_mers_vect.resize(0);
215 k_mers_vect.reserve(bv_req_count);
216
217 bm::bvector<>::enumerator en(bv_req.first());
218 for (; en.valid(); ++en)
219 {
220 auto i_idx = *en;
221 bm::bvector<>* bv = new bm::bvector<>(); // k-mer fingerprint
222 k_mers_vect.emplace_back(bv);
223 const unsigned char* buf = this->get_buf(i_idx);
224 if (!buf)
225 continue;
226 futures.emplace_back( // async decompress
227 std::async(std::launch::async,
228 [bv, buf]()
229 { BM_DECLARE_TEMP_BLOCK(tb); bm::deserialize(*bv, buf, tb); }
230 ));
231
232 } // for en
233 for (auto& e : futures)
234 e.wait();
235}
236
237
238// -----------------------------------------------------------------------
239//
240
241/// Load multi-sequence FASTA
242///
243static
244int load_FASTA(const std::string& fname, CSequenceColl& seq_coll)
245{
246 unique_ptr<vector_char_type> seq_vect(new vector_char_type());
247 std::string line, acc;
248
249 std::ifstream fin(fname.c_str(), std::ios::in);
250 if (!fin.good())
251 return -1;
252 for (size_t i = 0; std::getline(fin, line); ++i)
253 {
254 if (line.empty())
255 continue;
256
257 if (line.front() == '>') // defline
258 {
259 if (!acc.empty())
260 {
261 seq_vect->shrink_to_fit();
262 seq_coll.add_sequence(acc, seq_vect.release());
263 acc.resize(0);
264 seq_vect.reset(new vector_char_type());
265 }
266
267 std::size_t pos = line.find_first_of(":");
268 if (pos == std::string::npos) // not found
269 {
270 acc = line;
271 }
272 else
273 {
274 acc = line.substr(1, pos-1);
275 }
276 continue;
277 }
278 for (std::string::iterator it = line.begin(); it != line.end(); ++it)
279 seq_vect->push_back(*it);
280 } // for
281
282 if (!acc.empty())
283 {
284 seq_vect->shrink_to_fit();
285 seq_coll.add_sequence(acc, seq_vect.release());
286 }
287
288 cout << "\r \r" << endl;
289 return 0;
290}
291
292/// save k-mer vectors to a file
293static
294void save_kmer_buffers(const std::string& fname, const CSequenceColl& seq_coll)
295{
296 char magic_ch = '\t';
297 std::ofstream bfile (fname, std::ios::out | std::ios::binary);
298 if (!bfile.good())
299 {
300 std::cerr << "Cannot open file for write: " << fname << std::endl;
301 exit(1);
302 }
303
304 // save collection size
305 size_t sz = seq_coll.size();
306 bfile.write((char*)&sz, std::streamsize(sizeof(sz)));
307
308 // save the collection elements
309 //
310 for (size_t i = 0; i < sz; ++i)
311 {
312 size_t buf_size = 0;
313 const unsigned char* buf = seq_coll.get_buf(i);
314 if (!buf)
315 {
316 bfile.write((char*)&buf_size, std::streamsize(sizeof(buf_size)));
317 continue;
318 }
319 buf_size = seq_coll.get_buf_size(i);
320 bfile.write((char*)&buf_size, std::streamsize(sizeof(buf_size)));
321 if (buf_size)
322 {
323 bfile.write((char*)buf, std::streamsize(buf_size));
324 bfile.write((char*)&magic_ch, 1);
325 }
326 } // for i
327}
328
329/// Load k-mer vectors
330///
331static
332void load_kmer_buffers(const std::string& fname, CSequenceColl& seq_coll)
333{
334 char magic_ch = '\t';
335 std::ifstream bfile (fname, std::ios::in | std::ios::binary);
336 if (!bfile.good())
337 {
338 std::cerr << "Cannot open file for read: " << fname << std::endl;
339 exit(1);
340 }
341
342 // save collection size
343 size_t sz;
344 bfile.read((char*)&sz, std::streamsize(sizeof(sz)));
345
347
348 // load the collection elements
349 //
350 for (size_t i = 0; i < sz; ++i)
351 {
352 size_t buf_size = 0;
353 bfile.read((char*)&buf_size, std::streamsize(sizeof(buf_size)));
354 if (buf_size)
355 {
356 buf.resize(buf_size);
357 bfile.read((char*) buf.data(), std::streamsize(buf_size));
358 char control_ch = 0;
359 bfile.read((char*)&control_ch, 1);
360 if (control_ch != magic_ch)
361 {
362 cerr << "Error: read failure!" << endl;
363 exit(1);
364 }
365 seq_coll.set_buffer(i, buf);
366 }
367
368 } // for i
369}
370
371
372
373inline
374bool get_DNA_code(char bp, bm::id64_t& dna_code)
375{
376 switch (bp)
377 {
378 case 'A':
379 dna_code = 0; // 00
380 break;
381 case 'T':
382 dna_code = 1; // 01
383 break;
384 case 'G':
385 dna_code = 2; // 10
386 break;
387 case 'C':
388 dna_code = 3; // 11
389 break;
390 default: // ambiguity codes are ignored (for simplicity)
391 return false;
392 }
393 return true;
394}
395
396/// Calculate k-mer as an unsigned long integer
397///
398///
399/// @return true - if k-mer is "true" (not 'NNNNNN')
400///
401inline
402bool get_kmer_code(const char* dna,
403 size_t pos, unsigned k_size,
404 bm::id64_t& k_mer)
405{
406 // generate k-mer
407 //
408 bm::id64_t k_acc = 0;
409 unsigned shift = 0;
410 dna += pos;
411 for (size_t i = 0; i < k_size; ++i)
412 {
413 char bp = dna[i];
414 bm::id64_t dna_code;
415 bool valid = get_DNA_code(bp, dna_code);
416 if (!valid)
417 return false;
418 k_acc |= (dna_code << shift); // accumulate new code within 64-bit accum
419 shift += 2; // each DNA base pair needs 2-bits to store
420 } // for i
421 k_mer = k_acc;
422 return true;
423}
424
425
426/// Translate integer code to DNA letter
427///
428inline
429char int2DNA(unsigned code)
430{
431 static char lut[] = { 'A', 'T', 'G', 'C', 'N', 'M', '$' };
432 if (code < 5)
433 return lut[code];
434 assert(0);
435 return 'N';
436}
437
438/// Translate k-mer code into ATGC DNA string
439///
440/// @param dna - target string
441/// @param k_mer - k-mer code
442/// @param k_size -
443inline
444void translate_kmer(std::string& dna, bm::id64_t kmer_code, unsigned k_size)
445{
446 dna.resize(k_size);
447 for (size_t i = 0; i < k_size; ++i)
448 {
449 unsigned dna_code = unsigned(kmer_code & 3);
450 char bp = int2DNA(dna_code);
451 dna[i] = bp;
452 kmer_code >>= 2;
453 } // for i
454 assert(!kmer_code);
455}
456
457
458
459/**
460 This function turns each k-mer into an integer number and encodes it
461 in a bit-vector (presense vector)
462 The natural limitation here is that integer has to be less tha 48-bits
463 (limitations of bm::bvector<>)
464 This method build a presense k-mer fingerprint vector which can be
465 used for Jaccard distance comparison.
466
467 @param bv - [out] - target bit-vector
468 @param seq_vect - [out] DNA sequence vector
469 @param k-size - dimention for k-mer generation
470 @param k_buf - sort buffer for generated k-mers
471 @param chunk_size - sort buffer size (number of k-mers per sort)
472 */
473template<typename BV>
475 const vector_char_type& seq_vect,
476 unsigned k_size,
477 std::vector<bm::id64_t>& k_buf,
478 const bm::id64_t chunk_size = 400000000
479 )
480{
481 bv.clear();
482 bv.init(); // need to explicitly init to use bvector<>::set_bit_no_check()
483 if (seq_vect.empty())
484 return;
485 const char* dna_str = &seq_vect[0];
486
487 k_buf.reserve(size_t(chunk_size));
488 k_buf.resize(0);
489
490 {
491 bm::id64_t k_mer_code;
492 vector_char_type::size_type dna_sz = seq_vect.size()-(k_size-1);
493 vector_char_type::size_type pos = 0;
494 bool valid = false;
495 for (; pos < dna_sz; ++pos)
496 {
497 valid = get_kmer_code(dna_str, pos, k_size, k_mer_code);
498 if (valid)
499 {
500 k_buf.push_back(k_mer_code);
501 break;
502 }
503 } // for pos
504
505 const unsigned k_shift = (k_size-1) * 2;
506 if (valid)
507 {
508 for (++pos; pos < dna_sz; ++pos)
509 {
510 bm::id64_t bp_code;
511 valid = get_DNA_code(dna_str[pos + (k_size - 1)], bp_code);
512 if (!valid)
513 {
514 pos += k_size; // wind fwrd to the next BP char
515 for (; pos < dna_sz; ++pos) // search for the next valid k-mer
516 {
517 valid = get_kmer_code(dna_str, pos, k_size, k_mer_code);
518 if (valid)
519 {
520 k_buf.push_back(k_mer_code);
521 break;
522 }
523 }
524 continue;
525 }
526 // shift out the previous base pair code, OR the new arrival
527 k_mer_code = ((k_mer_code >> 2) | (bp_code << k_shift));
528 // generated k-mer codes are accumulated in buffer for sorting
529 k_buf.push_back(k_mer_code);
530
531 if (k_buf.size() == chunk_size) // soring check.point
532 {
533 std::sort(k_buf.begin(), k_buf.end());
534 if (k_buf.size())
535 {
536 bv.set(&k_buf[0], k_buf.size(), bm::BM_SORTED); // fast bulk set
537 k_buf.resize(0);
538 bv.optimize(); // periodically re-optimize to save memory
539 }
540
541 float pcnt = float(pos) / float(dna_sz);
542 pcnt *= 100;
543 cout << "\r" << unsigned(pcnt) << "% of " << dna_sz
544 << " (" << (pos+1) <<") "
545 << flush;
546 }
547 } // for pos
548 }
549
550 if (k_buf.size()) // add last incomplete chunk here
551 {
552 std::sort(k_buf.begin(), k_buf.end());
553 bv.set(&k_buf[0], k_buf.size(), bm::BM_SORTED); // fast bulk set
554 }
555 }
556}
557
558std::atomic_ullong k_mer_progress_count(0);
559
560static
561void generate_k_mers(CSequenceColl& seq_coll, unsigned k_size,
562 size_t from, size_t to)
563{
564 assert(from <= to);
565 if (!seq_coll.size() || (from >= seq_coll.size()))
566 return;
567
568 std::vector<bm::id64_t> k_buf; // sort buffer
570
572 typedef bm::bvector<>::allocator_type allocator_type;
573 typedef allocator_type::allocator_pool_type allocator_pool_type;
574 allocator_pool_type pool; // local pool for blocks
575
576 bm::bvector<> bv;
577 bm::bvector<>::mem_pool_guard mp_guard_bv; // memory pool reduces allocation calls to heap
578 mp_guard_bv.assign_if_not_set(pool, bv);
579
580 if (!to || to >= seq_coll.size())
581 to = seq_coll.size()-1;
582
583 bm::serializer<bm::bvector<> > bvs; // serializer object
584 bvs.set_bookmarks(false);
585
586 unsigned cnt = 0;
587 for (size_t i = from; i <= to; ++i)
588 {
589 const vector_char_type& seq_vect = seq_coll.get_sequence(i);
590 generate_k_mer_bvector(bv, seq_vect, k_size, k_buf);
591
592 // serialize the vector
593 //
594 typename bm::bvector<>::statistics st;
596
597 buf.resize(st.max_serialize_mem);
598
599 size_t blob_size = bvs.serialize(bv, &buf[0], buf.size());
600 buf.resize(blob_size);
601
602 seq_coll.set_buffer(i, buf);
603
604 // local progress report counter is just to avoid atomic too often
605 ++cnt;
606 if (cnt >= 100)
607 {
608 k_mer_progress_count.fetch_add(cnt);
609 cnt = 0;
610 }
611
612 } // for i
613}
614
615static
616void generate_k_mers_parallel(CSequenceColl& seq_coll, unsigned k_size,
617 unsigned concurrency)
618{
619 size_t total_seq_size = seq_coll.total_seq_size();
620
621 if (!concurrency)
622 concurrency = 1;
623
624 size_t batch_size = total_seq_size / concurrency;
625 if (!batch_size)
626 batch_size = total_seq_size;
627 std::list<std::future<void> > futures;
628
629 for (size_t from = 0; from <= seq_coll.size(); )
630 {
631 size_t to = from;
632 for (size_t to_pick = 0; to < seq_coll.size(); ++to)
633 {
634 to_pick += seq_coll.seq_size(to);
635 if (to_pick >= batch_size)
636 break;
637 } // for
638
639 futures.emplace_back(
640 std::async(std::launch::async,
641 [&seq_coll, k_size, from, to]() { generate_k_mers(seq_coll, k_size, from, to); }
642 ));
643
644 from = to+1;
645 } // for from
646
647 // wait for all jobs to finish, print progress report
648 //
649 unsigned long long cnt = seq_coll.size();
650 for (auto& e : futures)
651 {
652 unsigned long long c_prev = 0;
653 while(1)
654 {
655 std::future_status status = e.wait_for(std::chrono::seconds(60));
656 if (status == std::future_status::ready)
657 break;
658
659 // progress report (entertainment)
660 //
661 unsigned long long c = k_mer_progress_count;
662 auto delta = c - c_prev;
663 c_prev = c;
664
665 auto remain_cnt = cnt - c;
666 auto remain_min = remain_cnt / delta;
667 cout << "\r" << c << ": progress per minute=" << delta;
668 if (remain_min < 120)
669 {
670 cout << " wait for " << remain_min << "m " << flush;
671 }
672 else
673 {
674 auto remain_h = remain_min / 60;
675 cout << " wait for " << remain_h << "h " << flush;
676 }
677 } // while
678 } // for
679 cout << endl;
680}
681
682// -----------------------------------------------------------------------
683
684/// Group (clustrer) of sequences
685///
687{
688public:
689 CSeqGroup(bm::id64_t lead_id = ~0ull)
690 : m_lead_id(lead_id), m_bv_members(bm::BM_GAP)
691 {
692 m_bv_members.set(lead_id);
693 }
694
695 /// set id for the group representative
696 void set_lead(bm::id64_t lead_id)
697 { add_member(m_lead_id = lead_id); }
698 /// Get lead id
699 bm::id64_t get_lead() const { return m_lead_id; }
700
701 /// check is cluster is non-empty
702 bool is_assigned() { return m_lead_id != ~0ull; }
703
704 /// add a member to the group
705 void add_member(bm::id64_t id) { m_bv_members.set_bit_no_check(id); }
706 void add_member(bm::id64_t id, const bm::bvector<>& bv_kmer)
707 {
708 m_bv_members.set_bit_no_check(id);
709 m_bv_kmer_union |= bv_kmer;
710 }
711 void add_member_sync(bm::id64_t id, const bm::bvector<>& bv_kmer)
712 {
713 std::lock_guard<std::mutex> guard(mtx_add_member_lock);
714 m_bv_members.set_bit_no_check(id); // a bit faster than set()
715 m_bv_kmer_union |= bv_kmer;
716 }
717
719 {
720 std::lock_guard<std::mutex> guard(mtx_add_member_lock);
721 m_bv_members.merge(bv_seq);
722 m_bv_kmer_union.merge(bv_kmer);
723 }
724
726 {
727 std::lock_guard<std::mutex> guard(mtx_add_member_lock);
728 return bm::count_and(bv, m_bv_kmer_union);
729 }
730
731 void clear_member(bm::id64_t id) { m_bv_members.set(id, false); }
732
733 bm::bvector<>& get_rep() { return m_bv_rep; }
734 const bm::bvector<>& get_rep() const { return m_bv_rep; }
735
736 const bm::bvector<>& get_members() const { return m_bv_members; }
737 bm::bvector<>& get_members() { return m_bv_members; }
738
739 bm::bvector<>& get_kmer_union() { return m_bv_kmer_union; }
740 const bm::bvector<>& get_kmer_union() const { return m_bv_kmer_union; }
741
742 friend class CSeqClusters;
743
744private:
745 bm::id64_t m_lead_id; ///< groups lead vector ID
746 bm::bvector<> m_bv_members; ///< vector of all group member ids
747 std::mutex mtx_add_member_lock; ///< concurrency protection for add_member_sync()
748
749 bm::bvector<> m_bv_rep; ///< group's representative vector
750 bm::bvector<> m_bv_kmer_union; ///< union of all k-mers of members
751};
752
753// -----------------------------------------------------------------------
754//
755//
756
758{
759public:
760 typedef std::vector<std::unique_ptr<CSeqGroup> > groups_vector_type;
761public:
763 {}
765
766 void add_group(CSeqGroup* sg) { m_seq_groups.emplace_back(sg); }
767
768 /// memebers moved into their own group
769 void take_group(bm::bvector<>& bv_members);
770
771 /// Acquire all groups from another cluster collection
772 ///
773 void merge_from(CSeqClusters& sc);
774
775 /// Remove groups which turned empty after clusterization
776 void clear_empty_groups();
777
778 /// Compute union of all cluster group members
780
781 /// Resolve duplicate membership between groups
782 ///
783 void resolve_duplicates(const CSequenceColl& seq_coll);
784
785 /// Find the best representatives in all cluster groups
786 /// the criteria is maximum absolute similarity to all members
787 ///
788 void elect_leaders(const CSequenceColl& seq_coll,
789 unsigned concurrency);
790
791 /// calculate avg cluster population count
793
794
795 size_t groups_size() const { return m_seq_groups.size(); }
796 CSeqGroup* get_group(size_t idx) { return m_seq_groups[idx].get(); }
797
798 /// print clusterization report
799 void print_summary(const char* title) const;
800
801private:
802 bm::bvector<> m_all_members; ///< Union of all group members
803 groups_vector_type m_seq_groups; ///< vector of all formed clusters
804 bm::aggregator<bvector_type> agg; ///< fast aggregator for set UNION ops
805};
806
808{
809 for (groups_vector_type::iterator it = m_seq_groups.begin();
810 it != m_seq_groups.end(); )
811 {
812 CSeqGroup* sg = it->get();
813 bm::bvector<>& bv_mem = sg->get_members();
814 auto cnt = bv_mem.count();
815 if (cnt < 2) // practically empty group
816 it = m_seq_groups.erase(it);
817 else
818 ++it;
819 } // for
820}
821
823{
824 bm::id64_t lead_id = bv_members.get_first();
825 CSeqGroup* sg = new CSeqGroup(lead_id);
826 sg->get_members().swap(bv_members); // move members to the cluster
827 add_group(sg);
828}
829
831{
832 agg.set_optimization();
833 m_all_members.clear();
834 for (groups_vector_type::const_iterator it = m_seq_groups.begin();
835 it != m_seq_groups.end(); ++it)
836 {
837 const CSeqGroup* sg = it->get();
838 agg.add(&sg->get_members());
839 } // for
840
841 agg.combine_or(m_all_members); // run UNION of all member vectors
842 agg.reset();
843
844 return m_all_members;
845}
846
848{
849 bm::id64_t cnt = 0;
850 for (groups_vector_type::const_iterator it = m_seq_groups.begin();
851 it != m_seq_groups.end(); ++it)
852 {
853 const CSeqGroup* sg = it->get();
854 cnt += sg->get_members().count();
855 }
856 cnt = cnt / m_seq_groups.size();
857 return cnt;
858}
859
860
862{
863 for (auto it = sc.m_seq_groups.begin(); it != sc.m_seq_groups.end(); ++it)
864 m_seq_groups.emplace_back(it->release());
865 sc.m_seq_groups.clear();
866}
867
868
869void resolve_duplicates(CSeqGroup& seq_group1,
870 CSeqGroup& seq_group2,
871 const CSequenceColl& seq_coll);
872
874{
875 for (size_t i = 0; i < m_seq_groups.size(); ++i)
876 {
877 CSeqGroup* sg1 = m_seq_groups[i].get();
878 for (size_t j = 0; j < i; ++j)
879 {
880 CSeqGroup* sg2 = m_seq_groups[j].get();
881 // resolve pairwise conflicts between cluster groups
882 ::resolve_duplicates(*sg1, *sg2, seq_coll);
883 } // for j
884 } // for i
885}
886
887/// Compute similarity distances for one row/vector (1:N) of distance matrix
888///
889static
891 unsigned* row,
892 const bm::bvector<>* bv_i,
893 size_t i,
894 const std::vector<std::unique_ptr<bvector_type> >& k_mers_vect)
895{
896 size_t j;
897 for (j = 0; j < i; ++j)
898 {
899 const bm::bvector<>* bv_j = k_mers_vect[j].get();
900 bm::id64_t and_cnt = bm::count_and(*bv_i, *bv_j);
901 row[j] = unsigned(and_cnt);
902 }
903 auto cnt = bv_i->count();
904 row[j] = unsigned(cnt);
905}
906
907/// Compute similarity distances matrix (COUNT(AND(a, b))
908///
909static
911 const CSequenceColl& seq_coll,
912 const bm::bvector<>& bv_mem,
913 bm::bvector<>::size_type bv_mem_cnt,
914 unsigned concurrency)
915{
916 if (concurrency < 1)
917 concurrency = 1;
918 auto N = bv_mem_cnt;
919
920 const unsigned k_max_electors = 500;
921 bm::bvector<> bv_sub; // subset vector
922 if (N > k_max_electors) // TODO: parameterize the
923 {
925 rsub.sample(bv_sub, bv_mem, k_max_electors); // pick random sunset
926 }
927
928
929 bvector_ptr_vector_type k_mers_vect;
930
931 // materialize list of vectors used in distance calculation
932 //
933 seq_coll.deserialize_k_mers(k_mers_vect, bv_mem, N);
934
935 std::list<std::future<void> > futures;
936 unsigned parallel_cnt = 0; // number of jobs in flight at a time
937
938 size_t i;
939 for (i = 0; i < N; ++i)
940 {
941 const bm::bvector<>* bv_i = k_mers_vect[i].get();
942 unsigned* row = dm.row(i);
943 do
944 {
945 if (parallel_cnt < concurrency)
946 {
947 futures.emplace_back(
948 std::async(std::launch::async,
949 [row, bv_i, i, &k_mers_vect]()
950 { compute_and_sim_row(row, bv_i, i, k_mers_vect); }
951 ));
952 ++parallel_cnt;
953 break;
954 }
955 else
956 {
957 // wait for an async() slot to open (overbooking control)
958 ::wait_for_slot(futures, &parallel_cnt, concurrency);
959 }
960 } while(1);
961 } // for i
962
963 // sync point
964 for (auto& e : futures)
965 e.wait();
966
967 dm.replicate_triange(); // copy to full simetrical distances from triangle
968}
969
970/// Compute union (Universe) of all k-mers in the cluster group
971/// Implemented as a OR of all k-mer fingerprints
972static
974 const CSequenceColl& seq_coll)
975{
977
978 bm::bvector<>& bv_kmer_union = seq_group.get_kmer_union();
979 bv_kmer_union.clear();
980
981 bm::bvector<>& bv_all_members = seq_group.get_members();
982 for(bm::bvector<>::enumerator en(bv_all_members) ;en.valid(); ++en)
983 {
984 auto idx = *en;
985 const unsigned char* buf = seq_coll.get_buf(idx);
986 if (!buf)
987 continue;
988
989 bm::deserialize(bv_kmer_union, buf, tb); // deserialize is an OR operation
990 } // for en
991 bv_kmer_union.optimize(tb);
992}
993
995 unsigned concurrency)
996{
999
1000 std::list<std::future<void> > futures;
1001
1002 for (size_t k = 0; k < m_seq_groups.size(); ++k)
1003 {
1004 CSeqGroup* sg = m_seq_groups[k].get();
1005 bm::bvector<>& bv_all_members = sg->get_members();
1006 bv_all_members.set(sg->get_lead());
1007 auto N = bv_all_members.count();
1008 auto all_members_count = N; (void) all_members_count;
1009
1010 // determine the size of the "electoral colledge" on available concurrency
1011 unsigned k_max_electors = 200 * unsigned(log2(concurrency));
1012 if (k_max_electors < 500)
1013 k_max_electors = 500;
1014
1015 const bm::bvector<>* bv_mem = &bv_all_members; // vector of electors
1016 bm::bvector<> bv_sub; // subset vector
1017 if (N > k_max_electors) // TODO: parameterize the
1018 {
1019 // pick a random sub-set as the "electoral colledge"
1020 rsub.sample(bv_sub, bv_all_members, k_max_electors);
1021 bv_sub.set(sg->get_lead()); // current leader always takes part
1022 bv_mem = &bv_sub;
1023 N = bv_sub.count();
1024 }
1025
1026 // NxN distance matrix between members
1027 //
1028 distance_matrix_type dm(N, N);
1029 dm.init();
1030 dm.set_zero();
1031
1032 // compute triangular distance matrix
1033 //
1034 compute_and_sim(dm, seq_coll, *bv_mem, N, concurrency);
1035
1036 // leader election based on maximum similarity to other cluster
1037 // elements
1038 //
1039 bm::id64_t best_score = 0;
1040 bm::id64_t leader_idx = sg->get_lead();
1041 assert(bv_mem->test(leader_idx));
1042 auto rank = bv_mem->count_range(0, leader_idx);
1043 assert(rank);
1044 dm.sum(best_score, rank-1); // use sum or all similarities as a score for the leader
1045 bm::id64_t old_leader_idx = leader_idx;
1046
1047 bm::bvector<>::enumerator en(bv_mem->first());
1048 for (size_t i = 0; en.valid(); ++en, ++i)
1049 {
1050 bm::id64_t cand_score;
1051 dm.sum(cand_score, i); // use sum or all similarities as a score
1052 if (cand_score > best_score) // better candidate for a leader
1053 {
1054 best_score = cand_score;
1055 leader_idx = *en;
1056 }
1057 } // for en
1058
1059 if (leader_idx != old_leader_idx) // found a new leader
1060 {
1061 sg->set_lead(leader_idx);
1062 const unsigned char* buf = seq_coll.get_buf(leader_idx);
1063 assert(buf);
1064 bm::bvector<>* bv = &sg->get_rep();
1065
1066 // async replace the leader representative k-mer vector
1067 //
1068 futures.emplace_back(
1069 std::async(std::launch::async,
1070 [bv, buf]() { bv->clear(); bm::deserialize(*bv, buf); }
1071 ));
1072 // re-compute k-mer UNION of all vectors
1073 futures.emplace_back(
1074 std::async(std::launch::async,
1075 [sg, &seq_coll]() { compute_seq_group_union(*sg, seq_coll); }
1076 ));
1077 }
1078
1079 } // for i
1080
1081 for (auto& e : futures) // wait for all forked tasks
1082 e.wait();
1083
1084}
1085
1086
1087void CSeqClusters::print_summary(const char* title) const
1088{
1089 cout << title << endl;
1090 for (size_t i = 0; i < m_seq_groups.size(); ++i)
1091 {
1092 const CSeqGroup* sg = m_seq_groups[i].get();
1093 const bm::bvector<>& bv_mem = sg->get_members();
1094 cout << sg->get_lead() << ": "
1095 << bv_mem.count() << endl;
1096 }
1097 cout << "-----------\nTotal: " << m_seq_groups.size() << endl << endl;
1098}
1099
1100///
1101static
1102void compute_group(CSeqGroup& seq_group,
1103 const CSequenceColl& seq_coll,
1104 const bm::bvector<>& bv_exceptions,
1105 float similarity_cut_off)
1106{
1107 assert(similarity_cut_off < 1);
1108 assert(seq_group.is_assigned());
1109
1110 auto sz = seq_coll.buf_size();
1111 size_t lead_id = seq_group.get_lead();
1112 if (lead_id >= sz)
1113 return;
1114
1115 const unsigned char* buf = seq_coll.get_buf(lead_id);
1116 if (!buf)
1117 return;
1118
1119 bm::bvector<>& bv = seq_group.get_rep();
1120 bm::deserialize(bv, buf);
1121
1122 auto i_cnt = bv.count();
1123 // approximate number of k-mers we consider similar
1124 float similarity_target = float(i_cnt * float(similarity_cut_off));
1125
1126
1128
1129 bool found = false;
1130 for (size_t i = 0; i < sz; ++i)
1131 {
1132 bool is_except = bv_exceptions.test(i);
1133 if (is_except)
1134 continue;
1135 buf = seq_coll.get_buf(i);
1136 if (!buf)
1137 continue;
1138 // constant deserializer AND just to count the product
1139 // without actual deserialization (from the compressed BLOB)
1140 //
1141 bm::id64_t and_cnt = od.deserialize(bv, buf, 0, bm::set_COUNT_AND);
1142
1143 if (and_cnt && (float(and_cnt) > similarity_target)) // similar enough to be a candidate
1144 {
1145 seq_group.add_member(i);
1146 found = true;
1147 }
1148 } // for i
1149
1150 if (!found)
1151 seq_group.clear_member(lead_id);
1152}
1153
1154/// Resolve duplicate members between two groups
1156 CSeqGroup& seq_group2,
1157 const CSequenceColl& seq_coll)
1158{
1159 if (&seq_group1 == & seq_group2) // self check
1160 return;
1161
1162 bm::bvector<>& bv1 = seq_group1.get_members();
1163 bm::bvector<>& bv2 = seq_group2.get_members();
1164
1165 // build intersect between group members
1166 bm::bvector<> bv_and;
1167 bv_and.bit_and(bv1, bv2, bm::bvector<>::opt_none);
1168
1169 if (bv_and.any()) // double membership detected
1170 {
1171 bm::bvector<>& bv_rep1 = seq_group1.get_rep();
1172 bm::bvector<>& bv_rep2 = seq_group2.get_rep();
1173
1175
1176 // evaluate each double member for best membership placement
1177 //
1178 for (bm::bvector<>::enumerator en(bv_and); en.valid(); ++en)
1179 {
1180 auto idx = *en;
1181 auto lead_idx1 = seq_group1.get_lead();
1182 auto lead_idx2 = seq_group2.get_lead();
1183 assert(lead_idx1 != lead_idx2);
1184
1185 if (idx == lead_idx1)
1186 {
1187 seq_group2.clear_member(idx);
1188 continue;
1189 }
1190 if (idx == lead_idx2)
1191 {
1192 seq_group1.clear_member(idx);
1193 continue;
1194 }
1195
1196 const unsigned char* buf = seq_coll.get_buf(idx);
1197 assert(buf);
1198 // resolve conflict based on max.absolute similarity
1199 //
1200 bm::id64_t and_cnt1 = od.deserialize(bv_rep1, buf, 0, bm::set_COUNT_AND);
1201 bm::id64_t and_cnt2 = od.deserialize(bv_rep2, buf, 0, bm::set_COUNT_AND);
1202 if (and_cnt1 >= and_cnt2)
1203 seq_group2.clear_member(idx);
1204 else
1205 seq_group1.clear_member(idx);
1206
1207 } // for
1208 }
1209}
1210
1211/// Utility class to accumulate cahnges to cluster
1212/// before commiting it (mutex syncronous operation)
1213///
1215{
1216 CKMerAcc(size_t sz)
1217 : bv_members(sz), bv_kmers(sz)
1218 {}
1219
1220 void add(size_t cluster_id,
1222 {
1223 bm::bvector<>* bv_m = bv_members[cluster_id].get();
1224 bm::bvector<>* bv_k = bv_kmers[cluster_id].get();
1225 if (!bv_m)
1226 {
1227 assert(!bv_kmers[cluster_id].get());
1228 bv_members[cluster_id].reset(new bm::bvector<>(bm::BM_GAP));
1229 bv_kmers[cluster_id].reset(new bm::bvector<>(bm::BM_GAP));
1230 bv_m = bv_members[cluster_id].get();
1231 bv_k = bv_kmers[cluster_id].get();
1232 }
1233 bv_m->set(m_id);
1234 bv_k->merge(bv_kmer);
1235 }
1236
1239};
1240
1241/// Compute AND similarity to all available clusters assign to the most similar
1242/// using cluster representative
1243///
1244static
1246 const CSequenceColl& seq_coll,
1247 const bm::bvector<>& bv_seq_ids,
1248 bm::bvector<>::size_type seq_id_from,
1249 bm::bvector<>::size_type seq_id_to)
1250{
1253
1254 CKMerAcc acc(cluster_groups.groups_size());
1255
1256 bm::bvector<>::enumerator en(bv_seq_ids);
1257 en.go_to(seq_id_from);
1258
1259 for ( ;en.valid(); ++en)
1260 {
1261 auto seq_id = *en;
1262 if (seq_id > seq_id_to)
1263 break;
1264 const unsigned char* buf = seq_coll.get_buf(seq_id);
1265 if (!buf)
1266 continue;
1267
1268 bm::bvector<> bv_k_mer;
1269 bv_k_mer.set_allocator_pool(&pool); // for faster memory recycling
1270
1271 bm::deserialize(bv_k_mer, buf, tb);
1272
1273 bm::bvector<>::size_type best_score(0);
1274 size_t cluster_idx(~0ull);
1275
1276 // analyse candidate's similarity to all clusters via representative
1277 //
1278 for (size_t i = 0; i < cluster_groups.groups_size(); ++i)
1279 {
1280 CSeqGroup* sg = cluster_groups.get_group(i);
1281 // - COUNT(AND) similarity to the representative of each cluster
1282 //
1283 bm::bvector<>& bv_rep_k_mer = sg->get_rep();
1284 auto rep_and_cnt = bm::count_and(bv_k_mer, bv_rep_k_mer);
1285 if (rep_and_cnt > best_score)
1286 {
1287 cluster_idx = i; best_score = rep_and_cnt;
1288 }
1289 } // for i
1290
1291 if (cluster_idx != ~0ull) // cluster association via representative
1292 {
1293 acc.add(cluster_idx, seq_id, bv_k_mer);
1294 }
1295 } // for all seq-ids in the range
1296
1297 // merge all accumulated cluster assignmnets all at once
1298 //
1299 for (size_t i = 0; i < cluster_groups.groups_size(); ++i)
1300 {
1301 bm::bvector<>* bv_m = acc.bv_members[i].get();
1302 if (!bv_m)
1303 continue;
1304 bm::bvector<>* bv_k = acc.bv_kmers[i].get();
1305 CSeqGroup* sg = cluster_groups.get_group(i);
1306 sg->merge_member_sync(*bv_m, *bv_k);
1307 } // for
1308
1309}
1310
1311/// Compute AND similarity to all available clusters assign to the most similar
1312/// using UNION of k-mers in the cluster
1313/// This is a more relaxed assignmnet, used when representative does not work
1314static
1316 const CSequenceColl& seq_coll,
1317 const bm::bvector<>& bv_seq_ids,
1318 bm::bvector<>::size_type seq_id_from,
1319 bm::bvector<>::size_type seq_id_to)
1320{
1323
1324 bm::bvector<>::enumerator en(bv_seq_ids);
1325 en.go_to(seq_id_from);
1326
1327 for ( ;en.valid(); ++en)
1328 {
1329 auto seq_id = *en;
1330 if (seq_id > seq_id_to)
1331 break;
1332 const unsigned char* buf = seq_coll.get_buf(seq_id);
1333 if (!buf)
1334 continue;
1335
1336 bm::bvector<> bv_k_mer;
1337 bv_k_mer.set_allocator_pool(&pool); // for faster memory recycling
1338
1339 bm::deserialize(bv_k_mer, buf, tb);
1340
1341 bm::bvector<>::size_type best_score(0);
1342 size_t cluster_idx(~0ull);
1343
1344 {
1345 // try to extend search using UNION of all cluster k-mers
1346 //
1347 best_score = 0;
1348 for (size_t i = 0; i < cluster_groups.groups_size(); ++i)
1349 {
1350 CSeqGroup* sg = cluster_groups.get_group(i);
1351 // - COUNT(AND) similarity to the representative of each cluster
1352 //
1353 bm::id64_t uni_and_cnt = sg->count_and_union_sync(bv_k_mer);
1354 if (uni_and_cnt > best_score)
1355 {
1356 cluster_idx = i; best_score = uni_and_cnt;
1357 }
1358 } // for i
1359 if (cluster_idx != ~0ull) // cluster association via representative
1360 {
1361 CSeqGroup* sg = cluster_groups.get_group(cluster_idx);
1362 sg->add_member_sync(seq_id, bv_k_mer);
1363 }
1364 }
1365 } // for all seq-ids in the range
1366
1367}
1368
1369
1370/// Pick random sequences as cluster seed elements, try attach
1371/// initial sequences based on weighted similarity measure
1372///
1373static
1375 const CSequenceColl& seq_coll,
1376 const bm::bvector<>& bv_total,
1378 unsigned num_clust,
1379 float similarity_cut_off,
1380 unsigned concurrency)
1381{
1382 bm::bvector<> bv_rsub; // random subset of sequences
1383 rsub.sample(bv_rsub, bv_total, num_clust); // pick random sequences as seeds
1384
1385 std::list<std::future<void> > futures;
1386 unsigned parallel_cnt = 0;
1387
1388 for (bm::bvector<>::enumerator en=bv_rsub.first(); en.valid(); ++en)
1389 {
1390 auto idx = *en;
1391 CSeqGroup* sg = new CSeqGroup(idx);
1392 cluster_groups.add_group(sg);
1393
1394 do
1395 {
1396 if (parallel_cnt < concurrency)
1397 {
1398 futures.emplace_back(
1399 std::async(std::launch::async,
1400 [&seq_coll, sg, &bv_rsub, similarity_cut_off]()
1401 { compute_group(*sg, seq_coll, bv_rsub, similarity_cut_off); }
1402 ));
1403 ++parallel_cnt;
1404 break;
1405 }
1406 else
1407 {
1408 // wait for an async() slot to open
1409 ::wait_for_slot(futures, &parallel_cnt, concurrency);
1410 }
1411 } while(1);
1412 } // for en
1413
1414 // wait for completion of initial cluster group formation
1415 for (auto& e : futures)
1416 e.wait();
1417}
1418
1419static
1421 const CSequenceColl& seq_coll,
1422 unsigned num_clust,
1423 float similarity_cut_off,
1424 unsigned concurrency)
1425{
1426 assert(similarity_cut_off < 1);
1427 if (!seq_coll.buf_size())
1428 return; // nothing to do
1429
1430 bm::random_subset<bm::bvector<> > rsub; // sub-set getter
1431
1432 bm::bvector<> bv_total;
1433 bv_total.set_range(0, seq_coll.buf_size());
1434
1435 bm::id64_t rcount = 0;
1436
1437 const unsigned max_pass = 3;
1438 for (unsigned pass = 0; pass < max_pass; ++pass)
1439 {
1440 CSeqClusters cluster_groups;
1441
1442 compute_random_clusters(cluster_groups, seq_coll, bv_total, rsub,
1443 num_clust, similarity_cut_off, concurrency);
1444
1445 // remove possible empty clusters (inital seeds were picked at random)
1446 //
1447 cluster_groups.clear_empty_groups();
1448
1449 cluster_groups.resolve_duplicates(seq_coll);
1450
1451 cluster_groups.clear_empty_groups();
1452
1453 // print summary after the initial formation of cluster groups
1454 //
1455 cluster_groups.print_summary("Inital cluster formations:");
1456
1457 // re-elect representatives
1458 cluster_groups.elect_leaders(seq_coll, concurrency);
1459
1460 cluster_groups.print_summary("After lead re-election:");
1461
1462 // sub-set of sequence ids already distributed into clusters
1463 bm::bvector<>::size_type total_count = bv_total.count();
1464 {
1465 cout << " total = " << total_count << endl;
1466 const bm::bvector<>& bv_clust = cluster_groups.union_all_groups();
1467 cout << " clustered = " << bv_clust.count() << endl;
1468
1469 bv_total -= bv_clust; // exlude all already clustered
1470 total_count = bv_total.count();
1471 cout << " remain = " << total_count << endl;
1472 }
1473
1474 if (!total_count)
1475 break;
1476
1477 std::list<std::future<void> > futures;
1478
1479 // run split algorithm to determine approximately equal ranges
1480 // for parallel processing
1481 bv_ranges_vector pair_vect;
1482 bm::bvector<>::size_type split_rank = total_count / concurrency; // target population count per job
1483 bm::rank_range_split(bv_total, split_rank, pair_vect);
1484 assert(pair_vect.size());
1485 for (size_t k = 0; k < pair_vect.size(); ++k)
1486 {
1487 auto seq_id_from = pair_vect[k].first;
1488 auto seq_id_to = pair_vect[k].second;
1489 futures.emplace_back(
1490 std::async(std::launch::async,
1491 [&cluster_groups, &seq_coll, &bv_total, seq_id_from, seq_id_to]()
1492 { assign_to_best_cluster(cluster_groups, seq_coll, bv_total, seq_id_from, seq_id_to); }
1493 ));
1494 }
1495 for (auto& e : futures) // sync point
1496 e.wait();
1497
1498 cluster_groups.print_summary("Clusters after phase 2 recruitment");
1499
1500 // check if there are sequences not yet belonging to any cluster
1501 {
1502 const bm::bvector<>& bv_clust = cluster_groups.union_all_groups();
1503 bv_total -= bv_clust; // exlude all already clustered
1504 rcount = bv_total.count();
1505 if (rcount)
1506 {
1507 cout << "Undistributed sequences = " << rcount << endl;
1508 }
1509 else
1510 {
1511 seq_clusters.merge_from(cluster_groups);
1512 break;
1513 }
1514 }
1515 seq_clusters.merge_from(cluster_groups);
1516 bm::id64_t avg_group_count = seq_clusters.compute_avg_count();
1517 if (rcount < avg_group_count) // not worth another pass ?
1518 {
1519 seq_clusters.take_group(bv_total);
1520 break;
1521 }
1522
1523 cout << "PASS=" << (pass+1) << endl << endl;
1524 } // for pass
1525
1526 // try to assign to the global pool of clusters using UNION
1527 // which relaxes assignmnet
1528
1529 if (rcount)
1530 {
1531 {
1532 const bm::bvector<>& bv_clust = seq_clusters.union_all_groups();
1533 cout << endl << " clustered = " << bv_clust.count() << endl;
1534
1535 bv_total -= bv_clust; // exlude all already clustered
1536 rcount = bv_total.count();
1537 cout << " remain = " << rcount << endl;
1538 }
1539
1540 if (rcount)
1541 {
1542 bv_ranges_vector pair_vect;
1543 bm::bvector<>::size_type split_rank = rcount / concurrency;
1544 bm::rank_range_split(bv_total, split_rank, pair_vect);
1545
1546 std::list<std::future<void> > futures;
1547
1548 for (size_t k = 0; k < pair_vect.size(); ++k)
1549 {
1550 auto seq_id_from = pair_vect[k].first;
1551 auto seq_id_to = pair_vect[k].second;
1552 futures.emplace_back(
1553 std::async(std::launch::async,
1554 [&seq_clusters, &seq_coll, &bv_total, seq_id_from, seq_id_to]()
1555 { assign_to_best_cluster_union(seq_clusters, seq_coll, bv_total, seq_id_from, seq_id_to); }
1556 ));
1557 }
1558 for (auto& e : futures) // sync point
1559 e.wait();
1560 {
1561 const bm::bvector<>& bv_clust = seq_clusters.union_all_groups();
1562 cout << endl << " clustered = " << bv_clust.count() << endl;
1563
1564 bv_total -= bv_clust; // exlude all already clustered
1565 rcount = bv_total.count();
1566 cout << " remain = " << rcount << endl;
1567 }
1568 }
1569 }
1570
1571 if (rcount)
1572 {
1573 seq_clusters.take_group(bv_total);
1574 }
1575
1576 seq_clusters.clear_empty_groups();
1577 seq_clusters.resolve_duplicates(seq_coll);
1578 seq_clusters.clear_empty_groups();
1579
1580 seq_clusters.print_summary("Final clusters summary:");
1581}
1582
1583
1584int main(int argc, char *argv[])
1585{
1586 CSequenceColl seq_coll;
1587
1588 try
1589 {
1590 auto ret = parse_args(argc, argv);
1591 if (ret != 0)
1592 {
1593 cerr << "cmd-line parse error. " << endl;
1594 return ret;
1595 }
1596
1597 if (!ifa_name.empty()) // FASTA file load
1598 {
1599 bm::chrono_taker tt1(cout, "1. Load FASTA", 1, &timing_map);
1600
1601 // limitation: loads a single molecule only
1602 //
1603 auto res = load_FASTA(ifa_name, seq_coll);
1604 if (res != 0)
1605 return res;
1606 }
1607
1608 cout << "Sequences size = " << seq_coll.size() << endl;
1609
1610 if (ik_size && !ifa_name.empty())
1611 {
1612 {
1613 bm::chrono_taker tt1(cout, "2. Generate k-mers", 1, &timing_map);
1614 seq_coll.sync_buffers_size();
1616 }
1617
1618 if (!ikd_name.empty())
1619 {
1620 bm::chrono_taker tt1(cout, "3. Save k-mers", 1, &timing_map);
1621 save_kmer_buffers(ikd_name, seq_coll);
1622 }
1623 }
1624
1625 if (ik_size && ifa_name.empty() && !ikd_name.empty())
1626 {
1627 {
1628 bm::chrono_taker tt1(cout, "4. Load k-mers", 1, &timing_map);
1629 load_kmer_buffers(ikd_name, seq_coll);
1630 }
1631
1632 if (seq_coll.buf_size())
1633 {
1634 CSeqClusters seq_clusters;
1635 bm::chrono_taker tt1(cout, "5. k-mer similarity clustering", 1, &timing_map);
1636 compute_jaccard_clusters(seq_clusters, seq_coll, 10, 0.05f, parallel_jobs);
1637 }
1638 }
1639
1640
1641 if (is_timing)
1642 {
1643 std::cout << std::endl << "Performance:" << std::endl;
1645 }
1646
1647 }
1648 catch(std::exception& ex)
1649 {
1650 std::cerr << ex.what() << std::endl;
1651 return 1;
1652 }
1653
1654
1655
1656 return 0;
1657}
1658
#define BM_DECLARE_TEMP_BLOCK(x)
Definition: bm.h:47
Algorithms for fast aggregation of N bvectors.
Algorithms for bvector<> (main include)
Generation of random subset.
Serialization / compression of bvector<>. Set theoretical operations on compressed BLOBs.
Algorithms for bm::sparse_vector.
Compressed sparse container rsc_sparse_vector<> for integer types.
Timing utilities for benchmarking (internal)
pre-processor un-defines to avoid global space pollution (internal)
void merge_from(CSeqClusters &sc)
Acquire all groups from another cluster collection.
Definition: xsample07a.cpp:861
size_t groups_size() const
Definition: xsample07a.cpp:795
void elect_leaders(const CSequenceColl &seq_coll, unsigned concurrency)
Find the best representatives in all cluster groups the criteria is maximum absolute similarity to al...
Definition: xsample07a.cpp:994
void resolve_duplicates(const CSequenceColl &seq_coll)
Resolve duplicate membership between groups.
Definition: xsample07a.cpp:873
CSeqClusters(const CSeqClusters &)=delete
void clear_empty_groups()
Remove groups which turned empty after clusterization.
Definition: xsample07a.cpp:807
CSeqGroup * get_group(size_t idx)
Definition: xsample07a.cpp:796
void print_summary(const char *title) const
print clusterization report
void add_group(CSeqGroup *sg)
Definition: xsample07a.cpp:766
std::vector< std::unique_ptr< CSeqGroup > > groups_vector_type
Definition: xsample07a.cpp:760
const bm::bvector & union_all_groups()
Compute union of all cluster group members.
Definition: xsample07a.cpp:830
bm::id64_t compute_avg_count() const
calculate avg cluster population count
Definition: xsample07a.cpp:847
void take_group(bm::bvector<> &bv_members)
memebers moved into their own group
Definition: xsample07a.cpp:822
Group (clustrer) of sequences.
Definition: xsample07a.cpp:687
const bm::bvector & get_members() const
Definition: xsample07a.cpp:736
void add_member(bm::id64_t id, const bm::bvector<> &bv_kmer)
Definition: xsample07a.cpp:706
bm::id64_t count_and_union_sync(const bm::bvector<> &bv)
Definition: xsample07a.cpp:725
const bm::bvector & get_rep() const
Definition: xsample07a.cpp:734
bm::bvector & get_kmer_union()
Definition: xsample07a.cpp:739
CSeqGroup(bm::id64_t lead_id=~0ull)
Definition: xsample07a.cpp:689
const bm::bvector & get_kmer_union() const
Definition: xsample07a.cpp:740
void add_member_sync(bm::id64_t id, const bm::bvector<> &bv_kmer)
Definition: xsample07a.cpp:711
void set_lead(bm::id64_t lead_id)
set id for the group representative
Definition: xsample07a.cpp:696
bm::bvector & get_members()
Definition: xsample07a.cpp:737
bm::bvector & get_rep()
Definition: xsample07a.cpp:733
void add_member(bm::id64_t id)
add a member to the group
Definition: xsample07a.cpp:705
void merge_member_sync(bm::bvector<> &bv_seq, bm::bvector<> &bv_kmer)
Definition: xsample07a.cpp:718
bool is_assigned()
check is cluster is non-empty
Definition: xsample07a.cpp:702
bm::id64_t get_lead() const
Get lead id.
Definition: xsample07a.cpp:699
void clear_member(bm::id64_t id)
Definition: xsample07a.cpp:731
Collection of sequences and k-mer fingerprint vectors.
Definition: xsample07a.cpp:131
void add_sequence(const string &acc, vector_char_type *seq_ptr)
Definition: xsample07a.cpp:140
CSequenceColl(const CSequenceColl &)=delete
void deserialize_k_mers(bvector_ptr_vector_type &k_mers_vect, const bm::bvector<> &bv_req, bm::bvector<>::size_type bv_req_count) const
Deserialize group of k-mer fingerprint vectors.
Definition: xsample07a.cpp:208
size_t total_seq_size() const
Definition: xsample07a.cpp:171
std::vector< unsigned char > buffer_type
Definition: xsample07a.cpp:133
const vector_char_type & get_sequence(size_t i) const
Definition: xsample07a.cpp:167
size_t get_buf_size(size_t i) const
Get k-mer vector BLOB size.
Definition: xsample07a.cpp:183
size_t size() const
Definition: xsample07a.cpp:163
size_t buf_size() const
Definition: xsample07a.cpp:180
size_t seq_size(size_t i) const
Definition: xsample07a.cpp:169
const string & get_acc(size_t i) const
Definition: xsample07a.cpp:166
void sync_buffers_size()
Definition: xsample07a.cpp:158
void set_buffer(size_t i, const buffer_type &buf)
Definition: xsample07a.cpp:146
const unsigned char * get_buf(size_t i) const
Get k-mer BLOB pointer.
Definition: xsample07a.cpp:186
void reset()
Reset aggregate groups, forget all attached vectors.
Definition: bmaggregator.h:932
void combine_or(bvector_type &bv_target)
Aggregate added group of vectors using logical OR Operation does NOT perform an explicit reset of arg...
Definition: bmaggregator.h:994
void set_optimization(typename bvector_type::optmode opt=bvector_type::opt_compress) BMNOEXCEPT
set on-the-fly bit-block compression By default aggregator does not try to optimize result,...
Definition: bmaggregator.h:359
size_t add(const bvector_type *bv, unsigned agr_group=0)
Attach source bit-vector to a argument group (0 or 1).
Definition: bmaggregator.h:986
Constant iterator designed to enumerate "ON" bits.
Definition: bm.h:603
bool go_to(size_type pos) BMNOEXCEPT
go to a specific position in the bit-vector (or next)
Definition: bm.h:7971
bool valid() const BMNOEXCEPT
Checks if iterator is still valid.
Definition: bm.h:283
Bitvector Bit-vector container with runtime compression of bits.
Definition: bm.h:115
bool test(size_type n) const BMNOEXCEPT
returns true if bit n is set and false is bit n is 0.
Definition: bm.h:1480
void merge(bm::bvector< Alloc > &bvect)
Merge/move content from another vector.
Definition: bm.h:5578
allocator_type::allocator_pool_type allocator_pool_type
Definition: bm.h:118
size_type size() const BMNOEXCEPT
Returns bvector's capacity (number of bits it can store)
Definition: bm.h:1278
size_type count() const BMNOEXCEPT
population count (count of ON bits)
Definition: bm.h:2366
bm::bvector< Alloc > & bit_and(const bm::bvector< Alloc > &bv1, const bm::bvector< Alloc > &bv2, typename bm::bvector< Alloc >::optmode opt_mode=opt_none)
3-operand AND : this := bv1 AND bv2
Definition: bm.h:5880
bvector< Alloc > & set(size_type n, bool val=true)
Sets bit n if val is true, clears bit n if val is false.
Definition: bm.h:4153
void set_allocator_pool(allocator_pool_type *pool_ptr) BMNOEXCEPT
Set allocator pool for local (non-th readed) memory cyclic(lots of alloc-free ops) opertations.
Definition: bm.h:1026
void optimize(bm::word_t *temp_block=0, optmode opt_mode=opt_compress, statistics *stat=0)
Optimize memory bitvector's memory allocation.
Definition: bm.h:3600
bool any() const BMNOEXCEPT
Returns true if any bits in this bitset are set, and otherwise returns false.
Definition: bm.h:2416
bvector_size_type size_type
Definition: bm.h:121
enumerator first() const
Returns enumerator pointing on the first non-zero bit.
Definition: bm.h:1849
void swap(bvector< Alloc > &bvect) BMNOEXCEPT
Exchanges content of bv and this bvector.
Definition: bm.h:3931
size_type count_range(size_type left, size_type right, const rs_index_type &rs_idx) const BMNOEXCEPT
Returns count of 1 bits in the given range [left..right] Uses rank-select index to accelerate the sea...
Definition: bm.h:3481
size_type get_first() const BMNOEXCEPT
find first 1 bit in vector. Function may return 0 and this requires an extra check if bit 0 is actual...
Definition: bm.h:1578
bvector< Alloc > & set_range(size_type left, size_type right, bool value=true)
Sets all bits in the specified closed interval [left,right] Interval must be inside the bvector's siz...
Definition: bm.h:2333
void clear(const size_type *ids, size_type ids_size, bm::sort_order so=bm::BM_UNKNOWN)
clear list of bits in this bitset
Definition: bm.h:4114
Alloc allocator_type
Definition: bm.h:117
void set_bit_no_check(size_type n)
Set bit without checking preconditions (size, etc)
Definition: bm.h:4405
Utility class to collect performance measurements and statistics.
Definition: bmtimer.h:41
std::map< std::string, statistics > duration_map_type
test name to duration map
Definition: bmtimer.h:66
static void print_duration_map(TOut &tout, const duration_map_type &dmap, format fmt=ct_time)
Definition: bmtimer.h:150
Deserializer, performs logical operations between bit-vector and serialized bit-vector.
Definition: bmserial.h:930
size_type deserialize(bvector_type &bv, const unsigned char *buf, set_operation op, bool exit_on_one=false)
Deserialize bvector using buffer as set operation argument.
Definition: bmserial.h:6579
void sample(BV &bv_out, const BV &bv_in, size_type sample_count)
Get random subset of input vector.
Definition: bmrandom.h:140
Bit-vector serialization class.
Definition: bmserial.h:76
void set_bookmarks(bool enable, unsigned bm_interval=256) BMNOEXCEPT
Add skip-markers to serialization BLOB for faster range decode at the expense of some BLOB size incre...
Definition: bmserial.h:1287
size_type serialize(const BV &bv, unsigned char *buf, size_t buf_size)
Bitvector serialization into memory block.
Definition: bmserial.h:2706
bm::alloc_pool_guard< allocator_pool_type, bvector< Alloc > > mem_pool_guard
Definition: bm.h:790
@ BM_SORTED
input set is sorted (ascending order)
Definition: bmconst.h:205
@ set_COUNT_AND
Definition: bmconst.h:174
@ BM_GAP
GAP compression is ON.
Definition: bmconst.h:147
size_t deserialize(BV &bv, const unsigned char *buf, bm::word_t *temp_block=0, const bm::bv_ref_vector< BV > *ref_vect=0)
Bitvector deserialization from a memory BLOB.
Definition: bmserial.h:3140
void rank_range_split(const BV &bv, typename BV::size_type rank, PairVect &target_v)
Algorithm to identify bit-vector ranges (splits) for the rank.
Definition: bmalgo.h:394
BV::size_type count_and(const BV &bv1, const BV &bv2) BMNOEXCEPT
Computes bitcount of AND operation of two bitsets.
Definition: bmalgo.h:49
Definition: bm.h:78
unsigned long long int id64_t
Definition: bmconst.h:35
std::vector< std::pair< bv_size_type, bv_size_type > > bv_ranges_vector
Definition: sample24.cpp:55
Utility class to accumulate cahnges to cluster before commiting it (mutex syncronous operation)
void add(size_t cluster_id, bm::bvector<>::size_type m_id, bm::bvector<> &bv_kmer)
bvector_ptr_vector_type bv_kmers
bvector_ptr_vector_type bv_members
CKMerAcc(size_t sz)
size_t max_serialize_mem
estimated maximum memory for serialization
Definition: bmfunc.h:61
Statistical information about bitset's memory allocation details.
Definition: bm.h:125
static int parse_args(int argc, char *argv[])
Definition: xsample03.cpp:110
std::vector< char > vector_char_type
Definition: xsample06.cpp:133
static void compute_jaccard_clusters(CSeqClusters &seq_clusters, const CSequenceColl &seq_coll, unsigned num_clust, float similarity_cut_off, unsigned concurrency)
static void assign_to_best_cluster(CSeqClusters &cluster_groups, const CSequenceColl &seq_coll, const bm::bvector<> &bv_seq_ids, bm::bvector<>::size_type seq_id_from, bm::bvector<>::size_type seq_id_to)
Compute AND similarity to all available clusters assign to the most similar using cluster representat...
int main(int argc, char *argv[])
bvector_type::size_type bv_size_type
Definition: xsample07a.cpp:98
static void generate_k_mers(CSequenceColl &seq_coll, unsigned k_size, size_t from, size_t to)
Definition: xsample07a.cpp:561
static void compute_random_clusters(CSeqClusters &cluster_groups, const CSequenceColl &seq_coll, const bm::bvector<> &bv_total, bm::random_subset< bvector_type > &rsub, unsigned num_clust, float similarity_cut_off, unsigned concurrency)
Pick random sequences as cluster seed elements, try attach initial sequences based on weighted simila...
std::string ikd_rep_name
Definition: xsample07a.cpp:79
void wait_for_slot(FV &futures, unsigned *parallel_cnt, unsigned concurrency)
wait for any opening in a list of futures used to schedule parallel tasks with CPU overbooking contro...
Definition: xsample07a.cpp:111
unsigned ik_size
Definition: xsample07a.cpp:84
void translate_kmer(std::string &dna, bm::id64_t kmer_code, unsigned k_size)
Translate k-mer code into ATGC DNA string.
Definition: xsample07a.cpp:444
static void save_kmer_buffers(const std::string &fname, const CSequenceColl &seq_coll)
save k-mer vectors to a file
Definition: xsample07a.cpp:294
bm::bvector bvector_type
Definition: xsample07a.cpp:94
bool is_bench
Definition: xsample07a.cpp:83
std::string kh_name
Definition: xsample07a.cpp:78
static int load_FASTA(const std::string &fname, CSequenceColl &seq_coll)
Load multi-sequence FASTA.
Definition: xsample07a.cpp:244
static void compute_and_sim_row(unsigned *row, const bm::bvector<> *bv_i, size_t i, const std::vector< std::unique_ptr< bvector_type > > &k_mers_vect)
Compute similarity distances for one row/vector (1:N) of distance matrix.
Definition: xsample07a.cpp:890
std::vector< std::pair< bv_size_type, bv_size_type > > bv_ranges_vector
Definition: xsample07a.cpp:99
static void load_kmer_buffers(const std::string &fname, CSequenceColl &seq_coll)
Load k-mer vectors.
Definition: xsample07a.cpp:332
bm::dynamic_heap_matrix< unsigned, bm::bvector<>::allocator_type > distance_matrix_type
Definition: xsample07a.cpp:96
std::string ikd_freq_name
Definition: xsample07a.cpp:80
std::string ikd_counts_name
Definition: xsample07a.cpp:77
std::string ifa_name
Definition: xsample07a.cpp:75
static void compute_group(CSeqGroup &seq_group, const CSequenceColl &seq_coll, const bm::bvector<> &bv_exceptions, float similarity_cut_off)
std::string ikd_name
Definition: xsample07a.cpp:76
bm::chrono_taker ::duration_map_type timing_map
Definition: xsample07a.cpp:104
bool get_kmer_code(const char *dna, size_t pos, unsigned k_size, bm::id64_t &k_mer)
Calculate k-mer as an unsigned long integer.
Definition: xsample07a.cpp:402
std::vector< char > vector_char_type
Definition: xsample07a.cpp:95
bool is_diag
Definition: xsample07a.cpp:81
char int2DNA(unsigned code)
Translate integer code to DNA letter.
Definition: xsample07a.cpp:429
void generate_k_mer_bvector(BV &bv, const vector_char_type &seq_vect, unsigned k_size, std::vector< bm::id64_t > &k_buf, const bm::id64_t chunk_size=400000000)
This function turns each k-mer into an integer number and encodes it in a bit-vector (presense vector...
Definition: xsample07a.cpp:474
static void assign_to_best_cluster_union(CSeqClusters &cluster_groups, const CSequenceColl &seq_coll, const bm::bvector<> &bv_seq_ids, bm::bvector<>::size_type seq_id_from, bm::bvector<>::size_type seq_id_to)
Compute AND similarity to all available clusters assign to the most similar using UNION of k-mers in ...
unsigned f_percent
Definition: xsample07a.cpp:86
static void generate_k_mers_parallel(CSequenceColl &seq_coll, unsigned k_size, unsigned concurrency)
Definition: xsample07a.cpp:616
unsigned parallel_jobs
Definition: xsample07a.cpp:85
bool get_DNA_code(char bp, bm::id64_t &dna_code)
Definition: xsample07a.cpp:374
bool is_timing
Definition: xsample07a.cpp:82
static void compute_seq_group_union(CSeqGroup &seq_group, const CSequenceColl &seq_coll)
Compute union (Universe) of all k-mers in the cluster group Implemented as a OR of all k-mer fingerpr...
Definition: xsample07a.cpp:973
void resolve_duplicates(CSeqGroup &seq_group1, CSeqGroup &seq_group2, const CSequenceColl &seq_coll)
Resolve duplicate members between two groups.
std::atomic_ullong k_mer_progress_count(0)
static void compute_and_sim(distance_matrix_type &dm, const CSequenceColl &seq_coll, const bm::bvector<> &bv_mem, bm::bvector<>::size_type bv_mem_cnt, unsigned concurrency)
Compute similarity distances matrix (COUNT(AND(a, b))
Definition: xsample07a.cpp:910
std::vector< std::unique_ptr< bvector_type > > bvector_ptr_vector_type
Definition: xsample07a.cpp:97