Program Listing for File multilikelihoodsource.cpp

Return to documentation for file (processors/multilikelihoodsource/multilikelihoodsource.cpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2021-now Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------

#include "multilikelihoodsource.hpp"
#include "utilities/string.hpp"

MultiLikelihoodSource::MultiLikelihoodSource(): IProcessor(){
    add_option("decoding size", delta_t_, "Time bin size for decoding (aka delta_t).");
}


void MultiLikelihoodSource::CreatePorts() {

  data_in_port_ = create_input_port<ColumnsType<double>>(
      ColumnsType<double>::Capabilities(ChannelRange(1, MAX_NCHANNELS),
                                        SampleRange(0, std::numeric_limits<uint32_t>::max()), true),
      PortInPolicy(SlotRange(0, MAX_NCHANNELS)));

  data_out_port_ = create_output_port<ColumnsType<double>>(
      "loglikelihood", ColumnsType<double>::Parameters({"logL"}, 0, true),
      PortOutPolicy(SlotRange(0, MAX_NCHANNELS)));

  to_decode_ = create_follower_state("to decode", true, Permission::READ);

  shared_likelihoods_ = create_follower_state("likelihoods",
                                              (std::map<std::string, std::shared_ptr<PoissonLikelihood>>*)nullptr,
                                              Permission::NONE);
}

void MultiLikelihoodSource::CompleteStreamInfo() {

  nslots_ = data_in_port_->number_of_slots();
  if (data_in_port_->number_of_slots() != data_out_port_->number_of_slots()) {
    auto err_msg = "Number of output slots (" +
                   std::to_string(data_out_port_->number_of_slots()) +
                   ") on port '" + data_out_port_->name() +
                   "' does not match number of input slots (" +
                   std::to_string(data_in_port_->number_of_slots()) +
                   ") on port '" + data_in_port_->name() + "'.";
    throw ProcessingStreamInfoError(err_msg, name());
  }

  for (slot_ = 0; slot_ < nslots_; ++slot_) {
      data_out_port_->streaminfo(slot_).set_stream_rate(1/delta_t_());
  }
}

void MultiLikelihoodSource::Prepare(GlobalContext &context){

    if( shared_likelihoods_->get() == nullptr){
        throw ProcessingPrepareError("The likelihoods state is not connected "
                                     "to an Encoding processor (used for load/update the model).", name());
    }

    grid_size_= shared_likelihoods_->get()->begin()->second->grid().size();
    for (slot_ = 0; slot_ < nslots_; ++slot_) {
        try {
             shared_likelihoods_->get()->at(data_in_port_->streaminfo(slot_).stream_name());
        } catch (const std::out_of_range& e) {
           throw ProcessingPrepareError("The decoding model does not contain this likelihood label: "
                                        + data_in_port_->streaminfo(slot_).stream_name());
        }
    }
}

void MultiLikelihoodSource::Process(ProcessingContext &context){
#pragma omp parallel
{
    ColumnsType<double>::Data *data_in = nullptr;
    ColumnsType<double>::Data *data_out = nullptr;
    bool should_break = false;

    while (!context.terminated() and !should_break) {
    #pragma omp for nowait
        for (slot_ = 0; slot_ < nslots_; ++slot_) {

            if (!data_in_port_->slot(slot_)->RetrieveData(data_in)) {
                should_break = true;
                continue;
            }

            if(to_decode_->get()){
                data_out = data_out_port_->slot(slot_)->ClaimData(false);
                data_out->set_nsamples(grid_size_);
                shared_likelihoods_->get()->at(data_in_port_->streaminfo(slot_).stream_name())->logL(
                                                                                data_in->data().data(),
                                                                                data_in->nsamples(),
                                                                                delta_t_(),
                                                                                data_out->begin_column("logL"));

                data_out->set_hardware_timestamp(data_in->hardware_timestamp());
                data_out->set_source_timestamp();
                data_out_port_->slot(slot_)->PublishData();
            }else{
                LOG(INFO) << "Received a packet but it is not a decoding bin.";
            }
            data_in_port_->slot(slot_)->ReleaseData();
    }
    }
}
}
REGISTERPROCESSOR(MultiLikelihoodSource)